mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Merge pull request #64 from lippserd/feature/mysql-retry-5m
Re-use Redis dialer logic in retry.WithBackoff() and the SQL driver
This commit is contained in:
commit
c5e875cfce
4 changed files with 51 additions and 55 deletions
|
|
@ -44,20 +44,11 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error
|
|||
func dial(ctx context.Context, network, addr string) (conn net.Conn, err error) {
|
||||
var dl net.Dialer
|
||||
|
||||
timeoutCtx, cancelTimeoutCtx := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancelTimeoutCtx()
|
||||
|
||||
_ = retry.WithBackoff(
|
||||
timeoutCtx,
|
||||
func() error {
|
||||
prevErr := err
|
||||
conn, err = dl.DialContext(timeoutCtx, network, addr)
|
||||
|
||||
if prevErr != nil && errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
||||
err = prevErr
|
||||
}
|
||||
|
||||
return err
|
||||
err = retry.WithBackoff(
|
||||
ctx,
|
||||
func(ctx context.Context) (err error) {
|
||||
conn, err = dl.DialContext(ctx, network, addr)
|
||||
return
|
||||
},
|
||||
func(err error) bool {
|
||||
if op, ok := err.(*net.OpError); ok {
|
||||
|
|
@ -67,6 +58,7 @@ func dial(ctx context.Context, network, addr string) (conn net.Conn, err error)
|
|||
return false
|
||||
},
|
||||
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
|
||||
5*time.Minute,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,9 +4,9 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"fmt"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/icinga/icingadb/pkg/backoff"
|
||||
"github.com/icinga/icingadb/pkg/retry"
|
||||
"go.uber.org/zap"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
|
|
@ -26,36 +26,16 @@ type Driver struct {
|
|||
|
||||
// TODO(el): Test DNS.
|
||||
func (d Driver) Open(dsn string) (c driver.Conn, err error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
c, err = d.Driver.Open(dsn)
|
||||
if err == nil {
|
||||
// No error. Return immediately.
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("Got error", err)
|
||||
|
||||
boff := backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1)
|
||||
|
||||
for attempt, retry := 0, shouldRetry(err); retry; attempt, retry = attempt+1, shouldRetry(err) {
|
||||
sleep := boff(uint64(attempt))
|
||||
d.Logger.Debugf("Sleeping for %s", sleep)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Context canceled.
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(sleep):
|
||||
// Wait for backoff duration and continue.
|
||||
}
|
||||
c, err = d.Driver.Open(dsn)
|
||||
if err == nil {
|
||||
// No error. Break retry loop.
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
err = retry.WithBackoff(
|
||||
context.Background(),
|
||||
func(context.Context) (err error) {
|
||||
c, err = d.Driver.Open(dsn)
|
||||
return
|
||||
},
|
||||
shouldRetry,
|
||||
backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1),
|
||||
timeout,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -150,7 +150,7 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
|
|||
|
||||
return retry.WithBackoff(
|
||||
ctx,
|
||||
func() error {
|
||||
func(context.Context) error {
|
||||
query, args, err := sqlx.In(query, b)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -167,7 +167,9 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
|
|||
return nil
|
||||
},
|
||||
IsRetryable,
|
||||
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second))
|
||||
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
|
||||
0,
|
||||
)
|
||||
}
|
||||
}(b))
|
||||
}
|
||||
|
|
@ -214,7 +216,7 @@ func (db *DB) NamedBulkExec(
|
|||
|
||||
return retry.WithBackoff(
|
||||
ctx,
|
||||
func() error {
|
||||
func(ctx context.Context) error {
|
||||
db.logger.Debugf("Executing %s with %d rows..", query, len(b))
|
||||
_, err := db.NamedExecContext(ctx, query, b)
|
||||
if err != nil {
|
||||
|
|
@ -237,7 +239,9 @@ func (db *DB) NamedBulkExec(
|
|||
return nil
|
||||
},
|
||||
IsRetryable,
|
||||
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second))
|
||||
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
|
||||
0,
|
||||
)
|
||||
}
|
||||
}(b))
|
||||
case <-ctx.Done():
|
||||
|
|
@ -279,7 +283,7 @@ func (db *DB) NamedBulkExecTx(
|
|||
|
||||
return retry.WithBackoff(
|
||||
ctx,
|
||||
func() error {
|
||||
func(ctx context.Context) error {
|
||||
tx, err := db.BeginTxx(ctx, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't start transaction")
|
||||
|
|
@ -305,7 +309,9 @@ func (db *DB) NamedBulkExecTx(
|
|||
return nil
|
||||
},
|
||||
IsRetryable,
|
||||
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second))
|
||||
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
|
||||
0,
|
||||
)
|
||||
}
|
||||
}(b))
|
||||
case <-ctx.Done():
|
||||
|
|
|
|||
|
|
@ -2,26 +2,44 @@ package retry
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/icinga/icingadb/pkg/backoff"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RetryableFunc is a retryable function.
|
||||
type RetryableFunc func() error
|
||||
type RetryableFunc func(context.Context) error
|
||||
|
||||
// IsRetryable checks whether a new attempt can be started based on the error passed.
|
||||
type IsRetryable func(error) bool
|
||||
|
||||
// WithBackoff retries the passed function if it fails and the error allows it to retry.
|
||||
// The specified backoff policy is used to determine how long to sleep between attempts.
|
||||
func WithBackoff(ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff) (err error) {
|
||||
// Once the specified timeout (if >0) elapses, WithBackoff gives up.
|
||||
func WithBackoff(
|
||||
ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, timeout time.Duration,
|
||||
) (err error) {
|
||||
if timeout > 0 {
|
||||
var cancel func()
|
||||
ctx, cancel = context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
for attempt := 0; ; /* true */ attempt++ {
|
||||
if err = retryableFunc(); err == nil {
|
||||
prevErr := err
|
||||
|
||||
if err = retryableFunc(ctx); err == nil {
|
||||
// No error.
|
||||
return
|
||||
}
|
||||
|
||||
if !retryable(err) {
|
||||
isRetryable := retryable(err)
|
||||
|
||||
if prevErr != nil && (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
|
||||
err = prevErr
|
||||
}
|
||||
|
||||
if !isRetryable {
|
||||
// Not retryable.
|
||||
return
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue