From cfa0253ed8ca449d803fc34aa4461f49c2de0db9 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 26 Jan 2018 14:57:33 +0100 Subject: [PATCH 1/4] discovery: Schedule updates to throttle --- discovery/manager.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/discovery/manager.go b/discovery/manager.go index ae10de5cb8..dd1e9b5335 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -81,6 +82,10 @@ type Manager struct { targets map[poolKey]map[string]*targetgroup.Group // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. syncCh chan map[string][]*targetgroup.Group + // True if updates were received in the last 5 seconds. + recentlyUpdated bool + // Protects recentlyUpdated. + recentlyUpdatedMtx sync.Mutex } // Run starts the background processing @@ -123,6 +128,7 @@ func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Dis go worker.Run(ctx, updates) go m.runProvider(ctx, poolKey, updates) + go m.runUpdater(ctx) } func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) { @@ -137,7 +143,26 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan return } m.updateGroup(poolKey, tgs) - m.syncCh <- m.allGroups() + m.recentlyUpdatedMtx.Lock() + m.recentlyUpdated = true + m.recentlyUpdatedMtx.Unlock() + } + } +} + +func (m *Manager) runUpdater(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.recentlyUpdatedMtx.Lock() + if m.recentlyUpdated { + m.syncCh <- m.allGroups() + m.recentlyUpdated = false + } + m.recentlyUpdatedMtx.Unlock() } } } From 46693444de096181b124a38dbe177cdf97844226 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 26 Jan 2018 16:05:39 +0100 Subject: [PATCH 2/4] vendor: Update github.com/prometheus/tsdb --- vendor/github.com/prometheus/tsdb/db.go | 6 +++++- vendor/vendor.json | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index b659165f17..292b2a8337 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -299,7 +299,11 @@ func (db *DB) retentionCutoff() (bool, error) { } // This will close the dirs and then delete the dirs. - return len(dirs) > 0, db.reload(dirs...) + if len(dirs) > 0 { + return true, db.reload(dirs...) + } + + return false, nil } // Appender opens a new appender against the database. diff --git a/vendor/vendor.json b/vendor/vendor.json index ca61908964..0e35516de3 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -800,10 +800,10 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "errrSfk16j274D3QY61bWODd56c=", + "checksumSHA1": "5mCM640B2xa7y+kRRUeVCglEk7o=", "path": "github.com/prometheus/tsdb", - "revision": "ad0fdaf436fc99828bce60f81984dcfa3a282a44", - "revisionTime": "2018-01-24T14:58:35Z" + "revision": "44dd5e1202b7598d50c69ce3617ca6ae6503cf52", + "revisionTime": "2018-01-26T14:54:38Z" }, { "checksumSHA1": "XTirmk6Pq5TBGIZEaN5VL4k3i1s=", From fe926e7829d03af92dd7251d0daa4461c7ac38cf Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 27 Jan 2018 12:03:06 +0000 Subject: [PATCH 3/4] update the discover tests the discovery test is now only testing update and get groups. It doesn't do an e2e test but just a unit test of setting and receiving target groups --- discovery/manager_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/discovery/manager_test.go b/discovery/manager_test.go index cf2b7e81e4..2eb86605e8 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -29,8 +29,8 @@ import ( "gopkg.in/yaml.v2" ) -// TestDiscoveryManagerSyncCalls checks that the target updates are received in the expected order. -func TestDiscoveryManagerSyncCalls(t *testing.T) { +// TestTargetUpdatesOrder checks that the target updates are received in the expected order. +func TestTargetUpdatesOrder(t *testing.T) { // The order by which the updates are send is detirmened by the interval passed to the mock discovery adapter // Final targets array is ordered alphabetically by the name of the discoverer. @@ -656,15 +656,14 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() discoveryManager := NewManager(nil) - go discoveryManager.Run(ctx) var totalUpdatesCount int - for tpName, update := range testCase.updates { - provider := newMockDiscoveryProvider(update) - discoveryManager.startProvider(ctx, poolKey{setName: strconv.Itoa(testIndex), provider: tpName}, provider) - if len(update) > 0 { - totalUpdatesCount = totalUpdatesCount + len(update) + provUpdates := make(chan []*targetgroup.Group) + for _, up := range testCase.updates { + go newMockDiscoveryProvider(up).Run(ctx, provUpdates) + if len(up) > 0 { + totalUpdatesCount = totalUpdatesCount + len(up) } } @@ -674,9 +673,10 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { case <-time.After(10 * time.Second): t.Errorf("%v. %q: no update arrived within the timeout limit", x, testCase.title) break Loop - case tsetMap := <-discoveryManager.SyncCh(): - for _, received := range tsetMap { - // Need to sort by the Groups source as the Discovery manager doesn't guarantee the order. + case tgs := <-provUpdates: + discoveryManager.updateGroup(poolKey{setName: strconv.Itoa(testIndex), provider: testCase.title}, tgs) + for _, received := range discoveryManager.allGroups() { + // Need to sort by the Groups source as the received order is not guaranteed. sort.Sort(byGroupSource(received)) if !reflect.DeepEqual(received, testCase.expectedTargets[x]) { var receivedFormated string From 73e829137bac7547f25434e8c234b617ff3c50fa Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Mon, 29 Jan 2018 13:51:04 +0100 Subject: [PATCH 4/4] discovery: Cleanup ticker --- discovery/manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/discovery/manager.go b/discovery/manager.go index dd1e9b5335..c21e32d5a7 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -152,6 +152,8 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan func (m *Manager) runUpdater(ctx context.Context) { ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { select { case <-ctx.Done():