From 89129cd39a16ceff6570d019132e887611954cb2 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 30 Jul 2020 16:41:13 +0530 Subject: [PATCH] Address comments Signed-off-by: Annanay --- scrape/scrape_test.go | 80 ++++++++++++++++----------------------- storage/remote/storage.go | 4 +- storage/remote/write.go | 3 +- tsdb/block_test.go | 2 +- tsdb/db.go | 2 +- tsdb/db_test.go | 2 +- tsdb/head.go | 3 +- tsdb/head_test.go | 70 +++++++++++++++++----------------- tsdb/tsdbblockutil.go | 2 +- 9 files changed, 78 insertions(+), 90 deletions(-) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index b8227910f8..d60c2d5ee1 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -300,7 +300,6 @@ func TestScrapePoolReload(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} - ctx := context.Background() sp, _ := newScrapePool(cfg, app, 0, nil) loop := sp.newLoop(scrapeLoopOptions{ @@ -309,7 +308,7 @@ func TestScrapePoolAppender(t *testing.T) { appl, ok := loop.(*scrapeLoop) testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop) - wrapped := appl.appender(ctx) + wrapped := appl.appender(context.Background()) tl, ok := wrapped.(*timeLimitAppender) testutil.Assert(t, ok, "Expected timeLimitAppender but got %T", wrapped) @@ -324,7 +323,7 @@ func TestScrapePoolAppender(t *testing.T) { appl, ok = loop.(*scrapeLoop) testutil.Assert(t, ok, "Expected scrapeLoop but got %T", loop) - wrapped = appl.appender(ctx) + wrapped = appl.appender(context.Background()) sl, ok := wrapped.(*limitAppender) testutil.Assert(t, ok, "Expected limitAppender but got %T", wrapped) @@ -375,9 +374,8 @@ func TestScrapePoolRaces(t *testing.T) { func TestScrapeLoopStopBeforeRun(t *testing.T) { scraper := &testScraper{} - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), scraper, nil, nil, nopMutator, @@ -438,9 +436,8 @@ func TestScrapeLoopStop(t *testing.T) { scraper = &testScraper{} app = func(ctx context.Context) storage.Appender { return appender } ) - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), scraper, nil, nil, nopMutator, @@ -978,9 +975,8 @@ func TestScrapeLoopAppend(t *testing.T) { discoveryLabels := &Target{ labels: labels.FromStrings(test.discoveryLabels...), } - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, func(l labels.Labels) labels.Labels { return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil) @@ -996,7 +992,7 @@ func TestScrapeLoopAppend(t *testing.T) { now := time.Now() - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1024,9 +1020,8 @@ func TestScrapeLoopAppend(t *testing.T) { func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { // collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next. app := &collectResultAppender{} - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, nopMutator, nopMutator, @@ -1050,7 +1045,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { sl.cache.addRef(mets, fakeRef, lset, hash) now := time.Now() - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte(metric), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1069,9 +1064,8 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { func TestScrapeLoopAppendSampleLimit(t *testing.T) { resApp := &collectResultAppender{} app := &limitAppender{Appender: resApp, limit: 1} - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, func(l labels.Labels) labels.Labels { if l.Has("deleteme") { @@ -1094,7 +1088,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { beforeMetricValue := beforeMetric.GetCounter().GetValue() now := time.Now() - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) if err != errSampleLimit { t.Fatalf("Did not see expected sample limit error: %s", err) @@ -1125,7 +1119,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { testutil.Equals(t, want, resApp.rolledbackResult, "Appended samples not as expected") now = time.Now() - slApp = sl.appender(ctx) + slApp = sl.appender(context.Background()) total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now) if err != errSampleLimit { t.Fatalf("Did not see expected sample limit error: %s", err) @@ -1144,9 +1138,8 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { defer s.Close() capp := &collectResultAppender{} - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, nopMutator, nopMutator, @@ -1157,12 +1150,12 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { ) now := time.Now() - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) - slApp = sl.appender(ctx) + slApp = sl.appender(context.Background()) _, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute)) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1185,9 +1178,8 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { func TestScrapeLoopAppendStaleness(t *testing.T) { app := &collectResultAppender{} - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, nopMutator, nopMutator, @@ -1198,12 +1190,12 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { ) now := time.Now() - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) - slApp = sl.appender(ctx) + slApp = sl.appender(context.Background()) _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1230,8 +1222,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { app := &collectResultAppender{} - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, nopMutator, nopMutator, @@ -1242,12 +1233,12 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { ) now := time.Now() - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) - slApp = sl.appender(ctx) + slApp = sl.appender(context.Background()) _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1342,9 +1333,8 @@ func (app *errorAppender) AddFast(ref uint64, t int64, v float64) error { func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { app := &errorAppender{} - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, nopMutator, @@ -1356,7 +1346,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T ) now := time.Unix(1, 0) - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1376,8 +1366,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { app := &collectResultAppender{} - ctx := context.Background() - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, nopMutator, @@ -1394,7 +1383,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { ) now := time.Now().Add(20 * time.Minute) - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1567,12 +1556,11 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { s := teststorage.New(t) defer s.Close() - ctx := context.Background() - app := s.Appender(ctx) + app := s.Appender(context.Background()) capp := &collectResultAppender{next: app} - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, nopMutator, nopMutator, @@ -1582,7 +1570,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { ) now := time.Now() - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1601,12 +1589,11 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { s := teststorage.New(t) defer s.Close() - ctx := context.Background() - app := s.Appender(ctx) + app := s.Appender(context.Background()) capp := &collectResultAppender{next: app} - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), nil, nil, nil, nopMutator, nopMutator, @@ -1616,7 +1603,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { ) now := time.Now() - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) testutil.Ok(t, err) testutil.Ok(t, slApp.Commit()) @@ -1678,11 +1665,10 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { s := teststorage.New(t) defer s.Close() - ctx := context.Background() - app := s.Appender(ctx) + app := s.Appender(context.Background()) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, + sl := newScrapeLoop(context.Background(), &testScraper{}, nil, nil, func(l labels.Labels) labels.Labels { @@ -1699,7 +1685,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { ) defer cancel() - slApp := sl.appender(ctx) + slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{}) testutil.NotOk(t, err) testutil.Ok(t, slApp.Rollback()) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 656272988e..a9d60b1961 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -169,8 +169,8 @@ func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.C } // Appender implements storage.Storage. -func (s *Storage) Appender(_ context.Context) storage.Appender { - return s.rws.Appender() +func (s *Storage) Appender(ctx context.Context) storage.Appender { + return s.rws.Appender(ctx) } // Close the background processing of the storage queues. diff --git a/storage/remote/write.go b/storage/remote/write.go index 28ea8aef10..77a0ab7ced 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -14,6 +14,7 @@ package remote import ( + "context" "fmt" "sync" "time" @@ -170,7 +171,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { } // Appender implements storage.Storage. -func (rws *WriteStorage) Appender() storage.Appender { +func (rws *WriteStorage) Appender(_ context.Context) storage.Appender { return ×tampTracker{ writeStorage: rws, } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 1e95353ee4..17f403244c 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -328,7 +328,7 @@ func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head { head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize, nil) testutil.Ok(tb, err) - app := head.Appender() + app := head.Appender(context.Background()) for _, s := range series { ref := uint64(0) it := s.Iterator() diff --git a/tsdb/db.go b/tsdb/db.go index 2dc91aad24..6687e3ad08 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -711,7 +711,7 @@ func (db *DB) run() { // Appender opens a new appender against the database. func (db *DB) Appender(ctx context.Context) storage.Appender { - return dbAppender{db: db, Appender: db.head.Appender()} + return dbAppender{db: db, Appender: db.head.Appender(ctx)} } // dbAppender wraps the DB's head appender and triggers compactions on commit diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 5e55840a70..c2ed46fbfb 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1203,7 +1203,7 @@ func TestSizeRetention(t *testing.T) { } // Add some data to the WAL. - headApp := db.Head().Appender() + headApp := db.Head().Appender(context.Background()) for _, m := range headBlocks { series := genSeries(100, 10, m.MinTime, m.MaxTime) for _, s := range series { diff --git a/tsdb/head.go b/tsdb/head.go index 066c722eaa..6cf43c74df 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "fmt" "math" "path/filepath" @@ -1009,7 +1010,7 @@ func (a *initAppender) Rollback() error { } // Appender returns a new Appender on the database. -func (h *Head) Appender() storage.Appender { +func (h *Head) Appender(_ context.Context) storage.Appender { h.metrics.activeAppenders.Inc() // The head cache might not have a starting point yet. The init appender diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 64281cbb43..8b35dfaca8 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -267,14 +267,14 @@ func TestHead_WALMultiRef(t *testing.T) { testutil.Ok(t, head.Init(0)) - app := head.Appender() + app := head.Appender(context.Background()) ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Equals(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) // Add another sample outside chunk range to mmap a chunk. - app = head.Appender() + app = head.Appender(context.Background()) _, err = app.Add(labels.FromStrings("foo", "bar"), 1500, 2) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -282,14 +282,14 @@ func TestHead_WALMultiRef(t *testing.T) { testutil.Ok(t, head.Truncate(1600)) - app = head.Appender() + app = head.Appender(context.Background()) ref2, err := app.Add(labels.FromStrings("foo", "bar"), 1700, 3) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Equals(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) // Add another sample outside chunk range to mmap a chunk. - app = head.Appender() + app = head.Appender(context.Background()) _, err = app.Add(labels.FromStrings("foo", "bar"), 2000, 4) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -540,7 +540,7 @@ func TestHeadDeleteSimple(t *testing.T) { for _, c := range cases { head, w := newTestHead(t, 1000, compress) - app := head.Appender() + app := head.Appender(context.Background()) for _, smpl := range smplsAll { _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) testutil.Ok(t, err) @@ -554,7 +554,7 @@ func TestHeadDeleteSimple(t *testing.T) { } // Add more samples. - app = head.Appender() + app = head.Appender(context.Background()) for _, smpl := range c.addSamples { _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) testutil.Ok(t, err) @@ -621,7 +621,7 @@ func TestDeleteUntilCurMax(t *testing.T) { }() numSamples := int64(10) - app := hb.Appender() + app := hb.Appender(context.Background()) smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { smpls[i] = rand.Float64() @@ -645,7 +645,7 @@ func TestDeleteUntilCurMax(t *testing.T) { testutil.Equals(t, 0, len(res.Warnings())) // Add again and test for presence. - app = hb.Appender() + app = hb.Appender(context.Background()) _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 11, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -671,7 +671,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { hb, w := newTestHead(t, int64(numSamples)*10, false) for i := 0; i < numSamples; i++ { - app := hb.Appender() + app := hb.Appender(context.Background()) _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -763,7 +763,7 @@ func TestDelete_e2e(t *testing.T) { testutil.Ok(t, hb.Close()) }() - app := hb.Appender() + app := hb.Appender(context.Background()) for _, l := range lbls { ls := labels.New(l...) series := []tsdbutil.Sample{} @@ -1114,7 +1114,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { h.initTime(0) - app := h.appender() + app := h.Appender(context.Background()) lset := labels.FromStrings("a", "1") _, err := app.Add(lset, 2100, 1) testutil.Ok(t, err) @@ -1144,7 +1144,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { h.initTime(0) - app := h.appender() + app := h.Appender(context.Background()) lset := labels.FromStrings("a", "1") _, err := app.Add(lset, 2100, 1) testutil.Ok(t, err) @@ -1175,7 +1175,7 @@ func TestHead_LogRollback(t *testing.T) { testutil.Ok(t, h.Close()) }() - app := h.Appender() + app := h.Appender(context.Background()) _, err := app.Add(labels.FromStrings("a", "b"), 1, 2) testutil.Ok(t, err) @@ -1373,7 +1373,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { testutil.Ok(t, h.Close()) }() add := func(ts int64) { - app := h.Appender() + app := h.Appender(context.Background()) _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1404,7 +1404,7 @@ func TestAddDuplicateLabelName(t *testing.T) { }() add := func(labels labels.Labels, labelName string) { - app := h.Appender() + app := h.Appender(context.Background()) _, err := app.Add(labels, 0, 0) testutil.NotOk(t, err) testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error()) @@ -1458,7 +1458,7 @@ func TestMemSeriesIsolation(t *testing.T) { if h.MinTime() == math.MaxInt64 { app = &initAppender{head: h} } else { - a := h.appender() + a := h.Appender(context.Background()) a.cleanupAppendIDsBelow = 0 app = a } @@ -1489,7 +1489,7 @@ func TestMemSeriesIsolation(t *testing.T) { testutil.Equals(t, 999, lastValue(hb, 999)) // Cleanup appendIDs below 500. - app := hb.appender() + app := hb.Appender(context.Background()) app.cleanupAppendIDsBelow = 500 _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) @@ -1508,7 +1508,7 @@ func TestMemSeriesIsolation(t *testing.T) { // Cleanup appendIDs below 1000, which means the sample buffer is // the only thing with appendIDs. - app = hb.appender() + app = hb.Appender(context.Background()) app.cleanupAppendIDsBelow = 1000 _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) @@ -1522,7 +1522,7 @@ func TestMemSeriesIsolation(t *testing.T) { i++ // Cleanup appendIDs below 1001, but with a rollback. - app = hb.appender() + app = hb.Appender(context.Background()) app.cleanupAppendIDsBelow = 1001 _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) @@ -1556,7 +1556,7 @@ func TestMemSeriesIsolation(t *testing.T) { // Cleanup appendIDs below 1000, which means the sample buffer is // the only thing with appendIDs. - app = hb.appender() + app = hb.Appender(context.Background()) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) i++ testutil.Ok(t, err) @@ -1569,7 +1569,7 @@ func TestMemSeriesIsolation(t *testing.T) { testutil.Equals(t, 1001, lastValue(hb, 1003)) // Cleanup appendIDs below 1002, but with a rollback. - app = hb.appender() + app = hb.Appender(context.Background()) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) @@ -1587,13 +1587,13 @@ func TestIsolationRollback(t *testing.T) { testutil.Ok(t, hb.Close()) }() - app := hb.Appender() + app := hb.Appender(context.Background()) _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Equals(t, uint64(1), hb.iso.lowWatermark()) - app = hb.Appender() + app = hb.Appender(context.Background()) _, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1) testutil.Ok(t, err) _, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2) @@ -1601,7 +1601,7 @@ func TestIsolationRollback(t *testing.T) { testutil.Ok(t, app.Rollback()) testutil.Equals(t, uint64(2), hb.iso.lowWatermark()) - app = hb.Appender() + app = hb.Appender(context.Background()) _, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1614,18 +1614,18 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { testutil.Ok(t, hb.Close()) }() - app1 := hb.Appender() + app1 := hb.Appender(context.Background()) _, err := app1.Add(labels.FromStrings("foo", "bar"), 0, 0) testutil.Ok(t, err) testutil.Ok(t, app1.Commit()) testutil.Equals(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.") - app1 = hb.Appender() + app1 = hb.Appender(context.Background()) _, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1) testutil.Ok(t, err) testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.") - app2 := hb.Appender() + app2 := hb.Appender(context.Background()) _, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1) testutil.Ok(t, err) testutil.Ok(t, app2.Commit()) @@ -1668,10 +1668,10 @@ func TestIsolationWithoutAdd(t *testing.T) { testutil.Ok(t, hb.Close()) }() - app := hb.Appender() + app := hb.Appender(context.Background()) testutil.Ok(t, app.Commit()) - app = hb.Appender() + app = hb.Appender(context.Background()) _, err := app.Add(labels.FromStrings("foo", "baz"), 1, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1767,7 +1767,7 @@ func testHeadSeriesChunkRace(t *testing.T) { testutil.Ok(t, h.Close()) }() testutil.Ok(t, h.Init(0)) - app := h.Appender() + app := h.Appender(context.Background()) s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0) testutil.Ok(t, err) @@ -1816,7 +1816,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { expectedLabelValues = []string{"d", "e", "f"} ) - app := head.Appender() + app := head.Appender(context.Background()) for i, name := range expectedLabelNames { _, err := app.Add(labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0) testutil.Ok(t, err) @@ -1861,28 +1861,28 @@ func TestErrReuseAppender(t *testing.T) { testutil.Ok(t, head.Close()) }() - app := head.Appender() + app := head.Appender(context.Background()) _, err := app.Add(labels.Labels{{Name: "test", Value: "test"}}, 0, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.NotOk(t, app.Commit()) testutil.NotOk(t, app.Rollback()) - app = head.Appender() + app = head.Appender(context.Background()) _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 1, 0) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) testutil.NotOk(t, app.Rollback()) testutil.NotOk(t, app.Commit()) - app = head.Appender() + app = head.Appender(context.Background()) _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 2, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.NotOk(t, app.Rollback()) testutil.NotOk(t, app.Commit()) - app = head.Appender() + app = head.Appender(context.Background()) _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 3, 0) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 2264abc754..d152594dac 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -38,7 +38,7 @@ func CreateHead(samples []*MetricSample, chunkRange int64, chunkDir string, logg if err != nil { return nil, err } - app := head.Appender() + app := head.Appender(context.TODO()) for _, sample := range samples { _, err = app.Add(sample.Labels, sample.TimestampMs, sample.Value) if err != nil {