mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Introduce func icingaredis.Client.YieldAll()
This commit is contained in:
parent
961a3650e8
commit
51d2532f18
1 changed files with 24 additions and 0 deletions
|
|
@ -4,10 +4,13 @@ import (
|
|||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/common"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
|
@ -161,3 +164,24 @@ func (c *Client) StreamLastId(ctx context.Context, stream string) (string, error
|
|||
|
||||
return lastId, nil
|
||||
}
|
||||
|
||||
// YieldAll yields all entities from Redis that belong to the specified SyncSubject.
|
||||
func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) {
|
||||
key := utils.Key(utils.Name(subject.Entity()), ':')
|
||||
if subject.WithChecksum() {
|
||||
key = "icinga:checksum:" + key
|
||||
} else {
|
||||
key = "icinga:config:" + key
|
||||
}
|
||||
|
||||
pairs, errs := c.HYield(ctx, key, 1<<12)
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
// Let errors from HYield cancel the group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
|
||||
desired, errs := CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU())
|
||||
// Let errors from CreateEntities cancel the group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
|
||||
return desired, com.WaitAsync(g)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue