From c740789ce3b80b80b2d5758ce5464f7ba51836d2 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 24 Feb 2016 17:16:24 +0100 Subject: [PATCH 1/3] Improve predict_linear Fixes https://github.com/prometheus/prometheus/issues/1401 This removes the last (and in fact bogus) use of BoundaryValues. Thus, a whole lot of unused (and arguably sub-optimal / ugly) code can be removed here, too. --- promql/engine.go | 32 ----------- promql/functions.go | 98 +++++++++++++++------------------- promql/testdata/functions.test | 22 ++++++-- storage/local/interface.go | 3 -- storage/local/series.go | 75 -------------------------- storage/local/storage.go | 11 ---- storage/local/storage_test.go | 18 ++----- util/stats/query_stats.go | 3 -- 8 files changed, 66 insertions(+), 196 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 599486cae4..8367101812 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -575,15 +575,6 @@ func (ev *evaluator) evalMatrix(e Expr) matrix { return mat } -// evalMatrixBounds attempts to evaluate e to matrix boundaries and errors otherwise. -func (ev *evaluator) evalMatrixBounds(e Expr) matrix { - ms, ok := e.(*MatrixSelector) - if !ok { - ev.errorf("matrix bounds can only be evaluated for matrix selectors, got %T", e) - } - return ev.matrixSelectorBounds(ms) -} - // evalString attempts to evaluate e to a string value and errors otherwise. func (ev *evaluator) evalString(e Expr) *model.String { val := ev.eval(e) @@ -731,29 +722,6 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix { return matrix(sampleStreams) } -// matrixSelectorBounds evaluates the boundaries of a *MatrixSelector. 
-func (ev *evaluator) matrixSelectorBounds(node *MatrixSelector) matrix { - interval := metric.Interval{ - OldestInclusive: ev.Timestamp.Add(-node.Range - node.Offset), - NewestInclusive: ev.Timestamp.Add(-node.Offset), - } - - sampleStreams := make([]*sampleStream, 0, len(node.iterators)) - for fp, it := range node.iterators { - samplePairs := it.BoundaryValues(interval) - if len(samplePairs) == 0 { - continue - } - - ss := &sampleStream{ - Metric: node.metrics[fp], - Values: samplePairs, - } - sampleStreams = append(sampleStreams, ss) - } - return matrix(sampleStreams) -} - func (ev *evaluator) vectorAnd(lhs, rhs vector, matching *VectorMatching) vector { if matching.Card != CardManyToMany { panic("logical operations must always be many-to-many matching") diff --git a/promql/functions.go b/promql/functions.go index 4728b1e96e..0d3ec41951 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -524,10 +524,37 @@ func funcLog10(ev *evaluator, args Expressions) model.Value { return vector } +// linearRegression performs a least-square linear regression analysis on the +// provided SamplePairs. It returns the slope, and the intercept value at the +// provided time. 
+func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slope, intercept model.SampleValue) { + var ( + n model.SampleValue + sumX, sumY model.SampleValue + sumXY, sumX2 model.SampleValue + ) + for _, sample := range samples { + x := model.SampleValue( + model.Time(sample.Timestamp-interceptTime).UnixNano(), + ) / 1e9 + n += 1.0 + sumY += sample.Value + sumX += x + sumXY += x * sample.Value + sumX2 += x * x + } + covXY := sumXY - sumX*sumY/n + varX := sumX2 - sumX*sumX/n + + slope = covXY / varX + intercept = sumY/n - slope*sumX/n + return +} + // === deriv(node model.ValMatrix) Vector === func funcDeriv(ev *evaluator, args Expressions) model.Value { - resultVector := vector{} mat := ev.evalMatrix(args[0]) + resultVector := make(vector, 0, len(mat)) for _, samples := range mat { // No sense in trying to compute a derivative without at least two points. @@ -535,29 +562,10 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value { if len(samples.Values) < 2 { continue } - - // Least squares. 
- var ( - n model.SampleValue - sumX, sumY model.SampleValue - sumXY, sumX2 model.SampleValue - ) - for _, sample := range samples.Values { - x := model.SampleValue(sample.Timestamp.UnixNano() / 1e9) - n += 1.0 - sumY += sample.Value - sumX += x - sumXY += x * sample.Value - sumX2 += x * x - } - numerator := sumXY - sumX*sumY/n - denominator := sumX2 - (sumX*sumX)/n - - resultValue := numerator / denominator - + slope, _ := linearRegression(samples.Values, 0) resultSample := &sample{ Metric: samples.Metric, - Value: resultValue, + Value: slope, Timestamp: ev.Timestamp, } resultSample.Metric.Del(model.MetricNameLabel) @@ -568,44 +576,26 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value { // === predict_linear(node model.ValMatrix, k model.ValScalar) Vector === func funcPredictLinear(ev *evaluator, args Expressions) model.Value { - vec := funcDeriv(ev, args[0:1]).(vector) - duration := model.SampleValue(model.SampleValue(ev.evalFloat(args[1]))) + mat := ev.evalMatrix(args[0]) + resultVector := make(vector, 0, len(mat)) + duration := model.SampleValue(ev.evalFloat(args[1])) - excludedLabels := map[model.LabelName]struct{}{ - model.MetricNameLabel: {}, - } - - // Calculate predicted delta over the duration. - signatureToDelta := map[uint64]model.SampleValue{} - for _, el := range vec { - signature := model.SignatureWithoutLabels(el.Metric.Metric, excludedLabels) - signatureToDelta[signature] = el.Value * duration - } - - // add predicted delta to last value. - // TODO(beorn7): This is arguably suboptimal. The funcDeriv above has - // given us an estimate over the range. So we should add the delta to - // the value predicted for the end of the range. Also, once this has - // been rectified, we are not using BoundaryValues anywhere anymore, so - // we can kick out a whole lot of code. 
- matrixBounds := ev.evalMatrixBounds(args[0]) - outVec := make(vector, 0, len(signatureToDelta)) - for _, samples := range matrixBounds { + for _, samples := range mat { + // No sense in trying to predict anything without at least two points. + // Drop this vector element. if len(samples.Values) < 2 { continue } - signature := model.SignatureWithoutLabels(samples.Metric.Metric, excludedLabels) - delta, ok := signatureToDelta[signature] - if ok { - samples.Metric.Del(model.MetricNameLabel) - outVec = append(outVec, &sample{ - Metric: samples.Metric, - Value: delta + samples.Values[1].Value, - Timestamp: ev.Timestamp, - }) + slope, intercept := linearRegression(samples.Values, ev.Timestamp) + resultSample := &sample{ + Metric: samples.Metric, + Value: slope*duration + intercept, + Timestamp: ev.Timestamp, } + resultSample.Metric.Del(model.MetricNameLabel) + resultVector = append(resultVector, resultSample) } - return outVec + return resultVector } // === histogram_quantile(k model.ValScalar, vector model.ValVector) Vector === diff --git a/promql/testdata/functions.test b/promql/testdata/functions.test index bccec5a1a0..28233033e4 100644 --- a/promql/testdata/functions.test +++ b/promql/testdata/functions.test @@ -102,16 +102,28 @@ eval instant at 50m deriv(testcounter_reset_middle[100m]) {} 0.010606060606060607 # predict_linear should return correct result. +# X/s = [ 0, 300, 600, 900,1200,1500,1800,2100,2400,2700,3000] +# Y = [ 0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 50] +# sumX = 16500 +# sumY = 250 +# sumXY = 480000 +# sumX2 = 34650000 +# n = 11 +# covXY = 105000 +# varX = 9900000 +# slope = 0.010606060606060607 +# intercept at t=0: 6.818181818181818 +# intercept at t=3000: 38.63636363636364 +# intercept at t=3000+3600: 76.81818181818181 eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) - {} 88.181818181818185200 + {} 76.81818181818181 -# predict_linear is syntactic sugar around deriv. 
+# With http_requests, there is a sample value exactly at the end of +# the range, and it has exactly the predicted value, so predict_linear +# can be emulated with deriv. eval instant at 50m predict_linear(http_requests[50m], 3600) - (http_requests + deriv(http_requests[50m]) * 3600) {group="canary", instance="1", job="app-server"} 0 -eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) - (testcounter_reset_middle + deriv(testcounter_reset_middle[100m]) * 3600) - {} 0 - clear # Tests for label_replace. diff --git a/storage/local/interface.go b/storage/local/interface.go index 631525b4ad..5d910db09c 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -79,9 +79,6 @@ type SeriesIterator interface { // exist at precisely the given time, that value is returned. If no // applicable value exists, ZeroSamplePair is returned. ValueAtOrBeforeTime(model.Time) model.SamplePair - // Gets the boundary values of an interval: the first and last value - // within a given interval. - BoundaryValues(metric.Interval) []model.SamplePair // Gets all values contained within a given interval. RangeValues(metric.Interval) []model.SamplePair } diff --git a/storage/local/series.go b/storage/local/series.go index e6a37ad4a5..7a027a8a5d 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -540,71 +540,6 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa return it.chunkIt.valueAtOrBeforeTime(t) } -// BoundaryValues implements SeriesIterator. -func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair { - // Find the first chunk for which the first sample is within the interval. - i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].firstTime().Before(in.OldestInclusive) - }) - // Only now check the last timestamp of the previous chunk (which is - // fairly expensive). 
- if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) { - i-- - } - - values := make([]model.SamplePair, 0, 2) - for j, c := range it.chunks[i:] { - if c.firstTime().After(in.NewestInclusive) { - if len(values) == 1 { - // We found the first value before but are now - // already past the last value. The value we - // want must be the last value of the previous - // chunk. So backtrack... - chunkIt := it.chunkIterator(i + j - 1) - values = append(values, model.SamplePair{ - Timestamp: chunkIt.lastTimestamp(), - Value: chunkIt.lastSampleValue(), - }) - } - break - } - chunkIt := it.chunkIterator(i + j) - if len(values) == 0 { - for s := range chunkIt.values() { - if len(values) == 0 && !s.Timestamp.Before(in.OldestInclusive) { - values = append(values, *s) - // We cannot just break out here as we have to consume all - // the values to not leak a goroutine. This could obviously - // be made much neater with more suitable methods in the chunk - // interface. But currently, BoundaryValues is only used by - // `predict_linear` so we would pollute the chunk interface - // unduly just for one single corner case. Plus, even that use - // of BoundaryValues is suboptimal and should be replaced. - } - } - } - if chunkIt.lastTimestamp().After(in.NewestInclusive) { - s := chunkIt.valueAtOrBeforeTime(in.NewestInclusive) - if s.Timestamp != model.Earliest { - values = append(values, s) - } - break - } - } - if len(values) == 1 { - // We found exactly one value. In that case, add the most recent we know. - chunkIt := it.chunkIterator(len(it.chunks) - 1) - values = append(values, model.SamplePair{ - Timestamp: chunkIt.lastTimestamp(), - Value: chunkIt.lastSampleValue(), - }) - } - if len(values) == 2 && values[0].Equal(&values[1]) { - return values[:1] - } - return values -} - // RangeValues implements SeriesIterator. 
func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { // Find the first chunk for which the first sample is within the interval. @@ -653,11 +588,6 @@ func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.Sa return it.samplePair } -// BoundaryValues implements SeriesIterator. -func (it *singleSampleSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair { - return it.RangeValues(in) -} - // RangeValues implements SeriesIterator. func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { if it.samplePair.Timestamp.After(in.NewestInclusive) || @@ -675,11 +605,6 @@ func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { return ZeroSamplePair } -// BoundaryValues implements SeriesIterator. -func (i nopSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair { - return []model.SamplePair{} -} - // RangeValues implements SeriesIterator. func (i nopSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { return []model.SamplePair{} diff --git a/storage/local/storage.go b/storage/local/storage.go index f90dfae8a8..f87f083ba3 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -374,17 +374,6 @@ func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair return bit.it.ValueAtOrBeforeTime(ts) } -// BoundaryValues implements the SeriesIterator interface. -func (bit *boundedIterator) BoundaryValues(interval metric.Interval) []model.SamplePair { - if interval.NewestInclusive < bit.start { - return []model.SamplePair{} - } - if interval.OldestInclusive < bit.start { - interval.OldestInclusive = bit.start - } - return bit.it.BoundaryValues(interval) -} - // RangeValues implements the SeriesIterator interface. 
func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.SamplePair { if interval.NewestInclusive < bit.start { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 8fcdcecfea..381f7c7a36 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -424,14 +424,6 @@ func TestRetentionCutoff(t *testing.T) { if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt { t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) } - - vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) - if len(vals) != 2 { - t.Errorf("expected 2 values but got %d", len(vals)) - } - if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt { - t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) - } } func TestDropMetrics(t *testing.T) { @@ -1036,18 +1028,18 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatalf("Error preloading everything: %s", err) } - actual := it.BoundaryValues(metric.Interval{ + actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, }) - if len(actual) != 2 { - t.Fatal("expected two results after purging half of series") + if len(actual) < 4000 { + t.Fatalf("expected more than %d results after purging half of series, got %d", 4000, len(actual)) } if actual[0].Timestamp < 6000 || actual[0].Timestamp > 10000 { t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp) } want := model.Time(19998) - if actual[1].Timestamp != want { + if actual[len(actual)-1].Timestamp != want { t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp) } @@ -1057,7 +1049,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatalf("Error preloading everything: %s", err) } - actual = 
it.BoundaryValues(metric.Interval{ + actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, }) diff --git a/util/stats/query_stats.go b/util/stats/query_stats.go index e739142b4a..6d92361a00 100644 --- a/util/stats/query_stats.go +++ b/util/stats/query_stats.go @@ -29,7 +29,6 @@ const ( ResultAppendTime QueryAnalysisTime GetValueAtTimeTime - GetBoundaryValuesTime GetRangeValuesTime ExecQueueTime ViewDiskPreparationTime @@ -60,8 +59,6 @@ func (s QueryTiming) String() string { return "Query analysis time" case GetValueAtTimeTime: return "GetValueAtTime() time" - case GetBoundaryValuesTime: - return "GetBoundaryValues() time" case GetRangeValuesTime: return "GetRangeValues() time" case ExecQueueTime: From 0ea5801e4766664016233451ddbff6f581822a1e Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 25 Feb 2016 12:23:42 +0100 Subject: [PATCH 2/3] Handle errors caused by data corruption more gracefully This requires all the panic calls upon unexpected data to be converted into errors returned. This pollutes the function signatures quite a lot. Well, this is Go... The ideas behind this are the following: - panic only if it's a programming error. Data corruptions happen, and they are not programming errors. - If we detect a data corruption, we "quarantine" the series, essentially removing it from the database and putting its data into a separate directory for forensics. - Failure during writing to a series file is not considered corruption automatically. It will call setDirty, though, so that a crash recovery upon the next restart will commence and check for that. - Series quarantining and setDirty calls are logged and counted in metrics, but are hidden from the user of the interfaces in interface.go, with the notable exception of Append(). The reasoning is that we treat corruption by removing the corrupted series, i.e. a query for it will return no results on its next call anyway, so return no results right now. 
In the case of Append(), we want to tell the user that no data has been appended, though. Minor side effects: - Now consistently using filepath.* instead of path.*. - Introduced structured logging where I touched it. This makes things less consistent, but a complete change to structured logging would be out of scope for this PR. --- promql/analyzer.go | 12 +- storage/local/chunk.go | 72 ++++++---- storage/local/crashrecovery.go | 78 +++++++---- storage/local/delta.go | 174 ++++++++++++++++------- storage/local/doubledelta.go | 194 +++++++++++++++++--------- storage/local/index/index.go | 9 +- storage/local/instrumentation.go | 3 + storage/local/interface.go | 6 +- storage/local/persistence.go | 140 ++++++++++++++----- storage/local/persistence_test.go | 174 ++++++++++++++++++++--- storage/local/preload.go | 18 +-- storage/local/series.go | 94 ++++++++++--- storage/local/storage.go | 225 ++++++++++++++++++++++++------ storage/local/storage_test.go | 171 +++++++++++++++-------- 14 files changed, 1002 insertions(+), 368 deletions(-) diff --git a/promql/analyzer.go b/promql/analyzer.go index f56c4fc82a..bad5fbd92b 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -146,21 +146,13 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { if err = contextDone(ctx, env); err != nil { return nil, err } - iter, err := p.PreloadRange(fp, start.Add(-rangeDuration), end) - if err != nil { - return nil, err - } - itersForDuration[fp] = iter + itersForDuration[fp] = p.PreloadRange(fp, start.Add(-rangeDuration), end) } for fp := range pt.instants { if err = contextDone(ctx, env); err != nil { return nil, err } - iter, err := p.PreloadInstant(fp, start, StalenessDelta) - if err != nil { - return nil, err - } - itersForDuration[fp] = iter + itersForDuration[fp] = p.PreloadInstant(fp, start, StalenessDelta) } } diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 9411cac7c8..d8b1ae2b8d 100644 --- a/storage/local/chunk.go +++ 
b/storage/local/chunk.go @@ -112,7 +112,7 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { // add adds a sample pair to the underlying chunk. For safe concurrent access, // The chunk must be pinned, and the caller must have locked the fingerprint of // the series. -func (cd *chunkDesc) add(s *model.SamplePair) []chunk { +func (cd *chunkDesc) add(s model.SamplePair) ([]chunk, error) { return cd.c.add(s) } @@ -169,9 +169,9 @@ func (cd *chunkDesc) firstTime() model.Time { // lastTime returns the timestamp of the last sample in the chunk. For safe // concurrent access, this method requires the fingerprint of the time series to // be locked. -func (cd *chunkDesc) lastTime() model.Time { +func (cd *chunkDesc) lastTime() (model.Time, error) { if cd.chunkLastTime != model.Earliest || cd.c == nil { - return cd.chunkLastTime + return cd.chunkLastTime, nil } return cd.c.newIterator().lastTimestamp() } @@ -181,10 +181,15 @@ func (cd *chunkDesc) lastTime() model.Time { // last sample to a chunk or after closing a head chunk due to age. For safe // concurrent access, the chunk must be pinned, and the caller must have locked // the fingerprint of the series. -func (cd *chunkDesc) maybePopulateLastTime() { +func (cd *chunkDesc) maybePopulateLastTime() error { if cd.chunkLastTime == model.Earliest && cd.c != nil { - cd.chunkLastTime = cd.c.newIterator().lastTimestamp() + t, err := cd.c.newIterator().lastTimestamp() + if err != nil { + return err + } + cd.chunkLastTime = t } + return nil } // isEvicted returns whether the chunk is evicted. For safe concurrent access, @@ -241,14 +246,14 @@ type chunk interface { // any. The first chunk returned might be the same as the original one // or a newly allocated version. In any case, take the returned chunk as // the relevant one and discard the original chunk. 
- add(sample *model.SamplePair) []chunk + add(sample model.SamplePair) ([]chunk, error) clone() chunk firstTime() model.Time newIterator() chunkIterator marshal(io.Writer) error marshalToBuf([]byte) error unmarshal(io.Reader) error - unmarshalFromBuf([]byte) + unmarshalFromBuf([]byte) error encoding() chunkEncoding } @@ -259,56 +264,73 @@ type chunkIterator interface { // length returns the number of samples in the chunk. length() int // Gets the timestamp of the n-th sample in the chunk. - timestampAtIndex(int) model.Time + timestampAtIndex(int) (model.Time, error) // Gets the last timestamp in the chunk. - lastTimestamp() model.Time + lastTimestamp() (model.Time, error) // Gets the sample value of the n-th sample in the chunk. - sampleValueAtIndex(int) model.SampleValue + sampleValueAtIndex(int) (model.SampleValue, error) // Gets the last sample value in the chunk. - lastSampleValue() model.SampleValue + lastSampleValue() (model.SampleValue, error) // Gets the value that is closest before the given time. In case a value // exists at precisely the given time, that value is returned. If no // applicable value exists, ZeroSamplePair is returned. - valueAtOrBeforeTime(model.Time) model.SamplePair + valueAtOrBeforeTime(model.Time) (model.SamplePair, error) // Gets all values contained within a given interval. - rangeValues(metric.Interval) []model.SamplePair + rangeValues(metric.Interval) ([]model.SamplePair, error) // Whether a given timestamp is contained between first and last value // in the chunk. - contains(model.Time) bool + contains(model.Time) (bool, error) // values returns a channel, from which all sample values in the chunk // can be received in order. The channel is closed after the last // one. It is generally not safe to mutate the chunk while the channel - // is still open. - values() <-chan *model.SamplePair + // is still open. If a value is returned with error!=nil, no further + // values will be returned and the channel is closed. 
+ values() <-chan struct { + model.SamplePair + error + } } -func transcodeAndAdd(dst chunk, src chunk, s *model.SamplePair) []chunk { +func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { chunkOps.WithLabelValues(transcode).Inc() head := dst body := []chunk{} for v := range src.newIterator().values() { - newChunks := head.add(v) + if v.error != nil { + return nil, v.error + } + newChunks, err := head.add(v.SamplePair) + if err != nil { + return nil, err + } body = append(body, newChunks[:len(newChunks)-1]...) head = newChunks[len(newChunks)-1] } - newChunks := head.add(s) - return append(body, newChunks...) + newChunks, err := head.add(s) + if err != nil { + return nil, err + } + return append(body, newChunks...), nil } // newChunk creates a new chunk according to the encoding set by the // defaultChunkEncoding flag. func newChunk() chunk { - return newChunkForEncoding(DefaultChunkEncoding) + chunk, err := newChunkForEncoding(DefaultChunkEncoding) + if err != nil { + panic(err) + } + return chunk } -func newChunkForEncoding(encoding chunkEncoding) chunk { +func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { switch encoding { case delta: - return newDeltaEncodedChunk(d1, d0, true, chunkLen) + return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil case doubleDelta: - return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen) + return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil default: - panic(fmt.Errorf("unknown chunk encoding: %v", encoding)) + return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } } diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index d4c9a4fa01..f51e54e7be 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -14,10 +14,11 @@ package local import ( + "errors" "fmt" "io" "os" - "path" + "path/filepath" "strings" "sync/atomic" @@ -52,7 +53,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint 
log.Info("Scanning files.") for i := 0; i < 1<<(seriesDirNameLen*4); i++ { - dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i)) + dirname := filepath.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i)) dir, err := os.Open(dirname) if os.IsNotExist(err) { continue @@ -139,7 +140,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint } } - p.setDirty(false) + p.setDirty(false, nil) log.Warn("Crash recovery complete.") return nil } @@ -175,36 +176,46 @@ func (p *persistence) sanitizeSeries( fingerprintToSeries map[model.Fingerprint]*memorySeries, fpm fpMappings, ) (model.Fingerprint, bool) { - filename := path.Join(dirname, fi.Name()) + var ( + fp model.Fingerprint + err error + filename = filepath.Join(dirname, fi.Name()) + s *memorySeries + ) + purge := func() { - var err error - defer func() { - if err != nil { - log.Errorf("Failed to move lost series file %s to orphaned directory, deleting it instead. Error was: %s", filename, err) - if err = os.Remove(filename); err != nil { - log.Errorf("Even deleting file %s did not work: %s", filename, err) - } + if fp != 0 { + var metric model.Metric + if s != nil { + metric = s.metric } - }() - orphanedDir := path.Join(p.basePath, "orphaned", path.Base(dirname)) - if err = os.MkdirAll(orphanedDir, 0700); err != nil { - return + if err = p.quarantineSeriesFile( + fp, errors.New("purge during crash recovery"), metric, + ); err == nil { + return + } + log. + With("file", filename). + With("error", err). + Error("Failed to move lost series file to orphaned directory.") } - if err = os.Rename(filename, path.Join(orphanedDir, fi.Name())); err != nil { - return + // If we are here, we are either purging an incorrectly named + // file, or quarantining has failed. So simply delete the file. + if err = os.Remove(filename); err != nil { + log. + With("file", filename). + With("error", err). 
+ Error("Failed to delete lost series file.") } } - var fp model.Fingerprint - var err error - if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) || !strings.HasSuffix(fi.Name(), seriesFileSuffix) { log.Warnf("Unexpected series file name %s.", filename) purge() return fp, false } - if fp, err = model.FingerprintFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil { + if fp, err = model.FingerprintFromString(filepath.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil { log.Warnf("Error parsing file name %s: %s", filename, err) purge() return fp, false @@ -274,7 +285,15 @@ func (p *persistence) sanitizeSeries( s.chunkDescs = cds s.chunkDescsOffset = 0 s.savedFirstTime = cds[0].firstTime() - s.lastTime = cds[len(cds)-1].lastTime() + s.lastTime, err = cds[len(cds)-1].lastTime() + if err != nil { + log.Errorf( + "Failed to determine time of the last sample for metric %v, fingerprint %v: %s", + s.metric, fp, err, + ) + purge() + return fp, false + } s.persistWatermark = len(cds) s.modTime = modTime return fp, true @@ -304,7 +323,15 @@ func (p *persistence) sanitizeSeries( s.savedFirstTime = cds[0].firstTime() s.modTime = modTime - lastTime := cds[len(cds)-1].lastTime() + lastTime, err := cds[len(cds)-1].lastTime() + if err != nil { + log.Errorf( + "Failed to determine time of the last sample for metric %v, fingerprint %v: %s", + s.metric, fp, err, + ) + purge() + return fp, false + } keepIdx := -1 for i, cd := range s.chunkDescs { if cd.firstTime() >= lastTime { @@ -414,7 +441,10 @@ func (p *persistence) cleanUpArchiveIndexes( if err != nil { return err } - series := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp))) + series, err := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp))) + if err != nil { + return err + } fpToSeries[model.Fingerprint(fp)] = series return nil }); err != nil { diff --git a/storage/local/delta.go b/storage/local/delta.go index 
99da249c41..c787020722 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -76,7 +76,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod } // add implements chunk. -func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { +func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { if c.len() == 0 { c = c[:deltaHeaderBytes] binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) @@ -89,14 +89,17 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } baseValue := c.baseValue() dt := s.Timestamp - c.baseTime() if dt < 0 { - panic("time delta is less than zero") + return nil, fmt.Errorf("time delta is less than zero: %v", dt) } dv := s.Value - baseValue @@ -130,8 +133,11 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) } // Chunk is already half full. Better create a new one and save the transcoding efforts. - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } offset := len(c) @@ -148,7 +154,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. 
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) default: - panic("invalid number of bytes for time delta") + return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) } offset += int(tb) @@ -165,7 +171,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv))) // d8 must not happen. Those samples are encoded as float64. default: - panic("invalid number of bytes for integer delta") + return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) } } else { switch vb { @@ -175,10 +181,10 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) default: - panic("invalid number of bytes for floating point delta") + return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []chunk{&c} + return []chunk{&c}, nil } // clone implements chunk. @@ -243,15 +249,24 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { if _, err := io.ReadFull(r, *c); err != nil { return err } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] return nil } // unmarshalFromBuf implements chunk. -func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) { +func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] copy(*c, buf) - *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] + return nil } // encoding implements chunk. 
@@ -302,57 +317,108 @@ type deltaEncodedChunkIterator struct { func (it *deltaEncodedChunkIterator) length() int { return it.len } // valueAtOrBeforeTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { +func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + var lastErr error i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) + ts, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return ts.After(t) }) - if i == 0 { - return ZeroSamplePair + if i == 0 || lastErr != nil { + return ZeroSamplePair, lastErr } - return model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), + ts, err := it.timestampAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err } + v, err := it.sampleValueAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err + } + return model.SamplePair{Timestamp: ts, Value: v}, nil } // rangeValues implements chunkIterator. 
-func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair { +func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + var lastErr error + oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return !t.Before(in.OldestInclusive) }) newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return t.After(in.NewestInclusive) }) - if oldest == it.len { - return nil + if oldest == it.len || lastErr != nil { + return nil, lastErr } result := make([]model.SamplePair, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - }) + t, err := it.timestampAtIndex(i) + if err != nil { + return nil, err + } + v, err := it.sampleValueAtIndex(i) + if err != nil { + return nil, err + } + result = append(result, model.SamplePair{Timestamp: t, Value: v}) } - return result + return result, nil } // contains implements chunkIterator. -func (it *deltaEncodedChunkIterator) contains(t model.Time) bool { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) +func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { + lastT, err := it.timestampAtIndex(it.len - 1) + if err != nil { + return false, err + } + return !t.Before(it.baseT) && !t.After(lastT), nil } // values implements chunkIterator. 
-func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair { - valuesChan := make(chan *model.SamplePair) +func (it *deltaEncodedChunkIterator) values() <-chan struct { + model.SamplePair + error +} { + valuesChan := make(chan struct { + model.SamplePair + error + }) go func() { for i := 0; i < it.len; i++ { - valuesChan <- &model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), + t, err := it.timestampAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break } + v, err := it.sampleValueAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break + } + valuesChan <- struct { + model.SamplePair + error + }{model.SamplePair{Timestamp: t, Value: v}, nil} } close(valuesChan) }() @@ -360,61 +426,61 @@ func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair { } // timestampAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { +func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) switch it.tBytes { case d1: - return it.baseT + model.Time(uint8(it.c[offset])) + return it.baseT + model.Time(uint8(it.c[offset])), nil case d2: - return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])) + return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil case d4: - return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])) + return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil case d8: // Take absolute value for d8. 
- return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil default: - panic("invalid number of bytes for time delta") + return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) } } // lastTimestamp implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastTimestamp() model.Time { +func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { return it.timestampAtIndex(it.len - 1) } // sampleValueAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { +func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) if it.isInt { switch it.vBytes { case d0: - return it.baseV + return it.baseV, nil case d1: - return it.baseV + model.SampleValue(int8(it.c[offset])) + return it.baseV + model.SampleValue(int8(it.c[offset])), nil case d2: - return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil case d4: - return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil // No d8 for ints. default: - panic("invalid number of bytes for integer delta") + return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: - return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil case d8: // Take absolute value for d8. 
- return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil default: - panic("invalid number of bytes for floating point delta") + return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } } // lastSampleValue implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastSampleValue() model.SampleValue { +func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { return it.sampleValueAtIndex(it.len - 1) } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index cb50db6a7c..257c845443 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -83,9 +83,9 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub } // add implements chunk. -func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { +func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { if c.len() == 0 { - return c.addFirstSample(s) + return c.addFirstSample(s), nil } tb := c.timeBytes() @@ -101,8 +101,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta() @@ -136,8 +139,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) } // Chunk is already half full. Better create a new one and save the transcoding efforts. 
- overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } offset := len(c) @@ -154,7 +160,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) default: - panic("invalid number of bytes for time delta") + return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) } offset += int(tb) @@ -171,7 +177,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv))) // d8 must not happen. Those samples are encoded as float64. default: - panic("invalid number of bytes for integer delta") + return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) } } else { switch vb { @@ -181,10 +187,10 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) default: - panic("invalid number of bytes for floating point delta") + return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []chunk{&c} + return []chunk{&c}, nil } // clone implements chunk. @@ -251,15 +257,24 @@ func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { if _, err := io.ReadFull(r, *c); err != nil { return err } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] return nil } // unmarshalFromBuf implements chunk. 
-func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) { +func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] copy(*c, buf) - *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] + return nil } // encoding implements chunk. @@ -335,7 +350,7 @@ func (c doubleDeltaEncodedChunk) isInt() bool { // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. -func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk { +func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk { c = c[:doubleDeltaHeaderBaseValueOffset+8] binary.LittleEndian.PutUint64( c[doubleDeltaHeaderBaseTimeOffset:], @@ -350,10 +365,10 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk { // addSecondSample is a helper method only used by c.add(). It calculates the // base delta from the provided sample and adds it to the chunk. -func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb deltaBytes) []chunk { +func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]chunk, error) { baseTimeDelta := s.Timestamp - c.baseTime() if baseTimeDelta < 0 { - panic("base time delta is less than zero") + return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) } c = c[:doubleDeltaHeaderBytes] if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { @@ -391,7 +406,7 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb del math.Float64bits(float64(baseValueDelta)), ) } - return []chunk{&c} + return []chunk{&c}, nil } // doubleDeltaEncodedChunkIterator implements chunkIterator. 
@@ -408,57 +423,108 @@ type doubleDeltaEncodedChunkIterator struct { func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } // valueAtOrBeforeTime implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { +func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + var lastErr error i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) + ts, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return ts.After(t) }) - if i == 0 { - return ZeroSamplePair + if i == 0 || lastErr != nil { + return ZeroSamplePair, lastErr } - return model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), + ts, err := it.timestampAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err } + v, err := it.sampleValueAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err + } + return model.SamplePair{Timestamp: ts, Value: v}, nil } // rangeValues implements chunkIterator. 
-func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair { +func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + var lastErr error + oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return !t.Before(in.OldestInclusive) }) newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return t.After(in.NewestInclusive) }) - if oldest == it.len { - return nil + if oldest == it.len || lastErr != nil { + return nil, lastErr } result := make([]model.SamplePair, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - }) + t, err := it.timestampAtIndex(i) + if err != nil { + return nil, err + } + v, err := it.sampleValueAtIndex(i) + if err != nil { + return nil, err + } + result = append(result, model.SamplePair{Timestamp: t, Value: v}) } - return result + return result, nil } // contains implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) bool { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) +func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) { + lastT, err := it.timestampAtIndex(it.len - 1) + if err != nil { + return false, err + } + return !t.Before(it.baseT) && !t.After(lastT), nil } // values implements chunkIterator. 
-func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair { - valuesChan := make(chan *model.SamplePair) +func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct { + model.SamplePair + error +} { + valuesChan := make(chan struct { + model.SamplePair + error + }) go func() { for i := 0; i < it.len; i++ { - valuesChan <- &model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), + t, err := it.timestampAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break } + v, err := it.sampleValueAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break + } + valuesChan <- struct { + model.SamplePair + error + }{model.SamplePair{Timestamp: t, Value: v}, nil} } close(valuesChan) }() @@ -466,17 +532,17 @@ func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair { } // timestampAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { +func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { if idx == 0 { - return it.baseT + return it.baseT, nil } if idx == 1 { // If time bytes are at d8, the time is saved directly rather // than as a difference. 
if it.tBytes == d8 { - return it.baseΔT + return it.baseΔT, nil } - return it.baseT + it.baseΔT + return it.baseT + it.baseΔT, nil } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) @@ -485,40 +551,40 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time case d1: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int8(it.c[offset])) + model.Time(int8(it.c[offset])), nil case d2: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil case d4: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil default: - panic("invalid number of bytes for time delta") + return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) } } // lastTimestamp implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() model.Time { +func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { return it.timestampAtIndex(it.len - 1) } // sampleValueAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { +func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { if idx == 0 { - return it.baseV + return it.baseV, nil } if idx == 1 { // If value bytes are at d8, the value is saved directly rather // than as a difference. 
if it.vBytes == d8 { - return it.baseΔV + return it.baseΔV, nil } - return it.baseV + it.baseΔV + return it.baseV + it.baseΔV, nil } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) @@ -527,39 +593,39 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.Sam switch it.vBytes { case d0: return it.baseV + - model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*it.baseΔV, nil case d1: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int8(it.c[offset])) + model.SampleValue(int8(it.c[offset])), nil case d2: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil // No d8 for ints. default: - panic("invalid number of bytes for integer delta") + return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil default: - panic("invalid number of bytes for floating point delta") + return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } } // lastSampleValue implements chunkIterator. 
-func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() model.SampleValue { +func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { return it.sampleValueAtIndex(it.len - 1) } diff --git a/storage/local/index/index.go b/storage/local/index/index.go index 14200092aa..e189004cc7 100644 --- a/storage/local/index/index.go +++ b/storage/local/index/index.go @@ -19,6 +19,7 @@ package index import ( "os" "path" + "path/filepath" "github.com/prometheus/common/model" @@ -95,7 +96,7 @@ func (i *FingerprintMetricIndex) Lookup(fp model.Fingerprint) (metric model.Metr // ready to use. func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) { fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{ - Path: path.Join(basePath, fingerprintToMetricDir), + Path: filepath.Join(basePath, fingerprintToMetricDir), CacheSizeBytes: FingerprintMetricCacheSize, }) if err != nil { @@ -167,7 +168,7 @@ func (i *LabelNameLabelValuesIndex) LookupSet(l model.LabelName) (values map[mod // LabelNameLabelValuesIndex ready to use. func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) { labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{ - Path: path.Join(basePath, labelNameToLabelValuesDir), + Path: filepath.Join(basePath, labelNameToLabelValuesDir), CacheSizeBytes: LabelNameLabelValuesCacheSize, }) if err != nil { @@ -245,7 +246,7 @@ func (i *LabelPairFingerprintIndex) LookupSet(p model.LabelPair) (fps map[model. // LabelPairFingerprintIndex ready to use. 
func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) { labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{ - Path: path.Join(basePath, labelPairToFingerprintsDir), + Path: filepath.Join(basePath, labelPairToFingerprintsDir), CacheSizeBytes: LabelPairFingerprintsCacheSize, }) if err != nil { @@ -283,7 +284,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp model.Fingerprint) (firstTime, las // FingerprintTimeRangeIndex ready to use. func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) { fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{ - Path: path.Join(basePath, fingerprintTimeRangeDir), + Path: filepath.Join(basePath, fingerprintTimeRangeDir), CacheSizeBytes: FingerprintTimeRangeCacheSize, }) if err != nil { diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go index 85a7aa5e0f..6d43ebd395 100644 --- a/storage/local/instrumentation.go +++ b/storage/local/instrumentation.go @@ -60,6 +60,9 @@ const ( requestedPurge = "purge_on_request" memoryMaintenance = "maintenance_in_memory" archiveMaintenance = "maintenance_in_archive" + completedQurantine = "quarantine_completed" + droppedQuarantine = "quarantine_dropped" + failedQuarantine = "quarantine_failed" // Op-types for chunkOps. createAndPin = "create" // A chunkDesc creation with refCount=1. diff --git a/storage/local/interface.go b/storage/local/interface.go index 5508d8c769..d9dbc4f213 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -73,7 +73,7 @@ type Storage interface { // methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of // a series, i.e. it is safe to continue using a SeriesIterator after or during // modifying the corresponding series, but the iterator will represent the state -// of the series prior the modification. +// of the series prior to the modification. 
type SeriesIterator interface { // Gets the value that is closest before the given time. In case a value // exists at precisely the given time, that value is returned. If no @@ -90,11 +90,11 @@ type Preloader interface { PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, - ) (SeriesIterator, error) + ) SeriesIterator PreloadInstant( fp model.Fingerprint, timestamp model.Time, stalenessDelta time.Duration, - ) (SeriesIterator, error) + ) SeriesIterator // Close unpins any previously requested series data from memory. Close() } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 22c16eccd2..acccc8874f 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -20,7 +20,6 @@ import ( "io" "io/ioutil" "os" - "path" "path/filepath" "strconv" "strings" @@ -46,6 +45,7 @@ const ( seriesFileSuffix = ".db" seriesTempFileSuffix = ".db.tmp" seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name. + hintFileSuffix = ".hint" headsFileName = "heads.db" headsTempFileName = "heads.db.tmp" @@ -321,8 +321,9 @@ func (p *persistence) isDirty() bool { // setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was // set to true with this method, it cannot be set to false again. (If we became // dirty during our runtime, there is no way back. If we were dirty from the -// start, a clean-up might make us clean again.) -func (p *persistence) setDirty(dirty bool) { +// start, a clean-up might make us clean again.) The provided error will be +// logged as a reason if dirty is true. +func (p *persistence) setDirty(dirty bool, err error) { if dirty { p.dirtyCounter.Inc() } @@ -334,7 +335,7 @@ func (p *persistence) setDirty(dirty bool) { p.dirty = dirty if dirty { p.becameDirty = true - log.Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") + log.With("error", err).Error("The storage is now inconsistent. 
Restart Prometheus ASAP to initiate recovery.") } } @@ -371,8 +372,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) { defer func() { if err != nil { - log.Error("Error persisting chunks: ", err) - p.setDirty(true) + p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err)) } }() @@ -441,8 +441,13 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse return nil, err } for c := 0; c < batchSize; c++ { - chunk := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) - chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]) + chunk, err := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) + if err != nil { + return nil, err + } + if err := chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil { + return nil, err + } chunks = append(chunks, chunk) } } @@ -470,7 +475,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([ return nil, err } if fi.Size()%int64(chunkLenWithHeader) != 0 { - p.setDirty(true) + // The returned error will bubble up and lead to quarantining of the whole series. 
return nil, fmt.Errorf( "size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d", fp, fi.Size(), chunkLenWithHeader, @@ -648,7 +653,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { return } - if _, err = codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil { + lt, err := chunkDesc.lastTime() + if err != nil { + return + } + if _, err = codable.EncodeVarint(w, int64(lt)); err != nil { return } } else { @@ -854,7 +863,12 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in p.dirty = true return sm, chunksToPersist, nil } - chunk := newChunkForEncoding(chunkEncoding(encoding)) + chunk, err := newChunkForEncoding(chunkEncoding(encoding)) + if err != nil { + log.Warn("Problem with chunk encoding:", err) + p.dirty = true + return sm, chunksToPersist, nil + } if err := chunk.unmarshal(r); err != nil { log.Warn("Could not decode chunk:", err) p.dirty = true @@ -871,6 +885,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in } } + lt, err := chunkDescs[len(chunkDescs)-1].lastTime() + if err != nil { + log.Warn("Could not determine last time of head chunk:", err) + p.dirty = true + return sm, chunksToPersist, nil + } + fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{ metric: model.Metric(metric), chunkDescs: chunkDescs, @@ -878,7 +899,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in modTime: modTime, chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: model.Time(savedFirstTime), - lastTime: chunkDescs[len(chunkDescs)-1].lastTime(), + lastTime: lt, headChunkClosed: headChunkClosed, } } @@ -908,8 +929,7 @@ func (p *persistence) dropAndPersistChunks( // please handle with care! 
defer func() { if err != nil { - log.Error("Error dropping and/or persisting chunks: ", err) - p.setDirty(true) + p.setDirty(true, fmt.Errorf("error in method dropAndPersistChunks: %s", err)) } }() @@ -918,7 +938,15 @@ func (p *persistence) dropAndPersistChunks( // too old. If that's the case, the chunks in the series file // are all too old, too. i := 0 - for ; i < len(chunks) && chunks[i].newIterator().lastTimestamp().Before(beforeTime); i++ { + for ; i < len(chunks); i++ { + var lt model.Time + lt, err = chunks[i].newIterator().lastTimestamp() + if err != nil { + return + } + if !lt.Before(beforeTime) { + break + } } if i < len(chunks) { firstTimeNotDropped = chunks[i].firstTime() @@ -1071,6 +1099,44 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) { return numChunks, nil } +// quarantineSeriesFile moves a series file to the orphaned directory. It also +// writes a hint file with the provided quarantine reason and, if metric is +// non-nil, the string representation of the metric. +func (p *persistence) quarantineSeriesFile(fp model.Fingerprint, quarantineReason error, metric model.Metric) error { + var ( + oldName = p.fileNameForFingerprint(fp) + orphanedDir = filepath.Join(p.basePath, "orphaned", filepath.Base(filepath.Dir(oldName))) + newName = filepath.Join(orphanedDir, filepath.Base(oldName)) + hintName = newName[:len(newName)-len(seriesFileSuffix)] + hintFileSuffix + ) + + renameErr := os.MkdirAll(orphanedDir, 0700) + if renameErr != nil { + return renameErr + } + renameErr = os.Rename(oldName, newName) + if os.IsNotExist(renameErr) { + // Source file doesn't exist. That's normal. + renameErr = nil + } + // Write hint file even if the rename ended in an error. At least try... + // And ignore errors writing the hint file. It's best effort.
+ if f, err := os.Create(hintName); err == nil { + if metric != nil { + f.WriteString(metric.String() + "\n") + } else { + f.WriteString("[UNKNOWN METRIC]\n") + } + if quarantineReason != nil { + f.WriteString(quarantineReason.Error() + "\n") + } else { + f.WriteString("[UNKNOWN REASON]\n") + } + f.Close() + } + return renameErr +} + // seriesFileModTime returns the modification time of the series file belonging // to the provided fingerprint. In case of an error, the zero value of time.Time // is returned. @@ -1122,11 +1188,11 @@ func (p *persistence) archiveMetric( fp model.Fingerprint, m model.Metric, first, last model.Time, ) error { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { - p.setDirty(true) + p.setDirty(true, err) return err } if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { - p.setDirty(true) + p.setDirty(true, err) return err } return nil @@ -1139,6 +1205,9 @@ func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( hasMetric bool, firstTime, lastTime model.Time, err error, ) { firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) + if err != nil { + p.setDirty(true, err) + } return } @@ -1187,7 +1256,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { defer func() { if err != nil { - p.setDirty(true) + p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err)) } }() @@ -1218,12 +1287,8 @@ func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { // was actually deleted, the method returns true and the first time and last // time of the deleted metric. The caller must have locked the fingerprint. 
func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) { - defer func() { - if err != nil { - p.setDirty(true) - } - }() - + // An error returned here will bubble up and lead to quarantining of the + // series, so no setDirty required. deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)) if err != nil || !deleted { return false, err @@ -1279,17 +1344,17 @@ func (p *persistence) close() error { func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:seriesDirNameLen]) + return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen]) } func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) + return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) } func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix) + return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix) } func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) { @@ -1322,19 +1387,19 @@ func (p *persistence) openChunkFileForReading(fp model.Fingerprint) (*os.File, e } func (p *persistence) headsFileName() string { - return path.Join(p.basePath, headsFileName) + return filepath.Join(p.basePath, headsFileName) } func (p *persistence) headsTempFileName() string { - return path.Join(p.basePath, headsTempFileName) + return filepath.Join(p.basePath, headsTempFileName) } func (p *persistence) mappingsFileName() string { - return path.Join(p.basePath, mappingsFileName) + return filepath.Join(p.basePath, mappingsFileName) } func (p *persistence) mappingsTempFileName() 
string { - return path.Join(p.basePath, mappingsTempFileName) + return filepath.Join(p.basePath, mappingsTempFileName) } func (p *persistence) processIndexingQueue() { @@ -1616,7 +1681,9 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error { b = b[:writeSize] for i, chunk := range chunks[:batchSize] { - writeChunkHeader(b[i*chunkLenWithHeader:], chunk) + if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil { + return err + } if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil { return err } @@ -1642,14 +1709,19 @@ func chunkIndexForOffset(offset int64) (int, error) { return int(offset) / chunkLenWithHeader, nil } -func writeChunkHeader(header []byte, c chunk) { +func writeChunkHeader(header []byte, c chunk) error { header[chunkHeaderTypeOffset] = byte(c.encoding()) binary.LittleEndian.PutUint64( header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime()), ) + lt, err := c.newIterator().lastTimestamp() + if err != nil { + return err + } binary.LittleEndian.PutUint64( header[chunkHeaderLastTimeOffset:], - uint64(c.newIterator().lastTimestamp()), + uint64(lt), ) + return nil } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 47cf9078a1..e1894032af 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -14,6 +14,10 @@ package local import ( + "bufio" + "errors" + "os" + "path/filepath" "reflect" "sync" "testing" @@ -49,7 +53,7 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes }) } -func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk { +func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint][]chunk { fps := model.Fingerprints{ m1.FastFingerprint(), m2.FastFingerprint(), @@ -60,10 +64,18 @@ func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk { for _, fp := range fps { fpToChunks[fp] = make([]chunk, 0, 10) for i := 0; i < 10; 
i++ { - fpToChunks[fp] = append(fpToChunks[fp], newChunkForEncoding(encoding).add(&model.SamplePair{ + ch, err := newChunkForEncoding(encoding) + if err != nil { + t.Fatal(err) + } + chs, err := ch.add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(fp), - })[0]) + }) + if err != nil { + t.Fatal(err) + } + fpToChunks[fp] = append(fpToChunks[fp], chs[0]) } } return fpToChunks @@ -73,7 +85,7 @@ func chunksEqual(c1, c2 chunk) bool { values2 := c2.newIterator().values() for v1 := range c1.newIterator().values() { v2 := <-values2 - if !v1.Equal(v2) { + if !(v1 == v2) { return false } } @@ -84,7 +96,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() - fpToChunks := buildTestChunks(encoding) + fpToChunks := buildTestChunks(t, encoding) for fp, chunks := range fpToChunks { firstTimeNotDropped, offset, numDropped, allDropped, err := @@ -126,10 +138,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10) } for i, cd := range actualChunkDescs { - if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) { + lastTime, err := cd.lastTime() + if err != nil { + t.Fatal(err) + } + if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { t.Errorf( "Want ts=%v, got firstTime=%v, lastTime=%v.", - i, cd.firstTime(), cd.lastTime(), + i, cd.firstTime(), lastTime, ) } @@ -140,10 +156,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5) } for i, cd := range actualChunkDescs { - if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) { + lastTime, err := cd.lastTime() + if err != nil { + t.Fatal(err) + } + if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { t.Errorf( "Want ts=%v, got firstTime=%v, lastTime=%v.", - i, cd.firstTime(), cd.lastTime(), + i, 
cd.firstTime(), lastTime, ) } @@ -433,21 +453,21 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding fpLocker := newFingerprintLocker(10) sm := newSeriesMap() - s1 := newMemorySeries(m1, nil, time.Time{}) - s2 := newMemorySeries(m2, nil, time.Time{}) - s3 := newMemorySeries(m3, nil, time.Time{}) - s4 := newMemorySeries(m4, nil, time.Time{}) - s5 := newMemorySeries(m5, nil, time.Time{}) - s1.add(&model.SamplePair{Timestamp: 1, Value: 3.14}) - s3.add(&model.SamplePair{Timestamp: 2, Value: 2.7}) + s1, _ := newMemorySeries(m1, nil, time.Time{}) + s2, _ := newMemorySeries(m2, nil, time.Time{}) + s3, _ := newMemorySeries(m3, nil, time.Time{}) + s4, _ := newMemorySeries(m4, nil, time.Time{}) + s5, _ := newMemorySeries(m5, nil, time.Time{}) + s1.add(model.SamplePair{Timestamp: 1, Value: 3.14}) + s3.add(model.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkClosed = true s3.persistWatermark = 1 for i := 0; i < 10000; i++ { - s4.add(&model.SamplePair{ + s4.add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(i) / 2, }) - s5.add(&model.SamplePair{ + s5.add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(i * i), }) @@ -552,10 +572,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding } continue } - if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + lastTime, err := cd.c.newIterator().lastTimestamp() + if err != nil { + t.Fatal(err) + } + if cd.chunkLastTime != lastTime { t.Errorf( "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", - i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + i, lastTime, cd.chunkLastTime, ) } } @@ -605,10 +629,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding } continue } - if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + lastTime, err := cd.c.newIterator().lastTimestamp() + if err != nil { + t.Fatal(err) + } + if cd.chunkLastTime != lastTime { t.Errorf( 
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", - i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + i, cd.chunkLastTime, lastTime, ) } } @@ -1051,6 +1079,108 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet } } +func TestQuranatineSeriesFile(t *testing.T) { + p, closer := newTestPersistence(t, 1) + defer closer.Close() + + verify := func(fp model.Fingerprint, seriesFileShouldExist bool, contentHintFile ...string) { + var ( + fpStr = fp.String() + originalFile = p.fileNameForFingerprint(fp) + quarantinedFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) + hintFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+hintFileSuffix) + ) + if _, err := os.Stat(originalFile); !os.IsNotExist(err) { + t.Errorf("Expected file %q to not exist.", originalFile) + } + if _, err := os.Stat(quarantinedFile); (os.IsNotExist(err) && seriesFileShouldExist) || (err == nil && !seriesFileShouldExist) { + t.Errorf("Unexpected state of quarantined file %q. Expected it to exist: %t. 
os.Stat returned: %s.", quarantinedFile, seriesFileShouldExist, err) + } + f, err := os.Open(hintFile) + defer f.Close() + if err != nil { + t.Errorf("Could not open hint file %q: %s", hintFile, err) + return + } + scanner := bufio.NewScanner(f) + for _, want := range contentHintFile { + if !scanner.Scan() { + t.Errorf("Unexpected end of hint file %q.", hintFile) + return + } + got := scanner.Text() + if want != got { + t.Errorf("Want hint line %q, got %q.", want, got) + } + } + if scanner.Scan() { + t.Errorf("Unexpected spurious content in hint file %q: %q", hintFile, scanner.Text()) + } + } + + if err := p.quarantineSeriesFile(0, nil, nil); err != nil { + t.Error(err) + } + verify(0, false, "[UNKNOWN METRIC]", "[UNKNOWN REASON]") + + if err := p.quarantineSeriesFile( + 1, errors.New("file does not exist"), + nil, + ); err != nil { + t.Error(err) + } + verify(1, false, "[UNKNOWN METRIC]", "file does not exist") + + if err := p.quarantineSeriesFile( + 2, errors.New("file does not exist"), + model.Metric{"foo": "bar", "dings": "bums"}, + ); err != nil { + t.Error(err) + } + verify(2, false, `{dings="bums", foo="bar"}`, "file does not exist") + + if err := p.quarantineSeriesFile( + 3, nil, + model.Metric{"foo": "bar", "dings": "bums"}, + ); err != nil { + t.Error(err) + } + verify(3, false, `{dings="bums", foo="bar"}`, "[UNKNOWN REASON]") + + err := os.Mkdir(filepath.Join(p.basePath, "00"), os.ModePerm) + if err != nil { + t.Fatal(err) + } + f, err := os.Create(p.fileNameForFingerprint(4)) + if err != nil { + t.Fatal(err) + } + f.Close() + + if err := p.quarantineSeriesFile( + 4, errors.New("file exists"), + model.Metric{"sound": "cloud"}, + ); err != nil { + t.Error(err) + } + verify(4, true, `{sound="cloud"}`, "file exists") + + if err := p.quarantineSeriesFile(4, nil, nil); err != nil { + t.Error(err) + } + // Overwrites hint file but leaves series file intact. 
+ verify(4, true, "[UNKNOWN METRIC]", "[UNKNOWN REASON]") + + if err := p.quarantineSeriesFile( + 4, errors.New("file exists"), + model.Metric{"sound": "cloud"}, + ); err != nil { + t.Error(err) + } + // Overwrites everything. + verify(4, true, `{sound="cloud"}`, "file exists") +} + var fpStrings = []string{ "b004b821ca50ba26", "b037c21e884e4fc5", diff --git a/storage/local/preload.go b/storage/local/preload.go index 65f1aac835..b0113bd6be 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -29,26 +29,20 @@ type memorySeriesPreloader struct { func (p *memorySeriesPreloader) PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, -) (SeriesIterator, error) { - cds, iter, err := p.storage.preloadChunksForRange(fp, from, through, false) - if err != nil { - return iter, err - } +) SeriesIterator { + cds, iter := p.storage.preloadChunksForRange(fp, from, through, false) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return iter, nil + return iter } // PreloadInstant implements Preloader func (p *memorySeriesPreloader) PreloadInstant( fp model.Fingerprint, timestamp model.Time, stalenessDelta time.Duration, -) (SeriesIterator, error) { - cds, iter, err := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true) - if err != nil { - return nil, err - } +) SeriesIterator { + cds, iter := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return iter, nil + return iter } // Close implements Preloader. diff --git a/storage/local/series.go b/storage/local/series.go index 7a027a8a5d..f76d5ee27b 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -191,12 +191,15 @@ type memorySeries struct { // set to model.Earliest. The zero value for modTime can be used if the // modification time of the series file is unknown (e.g. if this is a genuinely // new series). 
-func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries { +func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) (*memorySeries, error) { + var err error firstTime := model.Earliest lastTime := model.Earliest if len(chunkDescs) > 0 { firstTime = chunkDescs[0].firstTime() - lastTime = chunkDescs[len(chunkDescs)-1].lastTime() + if lastTime, err = chunkDescs[len(chunkDescs)-1].lastTime(); err != nil { + return nil, err + } } return &memorySeries{ metric: m, @@ -206,14 +209,14 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) lastTime: lastTime, persistWatermark: len(chunkDescs), modTime: modTime, - } + }, nil } // add adds a sample pair to the series. It returns the number of newly // completed chunks (which are now eligible for persistence). // // The caller must have locked the fingerprint of the series. -func (s *memorySeries) add(v *model.SamplePair) int { +func (s *memorySeries) add(v model.SamplePair) (int, error) { if len(s.chunkDescs) == 0 || s.headChunkClosed { newHead := newChunkDesc(newChunk(), v.Timestamp) s.chunkDescs = append(s.chunkDescs, newHead) @@ -235,7 +238,10 @@ func (s *memorySeries) add(v *model.SamplePair) int { s.headChunkUsedByIterator = false } - chunks := s.head().add(v) + chunks, err := s.head().add(v) + if err != nil { + return 0, err + } s.head().c = chunks[0] for _, c := range chunks[1:] { @@ -250,7 +256,7 @@ func (s *memorySeries) add(v *model.SamplePair) int { s.lastTime = v.Timestamp s.lastSampleValue = v.Value s.lastSampleValueSet = true - return len(chunks) - 1 + return len(chunks) - 1, nil } // maybeCloseHeadChunk closes the head chunk if it has not been touched for the @@ -295,10 +301,14 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { // dropChunks removes chunkDescs older than t. The caller must have locked the // fingerprint of the series. 
-func (s *memorySeries) dropChunks(t model.Time) { +func (s *memorySeries) dropChunks(t model.Time) error { keepIdx := len(s.chunkDescs) for i, cd := range s.chunkDescs { - if !cd.lastTime().Before(t) { + lt, err := cd.lastTime() + if err != nil { + return err + } + if !lt.Before(t) { keepIdx = i break } @@ -318,6 +328,7 @@ func (s *memorySeries) dropChunks(t model.Time) { numMemChunkDescs.Sub(float64(keepIdx)) s.dirty = true } + return nil } // preloadChunks is an internal helper method. @@ -358,8 +369,12 @@ func (s *memorySeries) preloadChunks( s.headChunkUsedByIterator = true } + curriedQuarantineSeries := func(err error) { + mss.quarantineSeries(fp, s.metric, err) + } + iter := &boundedIterator{ - it: s.newIterator(pinnedChunkDescs), + it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries), start: model.Now().Add(-mss.dropAfter), } @@ -370,7 +385,7 @@ func (s *memorySeries) preloadChunks( // must be pinned). // // The caller must have locked the fingerprint of the memorySeries. -func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator { +func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine func(error)) SeriesIterator { chunks := make([]chunk, 0, len(pinnedChunkDescs)) for _, cd := range pinnedChunkDescs { // It's OK to directly access cd.c here (without locking) as the @@ -378,8 +393,9 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator chunks = append(chunks, cd.c) } return &memorySeriesIterator{ - chunks: chunks, - chunkIts: make([]chunkIterator, len(chunks)), + chunks: chunks, + chunkIts: make([]chunkIterator, len(chunks)), + quarantine: quarantine, } } @@ -437,7 +453,11 @@ func (s *memorySeries) preloadChunksForRange( if fromIdx == len(s.chunkDescs) { // Even the last chunk starts before "from". Find out if the // series ends before "from" and we don't need to do anything. 
- if s.chunkDescs[len(s.chunkDescs)-1].lastTime().Before(from) { + lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime() + if err != nil { + return nil, nopIter, err + } + if lt.Before(from) { return nil, nopIter, nil } } @@ -511,16 +531,29 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc { // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { - chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. - chunkIts []chunkIterator // Caches chunkIterators. - chunks []chunk + chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. + chunkIts []chunkIterator // Caches chunkIterators. + chunks []chunk + quarantine func(error) // Call to quarantine the series this iterator belongs to. } // ValueAtOrBeforeTime implements SeriesIterator. func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { // The most common case. We are iterating through a chunk. - if it.chunkIt != nil && it.chunkIt.contains(t) { - return it.chunkIt.valueAtOrBeforeTime(t) + if it.chunkIt != nil { + containsT, err := it.chunkIt.contains(t) + if err != nil { + it.quarantine(err) + return ZeroSamplePair + } + if containsT { + value, err := it.chunkIt.valueAtOrBeforeTime(t) + if err != nil { + it.quarantine(err) + return ZeroSamplePair + } + return value + } } if len(it.chunks) == 0 { @@ -537,7 +570,12 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa return ZeroSamplePair } it.chunkIt = it.chunkIterator(l - i) - return it.chunkIt.valueAtOrBeforeTime(t) + value, err := it.chunkIt.valueAtOrBeforeTime(t) + if err != nil { + it.quarantine(err) + return ZeroSamplePair + } + return value } // RangeValues implements SeriesIterator. @@ -548,8 +586,15 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa }) // Only now check the last timestamp of the previous chunk (which is // fairly expensive). 
- if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) { - i-- + if i > 0 { + lt, err := it.chunkIterator(i - 1).lastTimestamp() + if err != nil { + it.quarantine(err) + return nil + } + if !lt.Before(in.OldestInclusive) { + i-- + } } values := []model.SamplePair{} @@ -557,7 +602,12 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa if c.firstTime().After(in.NewestInclusive) { break } - values = append(values, it.chunkIterator(i+j).rangeValues(in)...) + chValues, err := it.chunkIterator(i + j).rangeValues(in) + if err != nil { + it.quarantine(err) + return nil + } + values = append(values, chValues...) } return values } diff --git a/storage/local/storage.go b/storage/local/storage.go index f87f083ba3..2d99c073f3 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -30,8 +30,9 @@ import ( ) const ( - evictRequestsCap = 1024 - chunkLen = 1024 + evictRequestsCap = 1024 + quarantineRequestsCap = 1024 + chunkLen = 1024 // See waitForNextFP. fpMaxSweepTime = 6 * time.Hour @@ -77,6 +78,12 @@ type evictRequest struct { evict bool } +type quarantineRequest struct { + fp model.Fingerprint + metric model.Metric + reason error +} + // SyncStrategy is an enum to select a sync strategy for series files. 
type SyncStrategy int @@ -147,6 +154,9 @@ type memorySeriesStorage struct { evictRequests chan evictRequest evictStopping, evictStopped chan struct{} + quarantineRequests chan quarantineRequest + quarantineStopping, quarantineStopped chan struct{} + persistErrors prometheus.Counter numSeries prometheus.Gauge seriesOps *prometheus.CounterVec @@ -198,6 +208,10 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { evictStopping: make(chan struct{}), evictStopped: make(chan struct{}), + quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap), + quarantineStopping: make(chan struct{}), + quarantineStopped: make(chan struct{}), + persistErrors: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -312,6 +326,7 @@ func (s *memorySeriesStorage) Start() (err error) { } go s.handleEvictList() + go s.handleQuarantine() go s.logThrottling() go s.loop() @@ -326,6 +341,10 @@ func (s *memorySeriesStorage) Stop() error { close(s.loopStopping) <-s.loopStopped + log.Info("Stopping series quarantining...") + close(s.quarantineStopping) + <-s.quarantineStopped + log.Info("Stopping chunk eviction...") close(s.evictStopping) <-s.evictStopped @@ -521,22 +540,7 @@ func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric. // DropMetric implements Storage. func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { for _, fp := range fps { - s.fpLocker.Lock(fp) - - if series, ok := s.fpToSeries.get(fp); ok { - s.fpToSeries.del(fp) - s.numSeries.Dec() - s.persistence.unindexMetric(fp, series.metric) - } else if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log.Errorf("Error purging metric with fingerprint %v: %v", fp, err) - } - // Attempt to delete series file in any case. 
- if _, err := s.persistence.deleteSeriesFile(fp); err != nil { - log.Errorf("Error deleting series file for %v: %v", fp, err) - } - - s.fpLocker.Unlock(fp) - s.seriesOps.WithLabelValues(requestedPurge).Inc() + s.purgeSeries(fp, nil, nil) } } @@ -554,19 +558,24 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { rawFP := sample.Metric.FastFingerprint() s.fpLocker.Lock(rawFP) fp, err := s.mapper.mapFP(rawFP, sample.Metric) + defer func() { + s.fpLocker.Unlock(fp) + }() // Func wrapper because fp might change below. if err != nil { - log.Errorf("Error while mapping fingerprint %v: %v", rawFP, err) - s.persistence.setDirty(true) + s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) + return err } if fp != rawFP { // Switch locks. s.fpLocker.Unlock(rawFP) s.fpLocker.Lock(fp) } - series := s.getOrCreateSeries(fp, sample.Metric) + series, err := s.getOrCreateSeries(fp, sample.Metric) + if err != nil { + return err // getOrCreateSeries took care of quarantining already. + } if sample.Timestamp <= series.lastTime { - s.fpLocker.Unlock(fp) // Don't report "no-op appends", i.e. where timestamp and sample // value are the same as for the last append, as they are a // common occurrence when using client-side timestamps @@ -577,13 +586,16 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { return nil } s.outOfOrderSamplesCount.Inc() - return ErrOutOfOrderSample + return ErrOutOfOrderSample // Caused by the caller. 
} - completedChunksCount := series.add(&model.SamplePair{ + completedChunksCount, err := series.add(model.SamplePair{ Value: sample.Value, Timestamp: sample.Timestamp, }) - s.fpLocker.Unlock(fp) + if err != nil { + s.quarantineSeries(fp, sample.Metric, err) + return err + } s.ingestedSamplesCount.Inc() s.incNumChunksToPersist(completedChunksCount) @@ -644,7 +656,7 @@ func (s *memorySeriesStorage) logThrottling() { } } -func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries { +func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) { series, ok := s.fpToSeries.get(fp) if !ok { var cds []*chunkDesc @@ -652,6 +664,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me unarchived, err := s.persistence.unarchiveMetric(fp) if err != nil { log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err) + return nil, err } if unarchived { s.seriesOps.WithLabelValues(unarchive).Inc() @@ -662,7 +675,8 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me // appear as archived or purged). 
cds, err = s.loadChunkDescs(fp, 0) if err != nil { - log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err) + s.quarantineSeries(fp, m, err) + return nil, err } modTime = s.persistence.seriesFileModTime(fp) } else { @@ -670,18 +684,22 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me s.persistence.indexMetric(fp, m) s.seriesOps.WithLabelValues(create).Inc() } - series = newMemorySeries(m, cds, modTime) + series, err = newMemorySeries(m, cds, modTime) + if err != nil { + s.quarantineSeries(fp, m, err) + return nil, err + } s.fpToSeries.put(fp, series) s.numSeries.Inc() } - return series + return series, nil } func (s *memorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, lastSampleOnly bool, -) ([]*chunkDesc, SeriesIterator, error) { +) ([]*chunkDesc, SeriesIterator) { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) @@ -689,23 +707,34 @@ func (s *memorySeriesStorage) preloadChunksForRange( if !ok { has, first, last, err := s.persistence.hasArchivedMetric(fp) if err != nil { - return nil, nopIter, err + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil, nopIter } if !has { s.invalidPreloadRequestsCount.Inc() - return nil, nopIter, nil + return nil, nopIter } if from.Before(last) && through.After(first) { metric, err := s.persistence.archivedMetric(fp) if err != nil { - return nil, nopIter, err + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil, nopIter + } + series, err = s.getOrCreateSeries(fp, metric) + if err != nil { + log.With("fingerprint", fp).With("error", err).Error("Error while retrieving series.") + return nil, nopIter } - series = s.getOrCreateSeries(fp, metric) } else { - return nil, nopIter, nil + return nil, nopIter } } - return series.preloadChunksForRange(fp, from, through, lastSampleOnly, s) + cds, it, err := 
series.preloadChunksForRange(fp, from, through, lastSampleOnly, s) + if err != nil { + s.quarantineSeries(fp, series.metric, err) + return nil, nopIter + } + return cds, it } func (s *memorySeriesStorage) handleEvictList() { @@ -1121,7 +1150,10 @@ func (s *memorySeriesStorage) writeMemorySeries( s.persistErrors.Inc() return false } - series.dropChunks(beforeTime) + if err := series.dropChunks(beforeTime); err != nil { + s.persistErrors.Inc() + return false + } if len(series.chunkDescs) == 0 && allDroppedFromPersistence { // All chunks dropped from both memory and persistence. Delete the series for good. s.fpToSeries.del(fp) @@ -1136,8 +1168,7 @@ func (s *memorySeriesStorage) writeMemorySeries( } else { series.chunkDescsOffset -= numDroppedFromPersistence if series.chunkDescsOffset < 0 { - log.Errorf("Dropped more chunks from persistence than from memory for fingerprint %v, series %v.", fp, series) - s.persistence.setDirty(true) + s.persistence.setDirty(true, fmt.Errorf("dropped more chunks from persistence than from memory for fingerprint %v, series %v", fp, series)) series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery. } } @@ -1291,6 +1322,122 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { return score } +// quarantineSeries registers the provided fingerprint for quarantining. It +// always returns immediately. Quarantine requests are processed +// asynchronously. If there are too many requests queued, they are simply +// dropped. +// +// Quarantining means that the series file is moved to the orphaned directory, +// and all its traces are removed from indices. Call this method if an +// unrecoverable error is detected while dealing with a series, and pass in the +// encountered error. It will be saved as a hint in the orphaned directory. 
+func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) { + req := quarantineRequest{fp: fp, metric: metric, reason: err} + select { + case s.quarantineRequests <- req: + // Request submitted. + default: + log. + With("fingerprint", fp). + With("metric", metric). + With("reason", err). + Warn("Quarantine queue full. Dropped quarantine request.") + s.seriesOps.WithLabelValues(droppedQuarantine).Inc() + } +} + +func (s *memorySeriesStorage) handleQuarantine() { + for { + select { + case req := <-s.quarantineRequests: + s.purgeSeries(req.fp, req.metric, req.reason) + log. + With("fingerprint", req.fp). + With("metric", req.metric). + With("reason", req.reason). + Warn("Series quarantined.") + case <-s.quarantineStopping: + log.Info("Series quarantining stopped.") + close(s.quarantineStopped) + return + } + } + +} + +// purgeSeries removes all traces of a series. If a non-nil quarantine reason is +// provided, the series file will not be deleted completely, but moved to the +// orphaned directory with the reason and the metric in a hint file. The +// provided metric might be nil if unknown. +func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) { + s.fpLocker.Lock(fp) + + var ( + series *memorySeries + ok bool + ) + + if series, ok = s.fpToSeries.get(fp); ok { + s.fpToSeries.del(fp) + s.numSeries.Dec() + m = series.metric + + // Adjust s.numChunksToPersist and numMemChunks down by + // the number of chunks in this series that are not + // persisted yet. Persisted chunks will be deducted from + // numMemChunks upon eviction. + numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark + atomic.AddInt64(&numMemChunks, int64(-numChunksNotYetPersisted)) + if !series.headChunkClosed { + // Head chunk wasn't counted as waiting for persistence yet. + // (But it was counted as a chunk in memory.) 
+ numChunksNotYetPersisted-- + } + s.incNumChunksToPersist(-numChunksNotYetPersisted) + + } else { + if err := s.persistence.purgeArchivedMetric(fp); err != nil { + log. + With("fingerprint", fp). + With("metric", m). + With("error", err). + Error("Error purging metric from archive.") + } + } + if m != nil { + // If we know a metric now, unindex it in any case. + // purgeArchivedMetric might have done so already, but we cannot + // be sure. Unindexing in idempotent, though. + s.persistence.unindexMetric(fp, m) + } + // Attempt to delete/quarantine the series file in any case. + if quarantineReason == nil { + // No reason stated, simply delete the file. + if _, err := s.persistence.deleteSeriesFile(fp); err != nil { + log. + With("fingerprint", fp). + With("metric", m). + With("error", err). + Error("Error deleting series file.") + } + s.seriesOps.WithLabelValues(requestedPurge).Inc() + } else { + if err := s.persistence.quarantineSeriesFile(fp, quarantineReason, m); err == nil { + s.seriesOps.WithLabelValues(completedQurantine).Inc() + } else { + s.seriesOps.WithLabelValues(failedQuarantine).Inc() + log. + With("fingerprint", fp). + With("metric", m). + With("reason", quarantineReason). + With("error", err). + Error("Error quarantining series file.") + } + } + + s.fpLocker.Unlock(fp) +} + // Describe implements prometheus.Collector. func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { s.persistence.Describe(ch) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 381f7c7a36..97fa450ba7 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -405,10 +405,7 @@ func TestRetentionCutoff(t *testing.T) { defer pl.Close() // Preload everything. 
- it, err := pl.PreloadRange(fp, insertStart, now) - if err != nil { - t.Fatalf("Error preloading outdated chunks: %s", err) - } + it := pl.PreloadRange(fp, insertStart, now) val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute)) if val.Timestamp != model.Earliest { @@ -492,18 +489,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - _, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -525,18 +516,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - _, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) if 
vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -549,6 +534,95 @@ func TestDropMetrics(t *testing.T) { } } +func TestQuarantineMetric(t *testing.T) { + now := model.Now() + insertStart := now.Add(-2 * time.Hour) + + s, closer := NewTestStorage(t, 1) + defer closer.Close() + + chunkFileExists := func(fp model.Fingerprint) (bool, error) { + f, err := s.persistence.openChunkFileForReading(fp) + if err == nil { + f.Close() + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"} + m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"} + m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"} + + N := 120000 + + for j, m := range []model.Metric{m1, m2, m3} { + for i := 0; i < N; i++ { + smpl := &model.Sample{ + Metric: m, + Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals. + Value: model.SampleValue(j), + } + s.Append(smpl) + } + } + s.WaitForIndexing() + + // Archive m3, but first maintain it so that at least something is written to disk. + fpToBeArchived := m3.FastFingerprint() + s.maintainMemorySeries(fpToBeArchived, 0) + s.fpLocker.Lock(fpToBeArchived) + s.fpToSeries.del(fpToBeArchived) + if err := s.persistence.archiveMetric( + fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), + ); err != nil { + t.Error(err) + } + s.fpLocker.Unlock(fpToBeArchived) + + // Corrupt the series file for m3. 
+ f, err := os.Create(s.persistence.fileNameForFingerprint(fpToBeArchived)) + if err != nil { + t.Fatal(err) + } + if _, err := f.WriteString("This is clearly not the content of a series file."); err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + + fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) + if len(fps) != 3 { + t.Errorf("unexpected number of fingerprints: %d", len(fps)) + } + + pl := s.NewPreloader() + // This will access the corrupt file and lead to quarantining. + pl.PreloadInstant(fpToBeArchived, now.Add(-2*time.Hour), time.Minute) + pl.Close() + time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait. + s.WaitForIndexing() + + fps2 := s.fingerprintsForLabelPairs(model.LabelPair{ + Name: model.MetricNameLabel, Value: "test", + }) + if len(fps2) != 2 { + t.Errorf("unexpected number of fingerprints: %d", len(fps2)) + } + + exists, err := chunkFileExists(fpToBeArchived) + if err != nil { + t.Fatal(err) + } + if exists { + t.Errorf("chunk file exists for fp=%v", fpToBeArchived) + } +} + // TestLoop is just a smoke test for the loop method, if we can switch it on and // off without disaster. func TestLoop(t *testing.T) { @@ -619,7 +693,10 @@ func testChunk(t *testing.T, encoding chunkEncoding) { continue } for sample := range cd.c.newIterator().values() { - values = append(values, *sample) + if sample.error != nil { + t.Error(sample.error) + } + values = append(values, sample.SamplePair) } } @@ -662,10 +739,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) // #1 Exactly on a sample. 
for i, expected := range samples { @@ -739,10 +813,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) - if err != nil { - b.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) b.ResetTimer() @@ -820,10 +891,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) // #1 Zero length interval at sample. for i, expected := range samples { @@ -975,10 +1043,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) - if err != nil { - b.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) b.ResetTimer() @@ -1024,10 +1089,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1045,10 +1107,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. 
s.maintainMemorySeries(fp, 100000) - _, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1074,8 +1133,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Archive metrics. s.fpToSeries.del(fp) + lastTime, err := series.head().lastTime() + if err != nil { + t.Fatal(err) + } if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.head().lastTime(), + fp, series.metric, series.firstTime(), lastTime, ); err != nil { t.Fatal(err) } @@ -1125,8 +1188,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Archive metrics. s.fpToSeries.del(fp) + lastTime, err = series.head().lastTime() + if err != nil { + t.Fatal(err) + } if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.head().lastTime(), + fp, series.metric, series.firstTime(), lastTime, ); err != nil { t.Fatal(err) } @@ -1520,10 +1587,7 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples, t.Fatal(err) } p := s.NewPreloader() - it, err := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) - if err != nil { - t.Fatal(err) - } + it := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp) if found.Timestamp == model.Earliest { t.Errorf("Sample %#v: Expected sample not found.", sample) @@ -1567,10 +1631,7 @@ func TestAppendOutOfOrder(t *testing.T) { pl := s.NewPreloader() defer pl.Close() - it, err := pl.PreloadRange(fp, 0, 2) - if err != nil { - t.Fatalf("Error preloading chunks: %s", err) - } + it := pl.PreloadRange(fp, 0, 2) want := []model.SamplePair{ { From dad302144dd8e0c304f1b906cdfaa37c445074f9 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 9 Mar 2016 15:06:00 
+0100 Subject: [PATCH 3/3] Make a naked return less naked --- promql/functions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/promql/functions.go b/promql/functions.go index 0d3ec41951..433c06f023 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -548,7 +548,7 @@ func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slo slope = covXY / varX intercept = sumY/n - slope*sumX/n - return + return slope, intercept } // === deriv(node model.ValMatrix) Vector ===