From 00fe3fe6f7ea3c892e7d983a6d25418e78694998 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 19 May 2021 19:00:55 +0200 Subject: [PATCH 1/5] retry.WithBackoff(): pass a context to the function to be tried --- pkg/config/redis.go | 4 ++-- pkg/icingadb/db.go | 6 +++--- pkg/retry/retry.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/config/redis.go b/pkg/config/redis.go index 5f3dd4dd..dd21b553 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -49,9 +49,9 @@ func dial(ctx context.Context, network, addr string) (conn net.Conn, err error) _ = retry.WithBackoff( timeoutCtx, - func() error { + func(ctx context.Context) error { prevErr := err - conn, err = dl.DialContext(timeoutCtx, network, addr) + conn, err = dl.DialContext(ctx, network, addr) if prevErr != nil && errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { err = prevErr diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 31127258..ef5f86a2 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -150,7 +150,7 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph return retry.WithBackoff( ctx, - func() error { + func(context.Context) error { query, args, err := sqlx.In(query, b) if err != nil { return err @@ -214,7 +214,7 @@ func (db *DB) NamedBulkExec( return retry.WithBackoff( ctx, - func() error { + func(ctx context.Context) error { db.logger.Debugf("Executing %s with %d rows..", query, len(b)) _, err := db.NamedExecContext(ctx, query, b) if err != nil { @@ -279,7 +279,7 @@ func (db *DB) NamedBulkExecTx( return retry.WithBackoff( ctx, - func() error { + func(ctx context.Context) error { tx, err := db.BeginTxx(ctx, nil) if err != nil { return errors.Wrap(err, "can't start transaction") diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 4f7cbd7a..04d77030 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -7,7 +7,7 @@ import ( ) // RetryableFunc is a retryable function. -type RetryableFunc func() error +type RetryableFunc func(context.Context) error // IsRetryable checks whether a new attempt can be started based on the error passed. type IsRetryable func(error) bool @@ -16,7 +16,7 @@ type IsRetryable func(error) bool // The specified backoff policy is used to determine how long to sleep between attempts. func WithBackoff(ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff) (err error) { for attempt := 0; ; /* true */ attempt++ { - if err = retryableFunc(); err == nil { + if err = retryableFunc(ctx); err == nil { // No error. return } From f77d3940414d88f25c1b345ddf18c7e0a9a03fd8 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 19 May 2021 19:12:18 +0200 Subject: [PATCH 2/5] retry.WithBackoff(): add optional timeout --- pkg/config/redis.go | 1 + pkg/icingadb/db.go | 12 +++++++++--- pkg/retry/retry.go | 11 ++++++++++- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/pkg/config/redis.go b/pkg/config/redis.go index dd21b553..dbb4ba74 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -67,6 +67,7 @@ func dial(ctx context.Context, network, addr string) (conn net.Conn, err error) return false }, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + 0, ) return } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index ef5f86a2..c6db635f 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -167,7 +167,9 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph return nil }, IsRetryable, - backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second)) + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + 0, + ) } }(b)) } @@ -237,7 +239,9 @@ func (db *DB) NamedBulkExec( return nil }, IsRetryable, - backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second)) + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + 0, + ) } }(b)) case <-ctx.Done(): @@ -305,7 +309,9 @@ func (db *DB) NamedBulkExecTx( return nil }, IsRetryable, - backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second)) + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + 0, + ) } }(b)) case <-ctx.Done(): diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 04d77030..83c64a35 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -14,7 +14,16 @@ type IsRetryable func(error) bool // WithBackoff retries the passed function if it fails and the error allows it to retry. // The specified backoff policy is used to determine how long to sleep between attempts. -func WithBackoff(ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff) (err error) { +// Once the specified timeout (if >0) elapses, WithBackoff gives up. +func WithBackoff( + ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, timeout time.Duration, +) (err error) { + if timeout > 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + for attempt := 0; ; /* true */ attempt++ { if err = retryableFunc(ctx); err == nil { // No error. From 5bcd5339b43b6ad866acbcfc18a940020efd35a7 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 19 May 2021 19:28:22 +0200 Subject: [PATCH 3/5] retry.WithBackoff(): return the most descriptive error --- pkg/retry/retry.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 83c64a35..e366485a 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -2,6 +2,7 @@ package retry import ( "context" + "errors" "github.com/icinga/icingadb/pkg/backoff" "time" ) @@ -25,12 +26,20 @@ func WithBackoff( } for attempt := 0; ; /* true */ attempt++ { + prevErr := err + if err = retryableFunc(ctx); err == nil { // No error. return } - if !retryable(err) { + isRetryable := retryable(err) + + if prevErr != nil && (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { + err = prevErr + } + + if !isRetryable { // Not retryable. return } From e4e138aaa4e9bf901303d69434e79ad950c0fc2b Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 20 May 2021 12:10:20 +0200 Subject: [PATCH 4/5] Redis dialer: de-duplicate retry.WithBackoff() logic --- pkg/config/redis.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/pkg/config/redis.go b/pkg/config/redis.go index dbb4ba74..cac508f0 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -44,20 +44,11 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error func dial(ctx context.Context, network, addr string) (conn net.Conn, err error) { var dl net.Dialer - timeoutCtx, cancelTimeoutCtx := context.WithTimeout(ctx, 5*time.Minute) - defer cancelTimeoutCtx() - - _ = retry.WithBackoff( - timeoutCtx, - func(ctx context.Context) error { - prevErr := err + err = retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { conn, err = dl.DialContext(ctx, network, addr) - - if prevErr != nil && errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - err = prevErr - } - - return err + return }, func(err error) bool { if op, ok := err.(*net.OpError); ok { @@ -67,7 +58,7 @@ func dial(ctx context.Context, network, addr string) (conn net.Conn, err error) return false }, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), - 0, + 5*time.Minute, ) return } From 867d5b67dd859a6ef4fd081b8ed58daadcf70881 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 20 May 2021 13:13:55 +0200 Subject: [PATCH 5/5] SQL driver: de-duplicate retry.WithBackoff() logic --- pkg/driver/driver.go | 42 +++++++++++------------------------------- 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 765ac6c7..ec8197ba 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -4,9 +4,9 @@ import ( "context" "database/sql" "database/sql/driver" - "fmt" "github.com/go-sql-driver/mysql" "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/retry" "go.uber.org/zap" "io/ioutil" "log" @@ -26,36 +26,16 @@ type Driver struct { // TODO(el): Test DNS. func (d Driver) Open(dsn string) (c driver.Conn, err error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - c, err = d.Driver.Open(dsn) - if err == nil { - // No error. Return immediately. - return - } - - fmt.Println("Got error", err) - - boff := backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1) - - for attempt, retry := 0, shouldRetry(err); retry; attempt, retry = attempt+1, shouldRetry(err) { - sleep := boff(uint64(attempt)) - d.Logger.Debugf("Sleeping for %s", sleep) - select { - case <-ctx.Done(): - // Context canceled. - return nil, ctx.Err() - case <-time.After(sleep): - // Wait for backoff duration and continue. - } - c, err = d.Driver.Open(dsn) - if err == nil { - // No error. Break retry loop. - break - } - } - + err = retry.WithBackoff( + context.Background(), + func(context.Context) (err error) { + c, err = d.Driver.Open(dsn) + return + }, + shouldRetry, + backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1), + timeout, + ) return }