From f42ed03dc5be7290a913fe427cdf96f6d1bee792 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 15 Jun 2020 17:44:40 +0200 Subject: [PATCH] Optimized bstream reader used by XORChunk iterator (#7390) * Optimized bstream reader Signed-off-by: Marco Pracucci * Fixed linter Signed-off-by: Marco Pracucci * Added license to new file Signed-off-by: Marco Pracucci * Fixed type cast Signed-off-by: Marco Pracucci * Changed comments Signed-off-by: Marco Pracucci * Improved comments and rolledback no-op changes Signed-off-by: Marco Pracucci * Fixed race condition Signed-off-by: Marco Pracucci --- tsdb/chunkenc/bstream.go | 185 +++++++++++++++++++++------------- tsdb/chunkenc/bstream_test.go | 89 ++++++++++++++++ tsdb/chunkenc/xor.go | 42 ++++++-- 3 files changed, 237 insertions(+), 79 deletions(-) create mode 100644 tsdb/chunkenc/bstream_test.go diff --git a/tsdb/chunkenc/bstream.go b/tsdb/chunkenc/bstream.go index 0a02a73035..fdfc222119 100644 --- a/tsdb/chunkenc/bstream.go +++ b/tsdb/chunkenc/bstream.go @@ -49,10 +49,6 @@ type bstream struct { count uint8 // how many bits are valid in current byte } -func newBReader(b []byte) bstream { - return bstream{stream: b, count: 8} -} - func (b *bstream) bytes() []byte { return b.stream } @@ -111,90 +107,141 @@ func (b *bstream) writeBits(u uint64, nbits int) { } } -func (b *bstream) readBit() (bit, error) { - if len(b.stream) == 0 { +type bstreamReader struct { + stream []byte + streamOffset int // The offset from which read the next byte from the stream. + + buffer uint64 // The current buffer, filled from the stream, containing up to 8 bytes from which read bits. + valid uint8 // The number of bits valid to read (from left) in the current buffer. +} + +func newBReader(b []byte) bstreamReader { + return bstreamReader{ + stream: b, + } +} + +func (b *bstreamReader) readBit() (bit, error) { + if b.valid == 0 { + if !b.loadNextBuffer(1) { + return false, io.EOF + } + } + + return b.readBitFast() +} + +// readBitFast is like readBit but can return io.EOF if the internal buffer is empty. +// If it returns io.EOF, the caller should retry reading bits calling readBit(). +// This function must be kept small and a leaf in order to help the compiler inlining it +// and further improve performances. +func (b *bstreamReader) readBitFast() (bit, error) { + if b.valid == 0 { return false, io.EOF } - if b.count == 0 { - b.stream = b.stream[1:] + b.valid-- + bitmask := uint64(1) << b.valid + return (b.buffer & bitmask) != 0, nil +} - if len(b.stream) == 0 { - return false, io.EOF +func (b *bstreamReader) readBits(nbits uint8) (uint64, error) { + if b.valid == 0 { + if !b.loadNextBuffer(nbits) { + return 0, io.EOF } - b.count = 8 } - d := (b.stream[0] << (8 - b.count)) & 0x80 - b.count-- - return d != 0, nil -} + if nbits <= b.valid { + return b.readBitsFast(nbits) + } -func (b *bstream) ReadByte() (byte, error) { - return b.readByte() -} + // We have to read all remaining valid bits from the current buffer and a part from the next one. + bitmask := (uint64(1) << b.valid) - 1 + nbits -= b.valid + v := (b.buffer & bitmask) << nbits + b.valid = 0 -func (b *bstream) readByte() (byte, error) { - if len(b.stream) == 0 { + if !b.loadNextBuffer(nbits) { return 0, io.EOF } - if b.count == 0 { - b.stream = b.stream[1:] + bitmask = (uint64(1) << nbits) - 1 + v = v | ((b.buffer >> (b.valid - nbits)) & bitmask) + b.valid -= nbits - if len(b.stream) == 0 { - return 0, io.EOF - } - return b.stream[0], nil - } + return v, nil +} - if b.count == 8 { - b.count = 0 - return b.stream[0], nil - } - - byt := b.stream[0] << (8 - b.count) - b.stream = b.stream[1:] - - if len(b.stream) == 0 { +// readBitsFast is like readBits but can return io.EOF if the internal buffer is empty. +// If it returns io.EOF, the caller should retry reading bits calling readBits(). +// This function must be kept small and a leaf in order to help the compiler inlining it +// and further improve performances. +func (b *bstreamReader) readBitsFast(nbits uint8) (uint64, error) { + if nbits > b.valid { return 0, io.EOF } - // We just advanced the stream and can assume the shift to be 0. - byt |= b.stream[0] >> b.count + bitmask := (uint64(1) << nbits) - 1 + b.valid -= nbits - return byt, nil + return (b.buffer >> b.valid) & bitmask, nil } -func (b *bstream) readBits(nbits int) (uint64, error) { - var u uint64 - - for nbits >= 8 { - byt, err := b.readByte() - if err != nil { - return 0, err - } - - u = (u << 8) | uint64(byt) - nbits -= 8 +func (b *bstreamReader) ReadByte() (byte, error) { + v, err := b.readBits(8) + if err != nil { + return 0, err } - - if nbits == 0 { - return u, nil - } - - if nbits > int(b.count) { - u = (u << uint(b.count)) | uint64((b.stream[0]<<(8-b.count))>>(8-b.count)) - nbits -= int(b.count) - b.stream = b.stream[1:] - - if len(b.stream) == 0 { - return 0, io.EOF - } - b.count = 8 - } - - u = (u << uint(nbits)) | uint64((b.stream[0]<<(8-b.count))>>(8-uint(nbits))) - b.count -= uint8(nbits) - return u, nil + return byte(v), nil +} + +// loadNextBuffer loads the next bytes from the stream into the internal buffer. +// The input nbits is the minimum number of bits that must be read, but the implementation +// can read more (if possible) to improve performances. +func (b *bstreamReader) loadNextBuffer(nbits uint8) bool { + if b.streamOffset >= len(b.stream) { + return false + } + + // Handle the case there are more then 8 bytes in the buffer (most common case) + // in a optimized way. It's guaranteed that this branch will never read from the + // very last byte of the stream (which suffers race conditions due to concurrent + // writes). + if b.streamOffset+8 < len(b.stream) { + // This is ugly, but significantly faster. + b.buffer = + ((uint64(b.stream[b.streamOffset])) << 56) | + ((uint64(b.stream[b.streamOffset+1])) << 48) | + ((uint64(b.stream[b.streamOffset+2])) << 40) | + ((uint64(b.stream[b.streamOffset+3])) << 32) | + ((uint64(b.stream[b.streamOffset+4])) << 24) | + ((uint64(b.stream[b.streamOffset+5])) << 16) | + ((uint64(b.stream[b.streamOffset+6])) << 8) | + uint64(b.stream[b.streamOffset+7]) + + b.streamOffset += 8 + b.valid = 64 + return true + } + + // We're here if the are 8 or less bytes left in the stream. Since this reader needs + // to handle race conditions with concurrent writes happening on the very last byte + // we make sure to never over more than the minimum requested bits (rounded up to + // the next byte). The following code is slower but called less frequently. + nbytes := int((nbits / 8) + 1) + if b.streamOffset+nbytes > len(b.stream) { + nbytes = len(b.stream) - b.streamOffset + } + + buffer := uint64(0) + for i := 0; i < nbytes; i++ { + buffer = buffer | (uint64(b.stream[b.streamOffset+i]) << uint(8*(nbytes-i-1))) + } + + b.buffer = buffer + b.streamOffset += nbytes + b.valid = uint8(nbytes * 8) + + return true } diff --git a/tsdb/chunkenc/bstream_test.go b/tsdb/chunkenc/bstream_test.go new file mode 100644 index 0000000000..785a4d5dca --- /dev/null +++ b/tsdb/chunkenc/bstream_test.go @@ -0,0 +1,89 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The code in this file was largely written by Damian Gryski as part of +// https://github.com/dgryski/go-tsz and published under the license below. +// It received minor modifications to suit Prometheus's needs. + +// Copyright (c) 2015,2016 Damian Gryski +// All rights reserved. + +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: + +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package chunkenc + +import ( + "testing" + + "github.com/prometheus/prometheus/util/testutil" +) + +func TestBstreamReader(t *testing.T) { + // Write to the bit stream. + w := bstream{} + for _, bit := range []bit{true, false} { + w.writeBit(bit) + } + for nbits := 1; nbits <= 64; nbits++ { + w.writeBits(uint64(nbits), nbits) + } + for v := 1; v < 10000; v += 123 { + w.writeBits(uint64(v), 29) + } + + // Read back. + r := newBReader(w.bytes()) + for _, bit := range []bit{true, false} { + v, err := r.readBitFast() + if err != nil { + v, err = r.readBit() + } + testutil.Ok(t, err) + testutil.Equals(t, bit, v) + } + for nbits := uint8(1); nbits <= 64; nbits++ { + v, err := r.readBitsFast(nbits) + if err != nil { + v, err = r.readBits(nbits) + } + testutil.Ok(t, err) + testutil.Equals(t, uint64(nbits), v, "nbits=%d", nbits) + } + for v := 1; v < 10000; v += 123 { + actual, err := r.readBitsFast(29) + if err != nil { + actual, err = r.readBits(29) + } + testutil.Ok(t, err) + testutil.Equals(t, uint64(v), actual, "v=%d", v) + } +} diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 33c7f10404..e3b4f58b2a 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -240,7 +240,7 @@ func (a *xorAppender) writeVDelta(v float64) { } type xorIterator struct { - br bstream + br bstreamReader numTotal uint16 numRead uint16 @@ -328,7 +328,10 @@ func (it *xorIterator) Next() bool { // read delta-of-delta for i := 0; i < 4; i++ { d <<= 1 - bit, err := it.br.readBit() + bit, err := it.br.readBitFast() + if err != nil { + bit, err = it.br.readBit() + } if err != nil { it.err = err return false @@ -350,6 +353,7 @@ func (it *xorIterator) Next() bool { case 0x0e: sz = 20 case 0x0f: + // Do not use fast because it's very unlikely it will succeed. bits, err := it.br.readBits(64) if err != nil { it.err = err @@ -360,7 +364,10 @@ func (it *xorIterator) Next() bool { } if sz != 0 { - bits, err := it.br.readBits(int(sz)) + bits, err := it.br.readBitsFast(sz) + if err != nil { + bits, err = it.br.readBits(sz) + } if err != nil { it.err = err return false @@ -379,7 +386,10 @@ func (it *xorIterator) Next() bool { } func (it *xorIterator) readValue() bool { - bit, err := it.br.readBit() + bit, err := it.br.readBitFast() + if err != nil { + bit, err = it.br.readBit() + } if err != nil { it.err = err return false @@ -388,7 +398,10 @@ func (it *xorIterator) readValue() bool { if bit == zero { // it.val = it.val } else { - bit, err := it.br.readBit() + bit, err := it.br.readBitFast() + if err != nil { + bit, err = it.br.readBit() + } if err != nil { it.err = err return false @@ -397,14 +410,20 @@ func (it *xorIterator) readValue() bool { // reuse leading/trailing zero bits // it.leading, it.trailing = it.leading, it.trailing } else { - bits, err := it.br.readBits(5) + bits, err := it.br.readBitsFast(5) + if err != nil { + bits, err = it.br.readBits(5) + } if err != nil { it.err = err return false } it.leading = uint8(bits) - bits, err = it.br.readBits(6) + bits, err = it.br.readBitsFast(6) + if err != nil { + bits, err = it.br.readBits(6) + } if err != nil { it.err = err return false @@ -417,14 +436,17 @@ func (it *xorIterator) readValue() bool { it.trailing = 64 - it.leading - mbits } - mbits := int(64 - it.leading - it.trailing) - bits, err := it.br.readBits(mbits) + mbits := 64 - it.leading - it.trailing + bits, err := it.br.readBitsFast(mbits) + if err != nil { + bits, err = it.br.readBits(mbits) + } if err != nil { it.err = err return false } vbits := math.Float64bits(it.val) - vbits ^= (bits << it.trailing) + vbits ^= bits << it.trailing it.val = math.Float64frombits(vbits) }