Merge pull request #373 from Icinga/feature/single-threaded-delta

Rewrite delta to use only a single goroutine
This commit is contained in:
Julian Brost 2021-09-24 16:53:23 +02:00 committed by GitHub
commit 6e3df7d63b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 334 additions and 85 deletions

1
go.mod
View file

@ -10,6 +10,7 @@ require (
github.com/jessevdk/go-flags v1.5.0
github.com/jmoiron/sqlx v1.3.4
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/exp v0.0.0-20210514180818-737f94c0881e
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c

View file

@ -2,13 +2,10 @@ package icingadb
import (
"context"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"sync"
"time"
)
@ -22,7 +19,8 @@ type Delta struct {
logger *zap.SugaredLogger
}
// NewDelta creates a new Delta and starts calculating it.
// NewDelta creates a new Delta and starts calculating it. The caller must ensure
// that no duplicate entities are sent to the same stream.
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta {
delta := &Delta{
Subject: subject,
@ -43,98 +41,82 @@ func (delta *Delta) Wait() error {
func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
defer close(delta.done)
start := time.Now()
var endActual, endDesired time.Time
var numActual, numDesired uint64
actual := EntitiesById{} // only read from actualCh (so far)
desired := EntitiesById{} // only read from desiredCh (so far)
var update EntitiesById
if delta.Subject.WithChecksum() {
update = EntitiesById{}
update = EntitiesById{} // read from actualCh and desiredCh with mismatching checksums
}
actual := EntitiesById{}
desired := EntitiesById{}
var mtx, updateMtx sync.Mutex
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
var cnt com.Counter
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
delta.logger.Debugf(
"Synced %d actual elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed)
})
for {
select {
case a, ok := <-actualCh:
if !ok {
return nil
}
id := a.ID().String()
mtx.Lock()
if d, ok := desired[id]; ok {
delete(desired, id)
mtx.Unlock()
if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
updateMtx.Lock()
update[id] = d
updateMtx.Unlock()
}
} else {
actual[id] = a
mtx.Unlock()
}
cnt.Inc()
case <-ctx.Done():
return ctx.Err()
for actualCh != nil || desiredCh != nil {
select {
case actualValue, ok := <-actualCh:
if !ok {
endActual = time.Now()
actualCh = nil // Done reading all actual entities, disable this case.
break
}
}
})
numActual++
g.Go(func() error {
var cnt com.Counter
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
delta.logger.Debugf(
"Synced %d desired elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed)
})
for {
select {
case d, ok := <-desiredCh:
if !ok {
return nil
id := actualValue.ID().String()
if desiredValue, ok := desired[id]; ok {
delete(desired, id)
if update != nil && !checksumsMatch(actualValue, desiredValue) {
update[id] = desiredValue
}
id := d.ID().String()
mtx.Lock()
if a, ok := actual[id]; ok {
delete(actual, id)
mtx.Unlock()
if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
updateMtx.Lock()
update[id] = d
updateMtx.Unlock()
}
} else {
desired[id] = d
mtx.Unlock()
}
cnt.Inc()
case <-ctx.Done():
return ctx.Err()
} else {
actual[id] = actualValue
}
case desiredValue, ok := <-desiredCh:
if !ok {
endDesired = time.Now()
desiredCh = nil // Done reading all desired entities, disable this case.
break
}
numDesired++
id := desiredValue.ID().String()
if actualValue, ok := actual[id]; ok {
delete(actual, id)
if update != nil && !checksumsMatch(actualValue, desiredValue) {
update[id] = desiredValue
}
} else {
desired[id] = desiredValue
}
case <-ctx.Done():
delta.done <- ctx.Err()
return
}
})
if err := g.Wait(); err != nil {
delta.done <- err
return
}
delta.Create = desired
delta.Update = update
delta.Delete = actual
if delta.Subject.WithChecksum() {
delta.Update = update
}
delta.logger.Debugw("Delta finished",
zap.String("subject", utils.Name(delta.Subject.Entity())),
zap.Duration("time_total", time.Since(start)),
zap.Duration("time_actual", endActual.Sub(start)),
zap.Duration("time_desired", endDesired.Sub(start)),
zap.Uint64("num_actual", numActual),
zap.Uint64("num_desired", numDesired),
zap.Int("create", len(delta.Create)),
zap.Int("update", len(delta.Update)),
zap.Int("delete", len(delta.Delete)))
}
// checksumsMatch returns whether the checksums of two entities are the same.
// Both entities must implement contracts.Checksumer.
func checksumsMatch(a, b contracts.Entity) bool {
c1 := a.(contracts.Checksumer).Checksum()
c2 := b.(contracts.Checksumer).Checksum()
return c1.Equal(c2)
}

266
pkg/icingadb/delta_test.go Normal file
View file

@ -0,0 +1,266 @@
package icingadb
import (
"context"
"encoding/binary"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"strconv"
"sync"
"testing"
)
func TestDelta(t *testing.T) {
type TestData struct {
Name string // name for the sub-test
Actual, Desired uint64 // checksum to send to actual/desired
Create, Update, Delete uint64 // checksum that must be in the corresponding map (if != 0)
}
tests := []TestData{{
Name: "Empty",
}, {
Name: "Create",
Desired: 0x1111111111111111,
Create: 0x1111111111111111,
}, {
Name: "Update",
Actual: 0x1111111111111111,
Desired: 0x2222222222222222,
Update: 0x2222222222222222,
}, {
Name: "Delete",
Actual: 0x1111111111111111,
Delete: 0x1111111111111111,
}, {
Name: "Keep",
Actual: 0x1111111111111111,
Desired: 0x1111111111111111,
}}
makeEndpoint := func(id, checksum uint64) *v1.Endpoint {
e := new(v1.Endpoint)
e.Id = testDeltaMakeIdOrChecksum(id)
e.PropertiesChecksum = testDeltaMakeIdOrChecksum(checksum)
return e
}
// Send the entities to the actual and desired channels in different ordering to catch bugs in the implementation
// that only show depending on the order in which actual and desired values are processed for an ID.
type SendOrder struct {
Name string
Send func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity)
}
sendOrders := []SendOrder{{
Name: "ActualFirst",
Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) {
if test.Actual != 0 {
chActual <- makeEndpoint(id, test.Actual)
}
if test.Desired != 0 {
chDesired <- makeEndpoint(id, test.Desired)
}
},
}, {
Name: "DesiredFirst",
Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) {
if test.Desired != 0 {
chDesired <- makeEndpoint(id, test.Desired)
}
if test.Actual != 0 {
chActual <- makeEndpoint(id, test.Actual)
}
},
}}
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
for _, sendOrder := range sendOrders {
t.Run(sendOrder.Name, func(t *testing.T) {
id := uint64(0x42)
chActual := make(chan contracts.Entity)
chDesired := make(chan contracts.Entity)
subject := common.NewSyncSubject(v1.NewEndpoint)
logger := zaptest.NewLogger(t).Sugar()
go func() {
sendOrder.Send(id, test, chActual, chDesired)
close(chActual)
close(chDesired)
}()
delta := NewDelta(context.Background(), chActual, chDesired, subject, logger)
err := delta.Wait()
require.NoError(t, err, "delta should finish without error")
_, ok := <-chActual
require.False(t, ok, "chActual should have been closed")
_, ok = <-chDesired
require.False(t, ok, "chDesired should have been closed")
testDeltaVerifyResult(t, "Create", testDeltaMakeExpectedMap(id, test.Create), delta.Create)
testDeltaVerifyResult(t, "Update", testDeltaMakeExpectedMap(id, test.Update), delta.Update)
testDeltaVerifyResult(t, "Delete", testDeltaMakeExpectedMap(id, test.Delete), delta.Delete)
})
}
})
}
t.Run("Combined", func(t *testing.T) {
chActual := make(chan contracts.Entity)
chDesired := make(chan contracts.Entity)
subject := common.NewSyncSubject(v1.NewEndpoint)
logger := zaptest.NewLogger(t).Sugar()
expectedCreate := make(map[uint64]uint64)
expectedUpdate := make(map[uint64]uint64)
expectedDelete := make(map[uint64]uint64)
nextId := uint64(1)
var wg sync.WaitGroup
for _, test := range tests {
test := test
for _, sendOrder := range sendOrders {
sendOrder := sendOrder
id := nextId
nextId++
// Log ID mapping to allow easier debugging in case of failures.
t.Logf("ID=%d(%s) Test=%s SendOrder=%s",
id, testDeltaMakeIdOrChecksum(id).String(), test.Name, sendOrder.Name)
wg.Add(1)
go func() {
defer wg.Done()
sendOrder.Send(id, test, chActual, chDesired)
}()
if test.Create != 0 {
expectedCreate[id] = test.Create
}
if test.Update != 0 {
expectedUpdate[id] = test.Update
}
if test.Delete != 0 {
expectedDelete[id] = test.Delete
}
}
}
go func() {
wg.Wait()
close(chActual)
close(chDesired)
}()
delta := NewDelta(context.Background(), chActual, chDesired, subject, logger)
err := delta.Wait()
require.NoError(t, err, "delta should finish without error")
_, ok := <-chActual
require.False(t, ok, "chActual should have been closed")
_, ok = <-chDesired
require.False(t, ok, "chDesired should have been closed")
testDeltaVerifyResult(t, "Create", expectedCreate, delta.Create)
testDeltaVerifyResult(t, "Update", expectedUpdate, delta.Update)
testDeltaVerifyResult(t, "Delete", expectedDelete, delta.Delete)
})
}
func testDeltaMakeIdOrChecksum(i uint64) types.Binary {
b := make([]byte, 20)
binary.BigEndian.PutUint64(b, i)
return b
}
func testDeltaMakeExpectedMap(id uint64, checksum uint64) map[uint64]uint64 {
if checksum == 0 {
return nil
} else {
return map[uint64]uint64{
id: checksum,
}
}
}
func testDeltaVerifyResult(t *testing.T, name string, expected map[uint64]uint64, got EntitiesById) {
for id, checksum := range expected {
idKey := testDeltaMakeIdOrChecksum(id).String()
if assert.Containsf(t, got, idKey, "%s: should contain %s", name, idKey) {
expectedChecksum := testDeltaMakeIdOrChecksum(checksum).String()
gotChecksum := got[idKey].(contracts.Checksumer).Checksum().String()
assert.Equalf(t, expectedChecksum, gotChecksum, "%s: %s should match checksum", name, idKey)
delete(got, idKey)
}
}
for id := range got {
assert.Failf(t, "unexpected element", "%s: should not contain %s", name, id)
}
}
func BenchmarkDelta(b *testing.B) {
for n := 1 << 10; n <= 1<<20; n <<= 1 {
b.Run(strconv.Itoa(n), func(b *testing.B) {
benchmarkDelta(b, n)
})
}
}
func benchmarkDelta(b *testing.B, numEntities int) {
chActual := make([]chan contracts.Entity, b.N)
chDesired := make([]chan contracts.Entity, b.N)
for i := 0; i < b.N; i++ {
chActual[i] = make(chan contracts.Entity, numEntities)
chDesired[i] = make(chan contracts.Entity, numEntities)
}
makeEndpoint := func(id1, id2, checksum uint64) *v1.Endpoint {
e := new(v1.Endpoint)
e.Id = make([]byte, 20)
binary.BigEndian.PutUint64(e.Id[0:], id1)
binary.BigEndian.PutUint64(e.Id[8:], id2)
e.PropertiesChecksum = make([]byte, 20)
binary.BigEndian.PutUint64(e.PropertiesChecksum, checksum)
return e
}
for i := 0; i < numEntities; i++ {
// each iteration writes exactly one entity to each channel
var eActual, eDesired contracts.Entity
switch i % 3 {
case 0: // distinct IDs
eActual = makeEndpoint(1, uint64(i), uint64(i))
eDesired = makeEndpoint(2, uint64(i), uint64(i))
case 1: // same ID, same checksum
e := makeEndpoint(3, uint64(i), uint64(i))
eActual = e
eDesired = e
case 2: // same ID, different checksum
eActual = makeEndpoint(4, uint64(i), uint64(i))
eDesired = makeEndpoint(4, uint64(i), uint64(i+1))
}
for _, ch := range chActual {
ch <- eActual
}
for _, ch := range chDesired {
ch <- eDesired
}
}
for i := 0; i < b.N; i++ {
close(chActual[i])
close(chDesired[i])
}
subject := common.NewSyncSubject(v1.NewEndpoint)
// logger := zaptest.NewLogger(b).Sugar()
logger := zap.New(zapcore.NewTee()).Sugar()
b.ResetTimer()
for i := 0; i < b.N; i++ {
d := NewDelta(context.Background(), chActual[i], chDesired[i], subject, logger)
err := d.Wait()
assert.NoError(b, err, "delta should not fail")
}
}