icingadb/pkg/icingaredis/utils.go

119 lines
3.2 KiB
Go
Raw Permalink Normal View History

2021-03-03 15:48:44 -05:00
package icingaredis
import (
"context"
2024-05-22 05:47:44 -04:00
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icinga-go-library/strcase"
"github.com/icinga/icinga-go-library/types"
2024-05-21 05:58:08 -04:00
"github.com/icinga/icingadb/pkg/common"
2021-03-03 15:48:44 -05:00
"github.com/icinga/icingadb/pkg/contracts"
2021-05-18 07:35:44 -04:00
"github.com/pkg/errors"
2021-03-03 15:48:44 -05:00
"golang.org/x/sync/errgroup"
2024-05-21 05:58:08 -04:00
"runtime"
2021-03-03 15:48:44 -05:00
)
2021-06-22 08:53:33 -04:00
// CreateEntities streams and creates entities from the
// given Redis field value pairs using the specified factory function,
// and streams them on a returned channel.
2024-05-21 05:58:08 -04:00
func CreateEntities(ctx context.Context, factoryFunc database.EntityFactoryFunc, pairs <-chan redis.HPair, concurrent int) (<-chan database.Entity, <-chan error) {
entities := make(chan database.Entity)
2021-03-03 15:48:44 -05:00
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer close(entities)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < concurrent; i++ {
g.Go(func() error {
for pair := range pairs {
var id types.Binary
if err := id.UnmarshalText([]byte(pair.Field)); err != nil {
2021-06-01 04:49:35 -04:00
return errors.Wrapf(err, "can't create ID from value %#v", pair.Field)
2021-03-03 15:48:44 -05:00
}
e := factoryFunc()
if err := types.UnmarshalJSON([]byte(pair.Value), e); err != nil {
2021-06-01 04:49:35 -04:00
return err
2021-03-03 15:48:44 -05:00
}
e.SetID(id)
select {
case entities <- e:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}
return g.Wait()
})
return entities, com.WaitAsync(g)
}
2021-06-22 08:53:33 -04:00
// SetChecksums concurrently streams from the given entities and
// sets their checksums using the specified map and
// streams the results on a returned channel.
func SetChecksums(ctx context.Context, entities <-chan database.Entity, checksums map[string]database.Entity, concurrent int) (<-chan database.Entity, <-chan error) {
entitiesWithChecksum := make(chan database.Entity)
2021-03-03 15:48:44 -05:00
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer close(entitiesWithChecksum)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < concurrent; i++ {
g.Go(func() error {
for entity := range entities {
2021-04-08 12:12:05 -04:00
if checksumer, ok := checksums[entity.ID().String()]; ok {
2021-03-03 15:48:44 -05:00
entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum())
} else {
2021-06-14 06:43:53 -04:00
return errors.Errorf("no checksum for %#v", entity)
2021-03-03 15:48:44 -05:00
}
select {
case entitiesWithChecksum <- entity:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}
return g.Wait()
})
return entitiesWithChecksum, com.WaitAsync(g)
}
2021-05-18 07:35:44 -04:00
2024-05-21 05:58:08 -04:00
// YieldAll yields all entities from Redis that belong to the specified SyncSubject.
func YieldAll(ctx context.Context, c *redis.Client, subject *common.SyncSubject) (<-chan database.Entity, <-chan error) {
key := strcase.Delimited(types.Name(subject.Entity()), ':')
if subject.WithChecksum() {
key = "icinga:checksum:" + key
} else {
key = "icinga:" + key
2021-05-18 07:35:44 -04:00
}
2024-05-21 05:58:08 -04:00
pairs, errs := c.HYield(ctx, key)
g, ctx := errgroup.WithContext(ctx)
// Let errors from HYield cancel the group.
com.ErrgroupReceive(g, errs)
desired, errs := CreateEntities(ctx, subject.FactoryForDelta(), pairs, runtime.NumCPU())
// Let errors from CreateEntities cancel the group.
com.ErrgroupReceive(g, errs)
return desired, com.WaitAsync(g)
2021-05-18 07:35:44 -04:00
}