mirror of
https://github.com/Icinga/icingadb.git
synced 2026-06-06 15:22:08 -04:00
Restart HA after environment change
If the environment changes during runtime, we have to restart HA in order to stop a possibly running config sync and start a new one.
This commit is contained in:
parent
e74b09daa1
commit
084e0409bc
2 changed files with 49 additions and 16 deletions
|
|
@ -230,6 +230,10 @@ func run() int {
|
|||
case <-ha.Handover():
|
||||
logger.Warn("Handing over")
|
||||
|
||||
cancelHactx()
|
||||
case <-ha.Restart():
|
||||
logger.Info("Restarting HA")
|
||||
|
||||
cancelHactx()
|
||||
case <-hactx.Done():
|
||||
// Nothing to do here, surrounding loop will terminate now.
|
||||
|
|
|
|||
|
|
@ -26,11 +26,13 @@ type HA struct {
|
|||
cancelCtx context.CancelFunc
|
||||
instanceId types.Binary
|
||||
db *DB
|
||||
environment *v1.Environment
|
||||
heartbeat *icingaredis.Heartbeat
|
||||
logger *zap.SugaredLogger
|
||||
responsible bool
|
||||
handover chan struct{}
|
||||
takeover chan struct{}
|
||||
restart chan struct{}
|
||||
done chan struct{}
|
||||
mu *sync.Mutex
|
||||
err error
|
||||
|
|
@ -52,6 +54,7 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger
|
|||
logger: logger,
|
||||
handover: make(chan struct{}),
|
||||
takeover: make(chan struct{}),
|
||||
restart: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
mu: &sync.Mutex{},
|
||||
}
|
||||
|
|
@ -96,6 +99,11 @@ func (h *HA) Takeover() chan struct{} {
|
|||
return h.takeover
|
||||
}
|
||||
|
||||
// Restart returns a channel with which restarts are signaled.
|
||||
func (h *HA) Restart() chan struct{} {
|
||||
return h.restart
|
||||
}
|
||||
|
||||
func (h *HA) abort(err error) {
|
||||
h.errOnce.Do(func() {
|
||||
h.mu.Lock()
|
||||
|
|
@ -141,6 +149,30 @@ func (h *HA) controller() {
|
|||
h.abort(err)
|
||||
}
|
||||
|
||||
if h.environment == nil || h.environment.Name.String != s.Environment {
|
||||
var restart bool
|
||||
|
||||
if h.environment != nil {
|
||||
h.logger.Warnw("Got new environment", zap.String("current", h.environment.Name.String), zap.String("new", s.Environment))
|
||||
restart = true
|
||||
}
|
||||
|
||||
h.environment = &v1.Environment{
|
||||
EntityWithoutChecksum: v1.EntityWithoutChecksum{IdMeta: v1.IdMeta{
|
||||
Id: s.EnvironmentID(),
|
||||
}},
|
||||
Name: types.String{NullString: sql.NullString{
|
||||
String: s.Environment,
|
||||
Valid: true,
|
||||
}},
|
||||
}
|
||||
|
||||
if restart {
|
||||
h.signalRestart()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-logTicker.C:
|
||||
shouldLog = true
|
||||
|
|
@ -268,7 +300,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
|
|||
if takeover {
|
||||
// Insert the environment after each heartbeat takeover if it does not already exist in the database
|
||||
// as the environment may have changed, although this is likely to happen very rarely.
|
||||
if err := h.insertEnvironment(s); err != nil {
|
||||
if err := h.insertEnvironment(); err != nil {
|
||||
cancelCtx()
|
||||
return errors.Wrap(err, "can't insert environment")
|
||||
}
|
||||
|
|
@ -284,23 +316,11 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
|
|||
}
|
||||
|
||||
// insertEnvironment inserts the environment from the specified state into the database if it does not already exist.
|
||||
func (h *HA) insertEnvironment(s *icingaredisv1.IcingaStatus) error {
|
||||
e := v1.Environment{
|
||||
EntityWithoutChecksum: v1.EntityWithoutChecksum{
|
||||
IdMeta: v1.IdMeta{
|
||||
Id: s.EnvironmentID(),
|
||||
},
|
||||
},
|
||||
Name: types.String{NullString: sql.NullString{
|
||||
String: s.Environment,
|
||||
Valid: true,
|
||||
}},
|
||||
}
|
||||
|
||||
func (h *HA) insertEnvironment() error {
|
||||
// Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does.
|
||||
stmt, _ := h.db.BuildInsertIgnoreStmt(e)
|
||||
stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment)
|
||||
|
||||
if _, err := h.db.NamedExecContext(h.ctx, stmt, e); err != nil {
|
||||
if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil {
|
||||
return internal.CantPerformQuery(err, stmt)
|
||||
}
|
||||
|
||||
|
|
@ -364,3 +384,12 @@ func (h *HA) signalTakeover() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *HA) signalRestart() {
|
||||
select {
|
||||
case h.restart <- struct{}{}:
|
||||
h.responsible = false
|
||||
case <-h.ctx.Done():
|
||||
// Noop
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue