Telemetry: use mutex for synchronizing last database error

The old CompareAndSwap based code tended to end up in an endless loop. Replace
it by simple syncrhonization mechanisms where this can't happen.
This commit is contained in:
Julian Brost 2022-06-28 09:54:50 +02:00 committed by Julian Brost
parent def7c5f22c
commit 061660b023
2 changed files with 49 additions and 34 deletions

View file

@ -40,14 +40,14 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
retry.Settings{
Timeout: timeout,
OnError: func(_ time.Duration, _ uint64, err, lastErr error) {
updateCurrentDbConnErr(err)
telemetry.UpdateCurrentDbConnErr(err)
if lastErr == nil || err.Error() != lastErr.Error() {
c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err))
}
},
OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
updateCurrentDbConnErr(nil)
telemetry.UpdateCurrentDbConnErr(nil)
if attempt > 0 {
c.driver.Logger.Infow("Reconnected to database",
@ -59,30 +59,6 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
return conn, err
}
// updateCurrentDbConnErr updates telemetry.CurrentDbConnErr if necessary.
func updateCurrentDbConnErr(err error) {
ours := telemetry.DbConnErr{SinceMilli: time.Now().UnixMilli()}
if err != nil {
ours.Message = err.Error()
}
for {
theirs, _ := telemetry.CurrentDbConnErr.Load()
if theirs.SinceMilli >= ours.SinceMilli || theirs.Message == ours.Message {
break
}
merge := ours
if theirs.Message != "" && ours.Message != "" {
merge.SinceMilli = theirs.SinceMilli
}
if telemetry.CurrentDbConnErr.CompareAndSwap(theirs, merge) {
break
}
}
}
// Driver implements part of the driver.Connector interface.
func (c RetryConnector) Driver() driver.Driver {
return c.driver

View file

@ -16,6 +16,7 @@ import (
"runtime/metrics"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
@ -30,13 +31,51 @@ type SuccessfulSync struct {
DurationMilli int64
}
type DbConnErr struct {
Message string
SinceMilli int64
// currentDbConnErr stores ongoing errors from database connections.
var currentDbConnErr struct {
mu sync.Mutex
message string
sinceMilli int64
}
// CurrentDbConnErr is to be updated by the DB connector.
var CurrentDbConnErr com.Atomic[DbConnErr]
// UpdateCurrentDbConnErr updates the current error information stored in currentDbConnErr.
func UpdateCurrentDbConnErr(err error) {
now := time.Now().UnixMilli()
currentDbConnErr.mu.Lock()
defer currentDbConnErr.mu.Unlock()
if currentDbConnErr.sinceMilli >= now {
// Already updated with a more recent error, ignore this one.
return
}
message := ""
if err != nil {
message = err.Error()
}
if currentDbConnErr.message == message {
// Error stayed the same, no update needed, keeping the old timestamp.
return
}
if currentDbConnErr.message == "" || message == "" {
// Either first error or recovery from an error, update timestamp.
currentDbConnErr.sinceMilli = now
}
currentDbConnErr.message = message
}
// GetCurrentDbConnErr returns the last error message (or the empty string if not in an error state) and a timestamp in
// milliseconds of the last change from OK to error or from error to OK.
func GetCurrentDbConnErr() (string, int64) {
currentDbConnErr.mu.Lock()
defer currentDbConnErr.mu.Unlock()
return currentDbConnErr.message, currentDbConnErr.sinceMilli
}
// OngoingSyncStartMilli is to be updated by the main() function.
var OngoingSyncStartMilli int64
@ -63,15 +102,15 @@ func StartHeartbeat(
responsibleTsMilli, responsible, otherResponsible := ha.State()
ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli)
sync, _ := LastSuccessfulSync.Load()
dbConnErr, _ := CurrentDbConnErr.Load()
dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr()
now := time.Now()
values := map[string]string{
"version": internal.Version.Version,
"time": strconv.FormatInt(now.UnixMilli(), 10),
"start-time": strconv.FormatInt(startTime, 10),
"error": dbConnErr.Message,
"error-since": strconv.FormatInt(dbConnErr.SinceMilli, 10),
"error": dbConnErr,
"error-since": strconv.FormatInt(dbConnErrSinceMilli, 10),
"performance-data": goMetrics.PerformanceData(),
"last-heartbeat-received": strconv.FormatInt(heartbeat, 10),
"ha-responsible": boolToStr[responsible],