diff --git a/redis.go b/redis.go index 63c57506..d71700b6 100644 --- a/redis.go +++ b/redis.go @@ -362,6 +362,11 @@ type ConfigChunk struct { Checksums []interface{} } +type ChecksumChunk struct { + Keys []string + Checksums []interface{} +} + func (rdbw *RDBWrapper) PipeConfigChunks(done <-chan struct{}, keys []string, objectType string) <-chan *ConfigChunk { out := make(chan *ConfigChunk) @@ -396,7 +401,7 @@ func (rdbw *RDBWrapper) PipeConfigChunks(done <-chan struct{}, keys []string, ob } //TODO: Replace fixed chunkSize - work := icingadb_utils.ChunkKeys(done, keys, 1500) + work := icingadb_utils.ChunkKeys(done, keys, 500) go func() { defer close(out) @@ -416,3 +421,46 @@ func (rdbw *RDBWrapper) PipeConfigChunks(done <-chan struct{}, keys []string, ob return out } + +func (rdbw *RDBWrapper) PipeChecksumChunks(done <-chan struct{}, keys []string, objectType string) <-chan *ChecksumChunk { + out := make(chan *ChecksumChunk) + + worker := func(chunk <-chan []string) { + for k := range chunk { + cmd := rdbw.HMGet(fmt.Sprintf("icinga:config:checksum:%s", objectType), k...) + + checksums, err := cmd.Result() + if err != nil { + panic(err) + } + + select { + case out <- &ChecksumChunk{Keys: k, Checksums: checksums}: + case <-done: + return + } + } + } + + //TODO: Replace fixed chunkSize + work := icingadb_utils.ChunkKeys(done, keys, 500) + + go func() { + defer close(out) + + wg := &sync.WaitGroup{} + + for i := 0; i < 32; i++ { + wg.Add(1) + go func() { + defer wg.Done() + worker(work) + }() + } + + wg.Wait() + }() + + return out +} +