mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
Merge pull request #18740 from roidelapluie/roidelapluie/snapshot-explicit-encoding-cases
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests for 32-bit x86 (push) Has been cancelled
CI / Go tests for Prometheus upgrades and downgrades (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Compliance testing (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
govulncheck / Run govulncheck (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests for 32-bit x86 (push) Has been cancelled
CI / Go tests for Prometheus upgrades and downgrades (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Compliance testing (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
govulncheck / Run govulncheck (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
tsdb: replace default encoding cases with explicit cases in snapshot encode/decode
This commit is contained in:
commit
891e698992
2 changed files with 93 additions and 2 deletions
|
|
@ -51,6 +51,7 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/prometheus/tsdb/encoding"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/prometheus/tsdb/index"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
|
|
@ -4928,6 +4929,91 @@ func TestSnapshotError(t *testing.T) {
|
|||
require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.seriesCreated))
|
||||
}
|
||||
|
||||
// TestSnapshotUnknownEncodingFallsBackToWAL verifies that a snapshot containing
|
||||
// an unknown chunk encoding causes the entire snapshot load to fail and fall back
|
||||
// to full WAL replay, recovering all series without data loss.
|
||||
func TestSnapshotUnknownEncodingFallsBackToWAL(t *testing.T) {
|
||||
head, _ := newTestHead(t, 120*4, compression.None, false)
|
||||
defer func() {
|
||||
head.opts.EnableMemorySnapshotOnShutdown = false
|
||||
require.NoError(t, head.Close())
|
||||
}()
|
||||
|
||||
floatHist := tsdbutil.GenerateTestGaugeFloatHistograms(1)[0]
|
||||
lblsFloatHist := labels.FromStrings("floathist", "bar")
|
||||
lblsFloat := labels.FromStrings("foo", "bar")
|
||||
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.AppendHistogram(0, lblsFloatHist, 99, nil, floatHist)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Append(0, lblsFloat, 99, 99.0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
head.opts.EnableMemorySnapshotOnShutdown = true
|
||||
require.NoError(t, head.Close())
|
||||
|
||||
// Find the snapshot and corrupt the encoding byte of the float histogram series.
|
||||
snapDir, _, _, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
sr, err := wlog.NewSegmentsReader(snapDir)
|
||||
require.NoError(t, err)
|
||||
r := wlog.NewReader(sr)
|
||||
syms := labels.NewSymbolTable()
|
||||
rdec := record.NewDecoder(syms, promslog.NewNopLogger())
|
||||
var (
|
||||
records [][]byte
|
||||
mutated bool
|
||||
)
|
||||
for r.Next() {
|
||||
rec := append([]byte(nil), r.Record()...)
|
||||
if rec[0] == chunkSnapshotRecordTypeSeries {
|
||||
buf := encoding.Decbuf{B: rec}
|
||||
_ = buf.Byte() // flag
|
||||
_ = buf.Be64() // ref
|
||||
lset := rdec.DecodeLabels(&buf)
|
||||
_ = buf.Be64int64() // chunkRange
|
||||
if buf.Uvarint() == 1 && lset.Get("floathist") == "bar" {
|
||||
_ = buf.Be64int64() // minTime
|
||||
_ = buf.Be64int64() // maxTime
|
||||
encPos := len(rec) - buf.Len()
|
||||
require.Equal(t, byte(chunkenc.EncFloatHistogram), rec[encPos],
|
||||
"expected float histogram encoding at computed offset")
|
||||
rec[encPos] = 0xFF
|
||||
mutated = true
|
||||
}
|
||||
}
|
||||
records = append(records, rec)
|
||||
}
|
||||
require.NoError(t, r.Err())
|
||||
require.NoError(t, sr.Close())
|
||||
require.True(t, mutated, "expected to find and corrupt the float histogram series record")
|
||||
|
||||
// Rewrite the snapshot with the mutated records.
|
||||
files, err := os.ReadDir(snapDir)
|
||||
require.NoError(t, err)
|
||||
for _, f := range files {
|
||||
require.NoError(t, os.Remove(filepath.Join(snapDir, f.Name())))
|
||||
}
|
||||
cp, err := wlog.New(nil, nil, snapDir, compression.None)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, cp.Log(records...))
|
||||
require.NoError(t, cp.Close())
|
||||
|
||||
// Reload the head; snapshot should fail due to unknown encoding and fall back to WAL.
|
||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(math.MinInt64))
|
||||
|
||||
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
|
||||
require.Equal(t, uint64(2), head.NumSeries(), "both series must be recovered from WAL")
|
||||
require.NotNil(t, head.series.getByHash(lblsFloat.Hash(), lblsFloat))
|
||||
require.NotNil(t, head.series.getByHash(lblsFloatHist.Hash(), lblsFloatHist))
|
||||
}
|
||||
|
||||
func TestHistogramMetrics(t *testing.T) {
|
||||
numHistograms := 10
|
||||
head, _ := newTestHead(t, 1000, compression.None, false)
|
||||
|
|
|
|||
|
|
@ -1249,8 +1249,10 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
|
|||
buf.PutBEFloat64(s.lastValue)
|
||||
case chunkenc.EncHistogram:
|
||||
record.EncodeHistogram(&buf, s.lastHistogramValue)
|
||||
default: // chunkenc.FloatHistogram.
|
||||
case chunkenc.EncFloatHistogram:
|
||||
record.EncodeFloatHistogram(&buf, s.lastFloatHistogramValue)
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown chunk encoding: %v", enc))
|
||||
}
|
||||
}
|
||||
s.Unlock()
|
||||
|
|
@ -1303,9 +1305,12 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh
|
|||
case chunkenc.EncHistogram:
|
||||
csr.lastHistogramValue = &histogram.Histogram{}
|
||||
record.DecodeHistogram(&dec, csr.lastHistogramValue)
|
||||
default: // chunkenc.FloatHistogram.
|
||||
case chunkenc.EncFloatHistogram:
|
||||
csr.lastFloatHistogramValue = &histogram.FloatHistogram{}
|
||||
record.DecodeFloatHistogram(&dec, csr.lastFloatHistogramValue)
|
||||
default:
|
||||
// Guard against a new encoding added to chunkenc.FromData without a corresponding case here.
|
||||
return csr, fmt.Errorf("chunk encoding %v has no decode case", enc)
|
||||
}
|
||||
|
||||
err = dec.Err()
|
||||
|
|
|
|||
Loading…
Reference in a new issue