mirror of
https://github.com/NLnetLabs/unbound.git
synced 2026-02-01 19:29:27 -05:00
- Redis checks for server down and throttles reconnects.
This commit is contained in:
parent
910288c0d1
commit
424f86466a
1 changed files with 99 additions and 11 deletions
110
cachedb/redis.c
110
cachedb/redis.c
|
|
@ -46,6 +46,8 @@
|
|||
#include "cachedb/cachedb.h"
|
||||
#include "util/alloc.h"
|
||||
#include "util/config_file.h"
|
||||
#include "util/locks.h"
|
||||
#include "util/timeval_func.h"
|
||||
#include "sldns/sbuffer.h"
|
||||
|
||||
#ifdef USE_REDIS
|
||||
|
|
@ -75,6 +77,18 @@ struct redis_moddata {
|
|||
/* timeout for connection setup */
|
||||
struct timeval connect_timeout;
|
||||
struct timeval replica_connect_timeout;
|
||||
/* the reconnect interval time. */
|
||||
struct timeval reconnect_interval;
|
||||
struct timeval replica_reconnect_interval;
|
||||
/* reconnect attempts, 0 if connected, counts up failed reconnects. */
|
||||
int reconnect_attempts;
|
||||
int replica_reconnect_attempts;
|
||||
/* Lock on reconnect_wait time. */
|
||||
lock_basic_type wait_lock;
|
||||
lock_basic_type replica_wait_lock;
|
||||
/* reconnect wait time, wait until it has passed before reconnect. */
|
||||
struct timeval reconnect_wait;
|
||||
struct timeval replica_reconnect_wait;
|
||||
/* the redis logical database to use */
|
||||
int logical_db;
|
||||
int replica_logical_db;
|
||||
|
|
@ -82,6 +96,10 @@ struct redis_moddata {
|
|||
int set_with_ex_available;
|
||||
};
|
||||
|
||||
/** The limit on the number of redis connect attempts. After failure if
|
||||
* the number is exceeded, the reconnects are throttled by the wait time. */
|
||||
#define REDIS_RECONNECT_ATTEMPT_LIMIT 3
|
||||
|
||||
static redisReply* redis_command(struct module_env*, struct cachedb_env*,
|
||||
const char*, const uint8_t*, size_t, int);
|
||||
|
||||
|
|
@ -105,6 +123,8 @@ moddata_clean(struct redis_moddata** moddata) {
|
|||
}
|
||||
free((*moddata)->replica_ctxs);
|
||||
}
|
||||
lock_basic_destroy(&(*moddata)->wait_lock);
|
||||
lock_basic_destroy(&(*moddata)->replica_wait_lock);
|
||||
free(*moddata);
|
||||
*moddata = NULL;
|
||||
}
|
||||
|
|
@ -113,10 +133,31 @@ static redisContext*
|
|||
redis_connect(const char* host, int port, const char* path,
|
||||
const char* password, int logical_db,
|
||||
const struct timeval connect_timeout,
|
||||
const struct timeval command_timeout)
|
||||
const struct timeval command_timeout,
|
||||
const struct timeval* reconnect_interval,
|
||||
int* reconnect_attempts,
|
||||
struct timeval* reconnect_wait,
|
||||
lock_basic_type* wait_lock,
|
||||
struct timeval* now_tv,
|
||||
const char* infostr)
|
||||
{
|
||||
redisContext* ctx;
|
||||
|
||||
/* See if the redis server is down, and reconnect has to wait. */
|
||||
if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) {
|
||||
/* Acquire lock to look at timeval, the integer has atomic
|
||||
* integrity. */
|
||||
struct timeval wait_tv;
|
||||
lock_basic_lock(wait_lock);
|
||||
wait_tv = *reconnect_wait;
|
||||
lock_basic_unlock(wait_lock);
|
||||
if(timeval_smaller(now_tv, &wait_tv)) {
|
||||
verbose(VERB_ALGO, "redis %sdown, reconnect wait",
|
||||
infostr);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if(path && path[0]!=0) {
|
||||
ctx = redisConnectUnixWithTimeout(path, connect_timeout);
|
||||
} else {
|
||||
|
|
@ -126,18 +167,18 @@ redis_connect(const char* host, int port, const char* path,
|
|||
const char *errstr = "out of memory";
|
||||
if(ctx)
|
||||
errstr = ctx->errstr;
|
||||
log_err("failed to connect to redis server: %s", errstr);
|
||||
log_err("failed to connect to redis %sserver: %s", infostr, errstr);
|
||||
goto fail;
|
||||
}
|
||||
if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) {
|
||||
log_err("failed to set redis timeout, %s", ctx->errstr);
|
||||
log_err("failed to set redis %stimeout, %s", infostr, ctx->errstr);
|
||||
goto fail;
|
||||
}
|
||||
if(password && password[0]!=0) {
|
||||
redisReply* rep;
|
||||
rep = redisCommand(ctx, "AUTH %s", password);
|
||||
if(!rep || rep->type == REDIS_REPLY_ERROR) {
|
||||
log_err("failed to authenticate with password");
|
||||
log_err("failed to authenticate %swith password", infostr);
|
||||
freeReplyObject(rep);
|
||||
goto fail;
|
||||
}
|
||||
|
|
@ -147,18 +188,20 @@ redis_connect(const char* host, int port, const char* path,
|
|||
redisReply* rep;
|
||||
rep = redisCommand(ctx, "SELECT %d", logical_db);
|
||||
if(!rep || rep->type == REDIS_REPLY_ERROR) {
|
||||
log_err("failed to set logical database (%d)",
|
||||
logical_db);
|
||||
log_err("failed %sto set logical database (%d)",
|
||||
infostr, logical_db);
|
||||
freeReplyObject(rep);
|
||||
goto fail;
|
||||
}
|
||||
freeReplyObject(rep);
|
||||
}
|
||||
*reconnect_attempts = 0;
|
||||
if(verbosity >= VERB_OPS) {
|
||||
char port_str[6+1];
|
||||
port_str[0] = ' ';
|
||||
(void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port);
|
||||
verbose(VERB_OPS, "Connection to Redis established (%s%s)",
|
||||
verbose(VERB_OPS, "Connection to Redis %sestablished (%s%s)",
|
||||
infostr,
|
||||
path&&path[0]!=0?path:host,
|
||||
path&&path[0]!=0?"":port_str);
|
||||
}
|
||||
|
|
@ -167,6 +210,18 @@ redis_connect(const char* host, int port, const char* path,
|
|||
fail:
|
||||
if(ctx)
|
||||
redisFree(ctx);
|
||||
(*reconnect_attempts)++;
|
||||
if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) {
|
||||
/* Wait for the reconnect interval before trying again. */
|
||||
struct timeval tv;
|
||||
tv = *now_tv;
|
||||
timeval_add(&tv, reconnect_interval);
|
||||
lock_basic_lock(wait_lock);
|
||||
*reconnect_wait = tv;
|
||||
lock_basic_unlock(wait_lock);
|
||||
verbose(VERB_ALGO, "redis %sreconnect wait until %d.%6.6d",
|
||||
infostr, (int)tv.tv_sec, (int)tv.tv_usec);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
@ -191,6 +246,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
|
|||
log_err("out of memory");
|
||||
goto fail;
|
||||
}
|
||||
lock_basic_init(&moddata->wait_lock);
|
||||
lock_protect(&moddata->wait_lock, &moddata->reconnect_wait,
|
||||
sizeof(moddata->reconnect_wait));
|
||||
lock_basic_init(&moddata->replica_wait_lock);
|
||||
lock_protect(&moddata->replica_wait_lock,
|
||||
&moddata->replica_reconnect_wait,
|
||||
sizeof(moddata->replica_reconnect_wait));
|
||||
moddata->numctxs = env->cfg->num_threads;
|
||||
/* note: server_host and similar string configuration options are
|
||||
* shallow references to configured strings; we don't have to free them
|
||||
|
|
@ -219,6 +281,8 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
|
|||
set_timeout(&moddata->replica_connect_timeout,
|
||||
env->cfg->redis_replica_timeout,
|
||||
env->cfg->redis_replica_connect_timeout);
|
||||
set_timeout(&moddata->reconnect_interval, 1000, 0);
|
||||
set_timeout(&moddata->replica_reconnect_interval, 1000, 0);
|
||||
|
||||
moddata->logical_db = env->cfg->redis_logical_db;
|
||||
moddata->replica_logical_db = env->cfg->redis_replica_logical_db;
|
||||
|
|
@ -245,7 +309,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
|
|||
moddata->server_password,
|
||||
moddata->logical_db,
|
||||
moddata->connect_timeout,
|
||||
moddata->command_timeout);
|
||||
moddata->command_timeout,
|
||||
&moddata->reconnect_interval,
|
||||
&moddata->reconnect_attempts,
|
||||
&moddata->reconnect_wait,
|
||||
&moddata->wait_lock,
|
||||
env->now_tv,
|
||||
"");
|
||||
if(!ctx) {
|
||||
log_err("redis_init: failed to init redis "
|
||||
"(for thread %d)", i);
|
||||
|
|
@ -263,7 +333,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
|
|||
moddata->replica_server_password,
|
||||
moddata->replica_logical_db,
|
||||
moddata->replica_connect_timeout,
|
||||
moddata->replica_command_timeout);
|
||||
moddata->replica_command_timeout,
|
||||
&moddata->replica_reconnect_interval,
|
||||
&moddata->replica_reconnect_attempts,
|
||||
&moddata->replica_reconnect_wait,
|
||||
&moddata->replica_wait_lock,
|
||||
env->now_tv,
|
||||
"replica ");
|
||||
if(!ctx) {
|
||||
log_err("redis_init: failed to init redis "
|
||||
"replica (for thread %d)", i);
|
||||
|
|
@ -364,7 +440,13 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
|
|||
d->replica_server_password,
|
||||
d->replica_logical_db,
|
||||
d->replica_connect_timeout,
|
||||
d->replica_command_timeout);
|
||||
d->replica_command_timeout,
|
||||
&d->replica_reconnect_interval,
|
||||
&d->replica_reconnect_attempts,
|
||||
&d->replica_reconnect_wait,
|
||||
&d->replica_wait_lock,
|
||||
env->now_tv,
|
||||
"replica ");
|
||||
} else {
|
||||
ctx = redis_connect(
|
||||
d->server_host,
|
||||
|
|
@ -373,7 +455,13 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
|
|||
d->server_password,
|
||||
d->logical_db,
|
||||
d->connect_timeout,
|
||||
d->command_timeout);
|
||||
d->command_timeout,
|
||||
&d->reconnect_interval,
|
||||
&d->reconnect_attempts,
|
||||
&d->reconnect_wait,
|
||||
&d->wait_lock,
|
||||
env->now_tv,
|
||||
"");
|
||||
}
|
||||
ctx_selector[env->alloc->thread_num] = ctx;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue