Add PipeChecksumChunks()

This commit is contained in:
Noah Hilverling 2019-03-15 15:59:41 +01:00
parent 8fd8758620
commit 588b339dc4

View file

@ -362,6 +362,11 @@ type ConfigChunk struct {
Checksums []interface{}
}
type ChecksumChunk struct {
Keys []string
Checksums []interface{}
}
func (rdbw *RDBWrapper) PipeConfigChunks(done <-chan struct{}, keys []string, objectType string) <-chan *ConfigChunk {
out := make(chan *ConfigChunk)
@ -396,7 +401,7 @@ func (rdbw *RDBWrapper) PipeConfigChunks(done <-chan struct{}, keys []string, ob
}
//TODO: Replace fixed chunkSize
work := icingadb_utils.ChunkKeys(done, keys, 1500)
work := icingadb_utils.ChunkKeys(done, keys, 500)
go func() {
defer close(out)
@ -416,3 +421,46 @@ func (rdbw *RDBWrapper) PipeConfigChunks(done <-chan struct{}, keys []string, ob
return out
}
func (rdbw *RDBWrapper) PipeChecksumChunks(done <-chan struct{}, keys []string, objectType string) <-chan *ChecksumChunk {
out := make(chan *ChecksumChunk)
worker := func(chunk <-chan []string) {
for k := range chunk {
cmd := rdbw.HMGet(fmt.Sprintf("icinga:config:checksum:%s", objectType), k...)
checksums, err := cmd.Result()
if err != nil {
panic(err)
}
select {
case out <- &ChecksumChunk{Keys: k, Checksums: checksums}:
case <-done:
return
}
}
}
//TODO: Replace fixed chunkSize
work := icingadb_utils.ChunkKeys(done, keys, 500)
go func() {
defer close(out)
wg := &sync.WaitGroup{}
for i := 0; i < 32; i++ {
wg.Add(1)
go func() {
defer wg.Done()
worker(work)
}()
}
wg.Wait()
}()
return out
}