diff --git a/go.mod b/go.mod index 6a307f71..6634625a 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/jessevdk/go-flags v1.5.0 github.com/jmoiron/sqlx v1.3.4 github.com/pkg/errors v0.9.1 + github.com/stretchr/testify v1.7.0 go.uber.org/zap v1.19.1 golang.org/x/exp v0.0.0-20210514180818-737f94c0881e golang.org/x/sync v0.0.0-20210220032951-036812b2e83c diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 59c9eaab..b5bdcf2c 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -2,13 +2,10 @@ package icingadb import ( "context" - "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/utils" "go.uber.org/zap" - "golang.org/x/sync/errgroup" - "sync" "time" ) @@ -22,7 +19,8 @@ type Delta struct { logger *zap.SugaredLogger } -// NewDelta creates a new Delta and starts calculating it. +// NewDelta creates a new Delta and starts calculating it. The caller must ensure +// that no duplicate entities are sent to the same stream. func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta { delta := &Delta{ Subject: subject, @@ -43,98 +41,82 @@ func (delta *Delta) Wait() error { func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) { defer close(delta.done) + start := time.Now() + var endActual, endDesired time.Time + var numActual, numDesired uint64 + + actual := EntitiesById{} // only read from actualCh (so far) + desired := EntitiesById{} // only read from desiredCh (so far) + var update EntitiesById if delta.Subject.WithChecksum() { - update = EntitiesById{} + update = EntitiesById{} // read from actualCh and desiredCh with mismatching checksums } - actual := EntitiesById{} - desired := EntitiesById{} - var mtx, updateMtx sync.Mutex - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - var cnt com.Counter - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - delta.logger.Debugf( - "Synced %d actual elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed) - }) - for { - select { - case a, ok := <-actualCh: - if !ok { - return nil - } - - id := a.ID().String() - mtx.Lock() - - if d, ok := desired[id]; ok { - delete(desired, id) - mtx.Unlock() - - if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { - updateMtx.Lock() - update[id] = d - updateMtx.Unlock() - } - } else { - actual[id] = a - mtx.Unlock() - } - - cnt.Inc() - case <-ctx.Done(): - return ctx.Err() + for actualCh != nil || desiredCh != nil { + select { + case actualValue, ok := <-actualCh: + if !ok { + endActual = time.Now() + actualCh = nil // Done reading all actual entities, disable this case. + break } - } - }) + numActual++ - g.Go(func() error { - var cnt com.Counter - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - delta.logger.Debugf( - "Synced %d desired elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed) - }) - for { - select { - case d, ok := <-desiredCh: - if !ok { - return nil + id := actualValue.ID().String() + if desiredValue, ok := desired[id]; ok { + delete(desired, id) + if update != nil && !checksumsMatch(actualValue, desiredValue) { + update[id] = desiredValue } - - id := d.ID().String() - mtx.Lock() - - if a, ok := actual[id]; ok { - delete(actual, id) - mtx.Unlock() - - if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { - updateMtx.Lock() - update[id] = d - updateMtx.Unlock() - } - } else { - desired[id] = d - mtx.Unlock() - } - - cnt.Inc() - case <-ctx.Done(): - return ctx.Err() + } else { + actual[id] = actualValue } + + case desiredValue, ok := <-desiredCh: + if !ok { + endDesired = time.Now() + desiredCh = nil // Done reading all desired entities, disable this case. + break + } + numDesired++ + + id := desiredValue.ID().String() + if actualValue, ok := actual[id]; ok { + delete(actual, id) + if update != nil && !checksumsMatch(actualValue, desiredValue) { + update[id] = desiredValue + } + } else { + desired[id] = desiredValue + } + + case <-ctx.Done(): + delta.done <- ctx.Err() + return } - }) - - if err := g.Wait(); err != nil { - delta.done <- err - - return } delta.Create = desired + delta.Update = update delta.Delete = actual - if delta.Subject.WithChecksum() { - delta.Update = update - } + + delta.logger.Debugw("Delta finished", + zap.String("subject", utils.Name(delta.Subject.Entity())), + zap.Duration("time_total", time.Since(start)), + zap.Duration("time_actual", endActual.Sub(start)), + zap.Duration("time_desired", endDesired.Sub(start)), + zap.Uint64("num_actual", numActual), + zap.Uint64("num_desired", numDesired), + zap.Int("create", len(delta.Create)), + zap.Int("update", len(delta.Update)), + zap.Int("delete", len(delta.Delete))) +} + +// checksumsMatch returns whether the checksums of two entities are the same. +// Both entities must implement contracts.Checksumer. +func checksumsMatch(a, b contracts.Entity) bool { + c1 := a.(contracts.Checksumer).Checksum() + c2 := b.(contracts.Checksumer).Checksum() + return c1.Equal(c2) } diff --git a/pkg/icingadb/delta_test.go b/pkg/icingadb/delta_test.go new file mode 100644 index 00000000..9f1cdf88 --- /dev/null +++ b/pkg/icingadb/delta_test.go @@ -0,0 +1,266 @@ +package icingadb + +import ( + "context" + "encoding/binary" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "strconv" + "sync" + "testing" +) + +func TestDelta(t *testing.T) { + type TestData struct { + Name string // name for the sub-test + Actual, Desired uint64 // checksum to send to actual/desired + Create, Update, Delete uint64 // checksum that must be in the corresponding map (if != 0) + } + + tests := []TestData{{ + Name: "Empty", + }, { + Name: "Create", + Desired: 0x1111111111111111, + Create: 0x1111111111111111, + }, { + Name: "Update", + Actual: 0x1111111111111111, + Desired: 0x2222222222222222, + Update: 0x2222222222222222, + }, { + Name: "Delete", + Actual: 0x1111111111111111, + Delete: 0x1111111111111111, + }, { + Name: "Keep", + Actual: 0x1111111111111111, + Desired: 0x1111111111111111, + }} + + makeEndpoint := func(id, checksum uint64) *v1.Endpoint { + e := new(v1.Endpoint) + e.Id = testDeltaMakeIdOrChecksum(id) + e.PropertiesChecksum = testDeltaMakeIdOrChecksum(checksum) + return e + } + + // Send the entities to the actual and desired channels in different ordering to catch bugs in the implementation + // that only show depending on the order in which actual and desired values are processed for an ID. + type SendOrder struct { + Name string + Send func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) + } + sendOrders := []SendOrder{{ + Name: "ActualFirst", + Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) { + if test.Actual != 0 { + chActual <- makeEndpoint(id, test.Actual) + } + if test.Desired != 0 { + chDesired <- makeEndpoint(id, test.Desired) + } + }, + }, { + Name: "DesiredFirst", + Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) { + if test.Desired != 0 { + chDesired <- makeEndpoint(id, test.Desired) + } + if test.Actual != 0 { + chActual <- makeEndpoint(id, test.Actual) + } + }, + }} + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + for _, sendOrder := range sendOrders { + t.Run(sendOrder.Name, func(t *testing.T) { + id := uint64(0x42) + chActual := make(chan contracts.Entity) + chDesired := make(chan contracts.Entity) + subject := common.NewSyncSubject(v1.NewEndpoint) + logger := zaptest.NewLogger(t).Sugar() + + go func() { + sendOrder.Send(id, test, chActual, chDesired) + close(chActual) + close(chDesired) + }() + + delta := NewDelta(context.Background(), chActual, chDesired, subject, logger) + err := delta.Wait() + require.NoError(t, err, "delta should finish without error") + + _, ok := <-chActual + require.False(t, ok, "chActual should have been closed") + _, ok = <-chDesired + require.False(t, ok, "chDesired should have been closed") + + testDeltaVerifyResult(t, "Create", testDeltaMakeExpectedMap(id, test.Create), delta.Create) + testDeltaVerifyResult(t, "Update", testDeltaMakeExpectedMap(id, test.Update), delta.Update) + testDeltaVerifyResult(t, "Delete", testDeltaMakeExpectedMap(id, test.Delete), delta.Delete) + }) + } + }) + } + + t.Run("Combined", func(t *testing.T) { + chActual := make(chan contracts.Entity) + chDesired := make(chan contracts.Entity) + subject := common.NewSyncSubject(v1.NewEndpoint) + logger := zaptest.NewLogger(t).Sugar() + + expectedCreate := make(map[uint64]uint64) + expectedUpdate := make(map[uint64]uint64) + expectedDelete := make(map[uint64]uint64) + + nextId := uint64(1) + var wg sync.WaitGroup + for _, test := range tests { + test := test + for _, sendOrder := range sendOrders { + sendOrder := sendOrder + id := nextId + nextId++ + // Log ID mapping to allow easier debugging in case of failures. + t.Logf("ID=%d(%s) Test=%s SendOrder=%s", + id, testDeltaMakeIdOrChecksum(id).String(), test.Name, sendOrder.Name) + wg.Add(1) + go func() { + defer wg.Done() + sendOrder.Send(id, test, chActual, chDesired) + }() + + if test.Create != 0 { + expectedCreate[id] = test.Create + } + if test.Update != 0 { + expectedUpdate[id] = test.Update + } + if test.Delete != 0 { + expectedDelete[id] = test.Delete + } + } + } + go func() { + wg.Wait() + close(chActual) + close(chDesired) + }() + + delta := NewDelta(context.Background(), chActual, chDesired, subject, logger) + err := delta.Wait() + require.NoError(t, err, "delta should finish without error") + + _, ok := <-chActual + require.False(t, ok, "chActual should have been closed") + _, ok = <-chDesired + require.False(t, ok, "chDesired should have been closed") + + testDeltaVerifyResult(t, "Create", expectedCreate, delta.Create) + testDeltaVerifyResult(t, "Update", expectedUpdate, delta.Update) + testDeltaVerifyResult(t, "Delete", expectedDelete, delta.Delete) + }) +} + +func testDeltaMakeIdOrChecksum(i uint64) types.Binary { + b := make([]byte, 20) + binary.BigEndian.PutUint64(b, i) + return b +} + +func testDeltaMakeExpectedMap(id uint64, checksum uint64) map[uint64]uint64 { + if checksum == 0 { + return nil + } else { + return map[uint64]uint64{ + id: checksum, + } + } +} + +func testDeltaVerifyResult(t *testing.T, name string, expected map[uint64]uint64, got EntitiesById) { + for id, checksum := range expected { + idKey := testDeltaMakeIdOrChecksum(id).String() + if assert.Containsf(t, got, idKey, "%s: should contain %s", name, idKey) { + expectedChecksum := testDeltaMakeIdOrChecksum(checksum).String() + gotChecksum := got[idKey].(contracts.Checksumer).Checksum().String() + assert.Equalf(t, expectedChecksum, gotChecksum, "%s: %s should match checksum", name, idKey) + delete(got, idKey) + } + } + + for id := range got { + assert.Failf(t, "unexpected element", "%s: should not contain %s", name, id) + } +} + +func BenchmarkDelta(b *testing.B) { + for n := 1 << 10; n <= 1<<20; n <<= 1 { + b.Run(strconv.Itoa(n), func(b *testing.B) { + benchmarkDelta(b, n) + }) + } +} + +func benchmarkDelta(b *testing.B, numEntities int) { + chActual := make([]chan contracts.Entity, b.N) + chDesired := make([]chan contracts.Entity, b.N) + for i := 0; i < b.N; i++ { + chActual[i] = make(chan contracts.Entity, numEntities) + chDesired[i] = make(chan contracts.Entity, numEntities) + } + makeEndpoint := func(id1, id2, checksum uint64) *v1.Endpoint { + e := new(v1.Endpoint) + e.Id = make([]byte, 20) + binary.BigEndian.PutUint64(e.Id[0:], id1) + binary.BigEndian.PutUint64(e.Id[8:], id2) + e.PropertiesChecksum = make([]byte, 20) + binary.BigEndian.PutUint64(e.PropertiesChecksum, checksum) + return e + } + for i := 0; i < numEntities; i++ { + // each iteration writes exactly one entity to each channel + var eActual, eDesired contracts.Entity + switch i % 3 { + case 0: // distinct IDs + eActual = makeEndpoint(1, uint64(i), uint64(i)) + eDesired = makeEndpoint(2, uint64(i), uint64(i)) + case 1: // same ID, same checksum + e := makeEndpoint(3, uint64(i), uint64(i)) + eActual = e + eDesired = e + case 2: // same ID, different checksum + eActual = makeEndpoint(4, uint64(i), uint64(i)) + eDesired = makeEndpoint(4, uint64(i), uint64(i+1)) + } + for _, ch := range chActual { + ch <- eActual + } + for _, ch := range chDesired { + ch <- eDesired + } + } + for i := 0; i < b.N; i++ { + close(chActual[i]) + close(chDesired[i]) + } + subject := common.NewSyncSubject(v1.NewEndpoint) + // logger := zaptest.NewLogger(b).Sugar() + logger := zap.New(zapcore.NewTee()).Sugar() + b.ResetTimer() + for i := 0; i < b.N; i++ { + d := NewDelta(context.Background(), chActual[i], chDesired[i], subject, logger) + err := d.Wait() + assert.NoError(b, err, "delta should not fail") + } +}