From 51d2532f188dd3abd898cd5dcd8885b1e5d57d52 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 11:36:55 +0200 Subject: [PATCH] Introduce func icingaredis.Client.YieldAll() --- pkg/icingaredis/client.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 777de056..cb04962d 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -4,10 +4,13 @@ import ( "context" "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/utils" "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" + "runtime" "time" ) @@ -161,3 +164,24 @@ func (c *Client) StreamLastId(ctx context.Context, stream string) (string, error return lastId, nil } + +// YieldAll yields all entities from Redis that belong to the specified SyncSubject. +func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) { + key := utils.Key(utils.Name(subject.Entity()), ':') + if subject.WithChecksum() { + key = "icinga:checksum:" + key + } else { + key = "icinga:config:" + key + } + + pairs, errs := c.HYield(ctx, key, 1<<12) + g, ctx := errgroup.WithContext(ctx) + // Let errors from HYield cancel the group. + com.ErrgroupReceive(g, errs) + + desired, errs := CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel the group. + com.ErrgroupReceive(g, errs) + + return desired, com.WaitAsync(g) +}