Merge pull request #1 from lippserd/ha

HA improvements
This commit is contained in:
Eric Lippmann 2021-03-16 10:12:23 +01:00 committed by GitHub
commit dfd904b064
3 changed files with 69 additions and 54 deletions

View file

@ -18,7 +18,6 @@ import (
func main() {
cmd := command.New()
instanceId := cmd.InstanceId()
logger := cmd.Logger
defer logger.Sync()
defer func() {
@ -39,7 +38,7 @@ func main() {
ctx := context.Background()
heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger)
ha := icingadb.NewHA(ctx, instanceId, db, heartbeat, logger)
ha := icingadb.NewHA(ctx, db, heartbeat, logger)
s := icingadb.NewSync(db, rc, logger)
// For temporary exit after sync

View file

@ -1,14 +1,11 @@
package command
import (
"fmt"
"github.com/icinga/icingadb/pkg/config"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"path/filepath"
)
type Command struct {
@ -50,20 +47,6 @@ func (c Command) Database() *icingadb.DB {
return db
}
func (c Command) InstanceId() types.Binary {
var instanceId types.Binary
path := filepath.Join(c.Flags.Datadir, "instance-id")
instanceId, err := utils.CreateOrRead(path, utils.Uuid)
if err != nil {
c.Logger.Fatalw(fmt.Sprintf("can't create or read instance-id file %s", path), zap.Error(err))
}
instanceId = utils.Checksum([]byte(instanceId))
c.Logger.Infof("My instance ID is %s", instanceId)
return instanceId
}
func (c Command) Redis() *icingaredis.Client {
rc, err := c.Config.Redis.NewClient(c.Logger)
if err != nil {

View file

@ -3,7 +3,9 @@ package icingadb
import (
"context"
"database/sql"
"encoding/hex"
"errors"
"github.com/google/uuid"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
@ -32,13 +34,15 @@ type HA struct {
errOnce sync.Once
}
func NewHA(ctx context.Context, instanceId types.Binary, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA {
func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA {
ctx, cancel := context.WithCancel(ctx)
instanceId := uuid.New()
ha := &HA{
ctx: ctx,
cancel: cancel,
instanceId: instanceId,
instanceId: instanceId[:],
db: db,
heartbeat: heartbeat,
logger: logger,
@ -94,6 +98,10 @@ func (h *HA) abort(err error) {
// controller loop.
func (h *HA) controller() {
h.logger.Debugw("Starting HA", zap.String("instance_id", hex.EncodeToString(h.instanceId)))
oldInstancesRemoved := false
for {
select {
case b, ok := <-h.heartbeat.Beat():
@ -126,6 +134,10 @@ func (h *HA) controller() {
if err = h.realize(s, t); err != nil {
h.abort(err)
}
if !oldInstancesRemoved {
go h.removeOldInstances(s)
oldInstancesRemoved = true
}
case <-h.heartbeat.Lost():
h.logger.Error("Lost heartbeat")
h.signalHandover()
@ -137,7 +149,7 @@ func (h *HA) controller() {
func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error {
// boff := backoff.NewExponentialWithJitter(time.Millisecond*1, time.Second*1)
for attempt, retry := 0, true; retry; attempt++ {
for attempt := 0; true; attempt++ {
// sleep := boff(uint64(attempt))
// h.logger.Debugf("Sleeping for %s..", sleep)
// time.Sleep(sleep)
@ -158,50 +170,71 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error {
break
}
_ = rows.Close()
if takeover {
i := v1.IcingadbInstance{
EntityWithoutChecksum: v1.EntityWithoutChecksum{
IdMeta: v1.IdMeta{
Id: h.instanceId,
},
i := v1.IcingadbInstance{
EntityWithoutChecksum: v1.EntityWithoutChecksum{
IdMeta: v1.IdMeta{
Id: h.instanceId,
},
EnvironmentMeta: v1.EnvironmentMeta{
EnvironmentId: s.EnvironmentID(),
},
Heartbeat: *t,
Responsible: types.Yes,
Icinga2Version: s.Version,
Icinga2StartTime: s.ProgramStart,
Icinga2NotificationsEnabled: s.NotificationsEnabled,
Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled,
Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled,
Icinga2EventHandlersEnabled: s.EventHandlersEnabled,
Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled,
Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled,
}
_, err := tx.NamedExecContext(ctx, h.db.BuildUpsertStmt(i), i)
if err != nil {
cancel()
if !utils.IsDeadlock(err) {
retry = false
h.logger.Errorw("Can't Update or insert instance.", zap.Error(err))
} else {
h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err))
}
},
EnvironmentMeta: v1.EnvironmentMeta{
EnvironmentId: s.EnvironmentID(),
},
Heartbeat: *t,
Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true},
EndpointId: s.EndpointId,
Icinga2Version: s.Version,
Icinga2StartTime: s.ProgramStart,
Icinga2NotificationsEnabled: s.NotificationsEnabled,
Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled,
Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled,
Icinga2EventHandlersEnabled: s.EventHandlersEnabled,
Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled,
Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled,
}
_, err = tx.NamedExecContext(ctx, h.db.BuildUpsertStmt(i), i)
if err != nil {
cancel()
if !utils.IsDeadlock(err) {
h.logger.Errorw("Can't Update or insert instance.", zap.Error(err))
break
} else {
h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err))
continue
}
} else {
retry = false
}
if err := tx.Commit(); err != nil {
return err
}
h.signalTakeover()
if takeover {
h.signalTakeover()
}
break
}
return nil
}
func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) {
select {
case <-h.ctx.Done():
return
case <-time.After(timeout):
result, err := h.db.ExecContext(h.ctx, "DELETE FROM icingadb_instance "+
"WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?",
h.instanceId, s.EnvironmentID(), s.EndpointId, types.UnixMilli(time.Now().Add(-timeout)))
if err != nil {
h.logger.Errorw("Can't remove rows of old instances", zap.Error(err))
return
}
affected, err := result.RowsAffected()
if err != nil {
h.logger.Errorw("Can't get number of removed old instances", zap.Error(err))
return
}
h.logger.Debugf("Removed %d old instances", affected)
}
}
func (h *HA) signalHandover() {
if h.responsible {
h.logger.Warn("Handing over..")