From f24ad5d83140eee04bd1cda8ee19bced0de26c11 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Wed, 17 Oct 2018 21:04:35 +0200 Subject: [PATCH 01/31] cli: pass auth through REDISCLI_AUTH This adds support for passing a password through a REDISCLI_AUTH environment variable, which might often be safer than passing it through a CLI argument. Passing a password this way does not trigger the warning about passing a password through CLI arguments, and CLI arguments take precedence over it. --- src/redis-cli.c | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/redis-cli.c b/src/redis-cli.c index 57f812b90..75010c5a8 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -67,6 +67,7 @@ #define REDIS_CLI_HISTFILE_DEFAULT ".rediscli_history" #define REDIS_CLI_RCFILE_ENV "REDISCLI_RCFILE" #define REDIS_CLI_RCFILE_DEFAULT ".redisclirc" +#define REDIS_CLI_AUTH_ENV "REDISCLI_AUTH" #define CLUSTER_MANAGER_SLOTS 16384 #define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000 @@ -1419,6 +1420,14 @@ static int parseOptions(int argc, char **argv) { return i; } +static void parseEnv() { + /* Set auth from env, but do not overwrite CLI arguments if passed */ + char *auth = getenv(REDIS_CLI_AUTH_ENV); + if (auth != NULL && config.auth == NULL) { + config.auth = auth; + } +} + static sds readArgFromStdin(void) { char buf[1024]; sds arg = sdsempty(); @@ -1446,6 +1455,9 @@ static void usage(void) { " -p Server port (default: 6379).\n" " -s Server socket (overrides hostname and port).\n" " -a Password to use when connecting to the server.\n" +" You can also use the " REDIS_CLI_AUTH_ENV " environment\n" +" variable to pass this password more safely\n" +" (if both are used, this argument takes predecence).\n" " -u Server URI.\n" " -r Execute specified command N times.\n" " -i When -r is used, waits seconds per command.\n" @@ -6657,6 +6669,8 @@ int main(int argc, char **argv) { argc -= firstarg; argv += firstarg; + parseEnv(); + /* Cluster Manager mode */ 
if (CLUSTER_MANAGER_MODE()) { clusterManagerCommandProc *proc = validateClusterManagerCommand(); From 0da171b3a675ee0897e7d566a6dcefd02fb671c4 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 30 Nov 2018 12:05:46 +0800 Subject: [PATCH 02/31] remove useless tryObjectEncoding in debug assert --- src/debug.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/debug.c b/src/debug.c index 3cb567520..d9d53eab0 100644 --- a/src/debug.c +++ b/src/debug.c @@ -336,7 +336,6 @@ NULL zfree(ptr); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"assert")) { - if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]); serverAssertWithInfo(c,c->argv[0],1 == 2); } else if (!strcasecmp(c->argv[1]->ptr,"log") && c->argc == 3) { serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)c->argv[2]->ptr); From b587c54c24d0cfdc3cfb22303553ea5e780b87c1 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 2 Dec 2018 15:29:20 +0200 Subject: [PATCH 03/31] fix #5580, display fragmentation and rss overhead bytes as signed these metrics become negative when RSS is smaller than the used_memory. 
This can easily happen when the program has allocated a lot of memory and hasn't written to it yet, in which case the kernel doesn't allocate any pages to the process. --- src/server.c | 6 +++--- src/server.h | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/server.c b/src/server.c index cc335ebdc..bac5d400f 100644 --- a/src/server.c +++ b/src/server.c @@ -3247,11 +3247,11 @@ sds genRedisInfoString(char *section) { "allocator_frag_ratio:%.2f\r\n" "allocator_frag_bytes:%zu\r\n" "allocator_rss_ratio:%.2f\r\n" - "allocator_rss_bytes:%zu\r\n" + "allocator_rss_bytes:%zd\r\n" "rss_overhead_ratio:%.2f\r\n" - "rss_overhead_bytes:%zu\r\n" + "rss_overhead_bytes:%zd\r\n" "mem_fragmentation_ratio:%.2f\r\n" - "mem_fragmentation_bytes:%zu\r\n" + "mem_fragmentation_bytes:%zd\r\n" "mem_not_counted_for_evict:%zu\r\n" "mem_replication_backlog:%zu\r\n" "mem_clients_slaves:%zu\r\n" diff --git a/src/server.h b/src/server.h index 86d6e2e4b..f7c348622 100644 --- a/src/server.h +++ b/src/server.h @@ -864,11 +864,11 @@ struct redisMemOverhead { float dataset_perc; float peak_perc; float total_frag; - size_t total_frag_bytes; + ssize_t total_frag_bytes; float allocator_frag; - size_t allocator_frag_bytes; + ssize_t allocator_frag_bytes; float allocator_rss; - size_t allocator_rss_bytes; + ssize_t allocator_rss_bytes; float rss_extra; size_t rss_extra_bytes; size_t num_dbs; From e2c1f80b464a3a6dde961bb30bff9a39c17c6b29 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Tue, 4 Dec 2018 07:17:17 +0000 Subject: [PATCH 04/31] Fixed a serverPanic when sending an invalid command to a monitor client --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 7d387dabc..02d2a17ba 100644 --- a/src/networking.c +++ b/src/networking.c @@ -365,7 +365,7 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { * Where the master must propagate the first change even if the second * will produce an error. 
However it is useful to log such events since * they are rare and may hint at errors in a script or a bug in Redis. */ - if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) { + if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { char* to = c->flags & CLIENT_MASTER? "master": "replica"; char* from = c->flags & CLIENT_MASTER? "replica": "master"; char *cmdname = c->lastcmd ? c->lastcmd->name : ""; From dfe81b33fe610ccdc205f5b5e49f94f8aa8fbefc Mon Sep 17 00:00:00 2001 From: yongman Date: Tue, 4 Dec 2018 17:36:42 +0800 Subject: [PATCH 05/31] Fix cluster call reply format readable --- src/redis-cli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index f307d31cf..c85358144 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -5404,7 +5404,7 @@ static int clusterManagerCommandCall(int argc, char **argv) { if (status != REDIS_OK || reply == NULL ) printf("%s:%d: Failed!\n", n->ip, n->port); else { - sds formatted_reply = cliFormatReplyTTY(reply, ""); + sds formatted_reply = cliFormatReplyRaw(reply); printf("%s:%d: %s\n", n->ip, n->port, (char *) formatted_reply); sdsfree(formatted_reply); } From b1e35d324404a5017cd925f701211692ed62d495 Mon Sep 17 00:00:00 2001 From: hdmg <438954+louiszhw@users.noreply.github.com> Date: Wed, 5 Dec 2018 17:15:02 +0800 Subject: [PATCH 06/31] fix comments fault discription --- src/sds.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sds.c b/src/sds.c index 39ad595ed..330c955e8 100644 --- a/src/sds.c +++ b/src/sds.c @@ -695,7 +695,7 @@ sds sdscatfmt(sds s, char const *fmt, ...) { * s = sdstrim(s,"Aa. :"); * printf("%s\n", s); * - * Output will be just "Hello World". + * Output will be just "HelloWorld". 
*/ sds sdstrim(sds s, const char *cset) { char *start, *end, *sp, *ep; From 6aa9606995bc8ae55f01f7713bc9ed8dc88070f1 Mon Sep 17 00:00:00 2001 From: lsytj0413 <511121939@qq.com> Date: Tue, 29 Aug 2017 14:44:05 +0800 Subject: [PATCH 07/31] fix a typo: craeted -> created --- src/db.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index d7233869f..62c8aa131 100644 --- a/src/db.c +++ b/src/db.c @@ -212,7 +212,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { * 2) clients WATCHing for the destination key notified. * 3) The expire time of the key is reset (the key is made persistent). * - * All the new keys in the database should be creted via this interface. */ + * All the new keys in the database should be created via this interface. */ void setKey(redisDb *db, robj *key, robj *val) { if (lookupKeyWrite(db,key) == NULL) { dbAdd(db,key,val); From 74f942cfa5386b501c0b23733d97ee9a69ad8888 Mon Sep 17 00:00:00 2001 From: yura Date: Wed, 22 Aug 2018 17:56:13 +0300 Subject: [PATCH 08/31] redis-cli reshard/rebalance: ability to force replacement on existing keys --- src/redis-cli.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 0ae96f564..7e558a306 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2826,8 +2826,8 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, char **err) { int success = 1; - int do_fix = (config.cluster_manager_command.flags & - CLUSTER_MANAGER_CMD_FLAG_FIX); + int replace_existing_keys = (config.cluster_manager_command.flags & + (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); while (1) { char *dots = NULL; redisReply *reply = NULL, *migrate_reply = NULL; @@ -2858,13 +2858,14 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, dots); if (migrate_reply == NULL) goto next; if (migrate_reply->type == REDIS_REPLY_ERROR) { - if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) 
{ + int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; + int not_served = strstr(migrate_reply->str, "slot not served") != NULL; + if (replace_existing_keys && (is_busy || not_served)) { /* If the key already exists, try to migrate keys * adding REPLACE option. * If the key's slot is not served, try to assign slot * to the target node. */ - int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL); - if (strstr(migrate_reply->str, "slot not served") != NULL) + if (not_served) clusterManagerSetSlot(source, target, slot, "node", NULL); clusterManagerLogWarn("*** Target key exists. " "Replacing it for FIX.\n"); From 28c42814956a40f6c7b432a9692e1f1d7437925c Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 7 Dec 2018 19:14:33 +0800 Subject: [PATCH 09/31] networking: current_client should not be NULL when trim qb_pos --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 02d2a17ba..f19eb86bb 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1470,7 +1470,7 @@ void processInputBuffer(client *c) { } /* Trim to pos */ - if (c->qb_pos) { + if (server.current_client != NULL && c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } From e9400e8efdad01c93b80624805b7ddb2d7ec99ec Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Dec 2018 16:30:33 +0100 Subject: [PATCH 10/31] DEBUG DIGEST refactoring: extract function to digest a value. 
--- src/debug.c | 273 +++++++++++++++++++++++++++------------------------- 1 file changed, 142 insertions(+), 131 deletions(-) diff --git a/src/debug.c b/src/debug.c index 3cb567520..3c2e7fd55 100644 --- a/src/debug.c +++ b/src/debug.c @@ -74,7 +74,7 @@ void xorDigest(unsigned char *digest, void *ptr, size_t len) { digest[j] ^= hash[j]; } -void xorObjectDigest(unsigned char *digest, robj *o) { +void xorStringObjectDigest(unsigned char *digest, robj *o) { o = getDecodedObject(o); xorDigest(digest,o->ptr,sdslen(o->ptr)); decrRefCount(o); @@ -104,12 +104,151 @@ void mixDigest(unsigned char *digest, void *ptr, size_t len) { SHA1Final(digest,&ctx); } -void mixObjectDigest(unsigned char *digest, robj *o) { +void mixStringObjectDigest(unsigned char *digest, robj *o) { o = getDecodedObject(o); mixDigest(digest,o->ptr,sdslen(o->ptr)); decrRefCount(o); } +/* This function computes the digest of a data structure stored in the + * object 'o'. It is the core of the DEBUG DIGEST command: when taking the + * digest of a whole dataset, we take the digest of the key and the value + * pair, and xor all those together. + * + * Note that this function does not reset the initial 'digest' passed, it + * will continue mixing this object digest to anything that was already + * present. 
*/ +void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) { + uint32_t aux = htonl(o->type); + mixDigest(digest,&aux,sizeof(aux)); + long long expiretime = getExpire(db,keyobj); + char buf[128]; + + /* Save the key and associated value */ + if (o->type == OBJ_STRING) { + mixStringObjectDigest(digest,o); + } else if (o->type == OBJ_LIST) { + listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + robj *eleobj = listTypeGet(&entry); + mixStringObjectDigest(digest,eleobj); + decrRefCount(eleobj); + } + listTypeReleaseIterator(li); + } else if (o->type == OBJ_SET) { + setTypeIterator *si = setTypeInitIterator(o); + sds sdsele; + while((sdsele = setTypeNextObject(si)) != NULL) { + xorDigest(digest,sdsele,sdslen(sdsele)); + sdsfree(sdsele); + } + setTypeReleaseIterator(si); + } else if (o->type == OBJ_ZSET) { + unsigned char eledigest[20]; + + if (o->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *zl = o->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vll; + double score; + + eptr = ziplistIndex(zl,0); + serverAssert(eptr != NULL); + sptr = ziplistNext(zl,eptr); + serverAssert(sptr != NULL); + + while (eptr != NULL) { + serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll)); + score = zzlGetScore(sptr); + + memset(eledigest,0,20); + if (vstr != NULL) { + mixDigest(eledigest,vstr,vlen); + } else { + ll2string(buf,sizeof(buf),vll); + mixDigest(eledigest,buf,strlen(buf)); + } + + snprintf(buf,sizeof(buf),"%.17g",score); + mixDigest(eledigest,buf,strlen(buf)); + xorDigest(digest,eledigest,20); + zzlNext(zl,&eptr,&sptr); + } + } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = o->ptr; + dictIterator *di = dictGetIterator(zs->dict); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + sds sdsele = dictGetKey(de); + double *score = dictGetVal(de); + + snprintf(buf,sizeof(buf),"%.17g",*score); + memset(eledigest,0,20); + 
mixDigest(eledigest,sdsele,sdslen(sdsele)); + mixDigest(eledigest,buf,strlen(buf)); + xorDigest(digest,eledigest,20); + } + dictReleaseIterator(di); + } else { + serverPanic("Unknown sorted set encoding"); + } + } else if (o->type == OBJ_HASH) { + hashTypeIterator *hi = hashTypeInitIterator(o); + while (hashTypeNext(hi) != C_ERR) { + unsigned char eledigest[20]; + sds sdsele; + + memset(eledigest,0,20); + sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY); + mixDigest(eledigest,sdsele,sdslen(sdsele)); + sdsfree(sdsele); + sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE); + mixDigest(eledigest,sdsele,sdslen(sdsele)); + sdsfree(sdsele); + xorDigest(digest,eledigest,20); + } + hashTypeReleaseIterator(hi); + } else if (o->type == OBJ_STREAM) { + streamIterator si; + streamIteratorStart(&si,o->ptr,NULL,NULL,0); + streamID id; + int64_t numfields; + + while(streamIteratorGetID(&si,&id,&numfields)) { + sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq); + mixDigest(digest,itemid,sdslen(itemid)); + sdsfree(itemid); + + while(numfields--) { + unsigned char *field, *value; + int64_t field_len, value_len; + streamIteratorGetField(&si,&field,&value, + &field_len,&value_len); + mixDigest(digest,field,field_len); + mixDigest(digest,value,value_len); + } + } + streamIteratorStop(&si); + } else if (o->type == OBJ_MODULE) { + RedisModuleDigest md; + moduleValue *mv = o->ptr; + moduleType *mt = mv->type; + moduleInitDigestContext(md); + if (mt->digest) { + mt->digest(&md,mv->value); + xorDigest(digest,md.x,sizeof(md.x)); + } + } else { + serverPanic("Unknown object type"); + } + /* If the key has an expire, add it to the mix */ + if (expiretime != -1) xorDigest(digest,"!!expire!!",10); +} + /* Compute the dataset digest. Since keys, sets elements, hashes elements * are not ordered, we use a trick: every aggregate digest is the xor * of the digests of their elements. 
This way the order will not change @@ -118,7 +257,6 @@ void mixObjectDigest(unsigned char *digest, robj *o) { * a different digest. */ void computeDatasetDigest(unsigned char *final) { unsigned char digest[20]; - char buf[128]; dictIterator *di = NULL; dictEntry *de; int j; @@ -141,7 +279,6 @@ void computeDatasetDigest(unsigned char *final) { while((de = dictNext(di)) != NULL) { sds key; robj *keyobj, *o; - long long expiretime; memset(digest,0,20); /* This key-val digest */ key = dictGetKey(de); @@ -150,134 +287,8 @@ void computeDatasetDigest(unsigned char *final) { mixDigest(digest,key,sdslen(key)); o = dictGetVal(de); + xorObjectDigest(db,keyobj,digest,o); - aux = htonl(o->type); - mixDigest(digest,&aux,sizeof(aux)); - expiretime = getExpire(db,keyobj); - - /* Save the key and associated value */ - if (o->type == OBJ_STRING) { - mixObjectDigest(digest,o); - } else if (o->type == OBJ_LIST) { - listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL); - listTypeEntry entry; - while(listTypeNext(li,&entry)) { - robj *eleobj = listTypeGet(&entry); - mixObjectDigest(digest,eleobj); - decrRefCount(eleobj); - } - listTypeReleaseIterator(li); - } else if (o->type == OBJ_SET) { - setTypeIterator *si = setTypeInitIterator(o); - sds sdsele; - while((sdsele = setTypeNextObject(si)) != NULL) { - xorDigest(digest,sdsele,sdslen(sdsele)); - sdsfree(sdsele); - } - setTypeReleaseIterator(si); - } else if (o->type == OBJ_ZSET) { - unsigned char eledigest[20]; - - if (o->encoding == OBJ_ENCODING_ZIPLIST) { - unsigned char *zl = o->ptr; - unsigned char *eptr, *sptr; - unsigned char *vstr; - unsigned int vlen; - long long vll; - double score; - - eptr = ziplistIndex(zl,0); - serverAssert(eptr != NULL); - sptr = ziplistNext(zl,eptr); - serverAssert(sptr != NULL); - - while (eptr != NULL) { - serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll)); - score = zzlGetScore(sptr); - - memset(eledigest,0,20); - if (vstr != NULL) { - mixDigest(eledigest,vstr,vlen); - } else { - 
ll2string(buf,sizeof(buf),vll); - mixDigest(eledigest,buf,strlen(buf)); - } - - snprintf(buf,sizeof(buf),"%.17g",score); - mixDigest(eledigest,buf,strlen(buf)); - xorDigest(digest,eledigest,20); - zzlNext(zl,&eptr,&sptr); - } - } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = o->ptr; - dictIterator *di = dictGetIterator(zs->dict); - dictEntry *de; - - while((de = dictNext(di)) != NULL) { - sds sdsele = dictGetKey(de); - double *score = dictGetVal(de); - - snprintf(buf,sizeof(buf),"%.17g",*score); - memset(eledigest,0,20); - mixDigest(eledigest,sdsele,sdslen(sdsele)); - mixDigest(eledigest,buf,strlen(buf)); - xorDigest(digest,eledigest,20); - } - dictReleaseIterator(di); - } else { - serverPanic("Unknown sorted set encoding"); - } - } else if (o->type == OBJ_HASH) { - hashTypeIterator *hi = hashTypeInitIterator(o); - while (hashTypeNext(hi) != C_ERR) { - unsigned char eledigest[20]; - sds sdsele; - - memset(eledigest,0,20); - sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY); - mixDigest(eledigest,sdsele,sdslen(sdsele)); - sdsfree(sdsele); - sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE); - mixDigest(eledigest,sdsele,sdslen(sdsele)); - sdsfree(sdsele); - xorDigest(digest,eledigest,20); - } - hashTypeReleaseIterator(hi); - } else if (o->type == OBJ_STREAM) { - streamIterator si; - streamIteratorStart(&si,o->ptr,NULL,NULL,0); - streamID id; - int64_t numfields; - - while(streamIteratorGetID(&si,&id,&numfields)) { - sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq); - mixDigest(digest,itemid,sdslen(itemid)); - sdsfree(itemid); - - while(numfields--) { - unsigned char *field, *value; - int64_t field_len, value_len; - streamIteratorGetField(&si,&field,&value, - &field_len,&value_len); - mixDigest(digest,field,field_len); - mixDigest(digest,value,value_len); - } - } - streamIteratorStop(&si); - } else if (o->type == OBJ_MODULE) { - RedisModuleDigest md; - moduleValue *mv = o->ptr; - moduleType *mt = mv->type; - 
moduleInitDigestContext(md); - if (mt->digest) { - mt->digest(&md,mv->value); - xorDigest(digest,md.x,sizeof(md.x)); - } - } else { - serverPanic("Unknown object type"); - } - /* If the key has an expire, add it to the mix */ - if (expiretime != -1) xorDigest(digest,"!!expire!!",10); /* We can finally xor the key-val digest to the final digest */ xorDigest(final,digest,20); decrRefCount(keyobj); From 5a0b7212c9c3a672cb47fa2a773bc6cf1c2c4f17 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Dec 2018 16:41:54 +0100 Subject: [PATCH 11/31] DEBUG DIGEST-VALUE implemented. --- src/debug.c | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/debug.c b/src/debug.c index 3c2e7fd55..5b4c803db 100644 --- a/src/debug.c +++ b/src/debug.c @@ -304,6 +304,7 @@ void debugCommand(client *c) { "CHANGE-REPL-ID -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.", "CRASH-AND-RECOVER -- Hard crash and restart after delay.", "DIGEST -- Output a hex signature representing the current DB content.", +"DIGEST-VALUE ... -- Output a hex signature of the values of all the specified keys.", "ERROR -- Return a Redis protocol error with as message. Useful for clients unit tests to simulate Redis errors.", "LOG -- write message to the server log.", "HTSTATS -- Return hash table statistics of the specified Redis database.", @@ -506,15 +507,28 @@ NULL } addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) { + /* DEBUG DIGEST (form without keys specified) */ unsigned char digest[20]; sds d = sdsempty(); - int j; computeDatasetDigest(digest); - for (j = 0; j < 20; j++) - d = sdscatprintf(d, "%02x",digest[j]); + for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]); addReplyStatus(c,d); sdsfree(d); + } else if (!strcasecmp(c->argv[1]->ptr,"digest-value") && c->argc >= 2) { + /* DEBUG DIGEST-VALUE key key key ... key. 
*/ + addReplyMultiBulkLen(c,c->argc-2); + for (int j = 2; j < c->argc; j++) { + unsigned char digest[20]; + memset(digest,0,20); /* Start with a clean result */ + robj *o = lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH); + if (o) xorObjectDigest(c->db,c->argv[j],digest,o); + + sds d = sdsempty(); + for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]); + addReplyStatus(c,d); + sdsfree(d); + } } else if (!strcasecmp(c->argv[1]->ptr,"sleep") && c->argc == 3) { double dtime = strtod(c->argv[2]->ptr,NULL); long long utime = dtime*1000000; From 7e0cc2bb91dc610b68d2e063728a4416efd19885 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Dec 2018 17:06:55 +0100 Subject: [PATCH 12/31] Remove no longer relevant comment in processCommand(). --- src/server.c | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/server.c b/src/server.c index cc335ebdc..921311a91 100644 --- a/src/server.c +++ b/src/server.c @@ -2607,15 +2607,11 @@ int processCommand(client *c) { } /* Handle the maxmemory directive. - * - * First we try to free some memory if possible (if there are volatile - * keys in the dataset). If there are not the only thing we can do - * is returning an error. * * Note that we do not want to reclaim memory if we are here re-entering * the event loop since there is a busy Lua script running in timeout - * condition, to avoid mixing the propagation of scripts with the propagation - * of DELs due to eviction. */ + * condition, to avoid mixing the propagation of scripts with the + * propagation of DELs due to eviction. */ if (server.maxmemory && !server.lua_timedout) { int out_of_memory = freeMemoryIfNeeded() == C_ERR; /* freeMemoryIfNeeded may flush slave output buffers. 
This may result From 5bf13eaaf8f444bb547b00ed5ecd727a93cd8399 Mon Sep 17 00:00:00 2001 From: artix Date: Thu, 22 Nov 2018 11:47:59 +0100 Subject: [PATCH 13/31] Cluster Manager: check/fix commands now handle multiple owners even if all slots are covered and not open. --- src/redis-cli.c | 135 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 6 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 7e558a306..9d8c0bcaa 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2746,6 +2746,41 @@ cleanup: return success; } +static int clusterManagerDelSlot(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER DELSLOTS %d", slot); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerAddSlot(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER ADDSLOTS %d", slot); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node, + int slot) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER COUNTKEYSINSLOT %d", slot); + int count = -1; + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer; + if (reply) freeReplyObject(reply); + return count; +} + +static int clusterManagerBumpEpoch(clusterManagerNode *node) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. 
If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. */ @@ -4053,17 +4088,62 @@ cleanup: return success; } +static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { + clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot); + int success = 0; + assert(listLength(owners) > 1); + clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners, + slot, + NULL); + if (!owner) owner = listFirst(owners)->value; + clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d", + slot, owner->ip, owner->port); + /* Set the owner node by calling DELSLOTS in order to unassign the slot + * in case it's already assigned to another node and by finally calling + * ADDSLOTS and BUMPEPOCH. The call to DELSLOTS is not checked since it + * could reply with an "already unassigned" error and if it should fail + * for other reasons, it would lead to a failure in the follwing ADDSLOTS + * command. */ + clusterManagerDelSlot(owner, slot); + if (!clusterManagerAddSlot(owner, slot)) return 0; + if (!clusterManagerBumpEpoch(owner)) return 0; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + /* Update configuration in all the other master nodes by assigning the slot + * itself to the new owner, and by eventually migrating keys if the node + * has keys for the slot. 
*/ + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n == owner) continue; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + int count = clusterManagerCountKeysInSlot(n, slot); + success = (count >= 0); + if (!success) break; + clusterManagerDelSlot(n, slot); + if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0; + if (count > 0) { + int opts = CLUSTER_MANAGER_OPT_VERBOSE | + CLUSTER_MANAGER_OPT_COLD; + success = clusterManagerMoveSlot(n, owner, slot, opts, NULL); + if (!success) break; + } + } + return success; +} + static int clusterManagerCheckCluster(int quiet) { listNode *ln = listFirst(cluster_manager.nodes); if (!ln) return 0; - int result = 1; - int do_fix = config.cluster_manager_command.flags & - CLUSTER_MANAGER_CMD_FLAG_FIX; clusterManagerNode *node = ln->value; clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n", node->ip, node->port); + int result = 1, consistent = 0; + int do_fix = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_FIX; if (!quiet) clusterManagerShowNodes(); - if (!clusterManagerIsConfigConsistent()) { + consistent = clusterManagerIsConfigConsistent(); + if (!consistent) { sds err = sdsnew("[ERR] Nodes don't agree about configuration!"); clusterManagerOnError(err); result = 0; @@ -4071,7 +4151,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerLogOk("[OK] All nodes agree about slots " "configuration.\n"); } - // Check open slots + /* Check open slots */ clusterManagerLogInfo(">>> Check for open slots...\n"); listIter li; listRewind(cluster_manager.nodes, &li); @@ -4130,7 +4210,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerLogErr("%s.\n", (char *) errstr); sdsfree(errstr); if (do_fix) { - // Fix open slots. + /* Fix open slots. 
*/ dictReleaseIterator(iter); iter = dictGetIterator(open_slots); while ((entry = dictNext(iter)) != NULL) { @@ -4165,6 +4245,49 @@ static int clusterManagerCheckCluster(int quiet) { if (fixed > 0) result = 1; } } + if (!consistent) { + /* Check whether there are multiple owners, even when slots are + * fully covered and there are no open slots. */ + clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); + int slot = 0; + for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + list *owners = listCreate(); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + if (n->slots[slot]) listAddNodeTail(owners, n); + else { + /* Nodes having keys for the slot will be considered + * owners too. */ + int count = clusterManagerCountKeysInSlot(n, slot); + if (count > 0) listAddNodeTail(owners, n); + } + } + if (listLength(owners) > 1) { + result = 0; + clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n", + slot, listLength(owners)); + listRewind(owners, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + clusterManagerLogErr(" %s:%d\n", n->ip, n->port); + } + if (do_fix) { + result = clusterManagerFixMultipleSlotOwners(slot, owners); + if (!result) { + clusterManagerLogErr("Failed to fix multiple owners " + "for slot %d\n", slot); + listRelease(owners); + break; + } + } + } + listRelease(owners); + } + } return result; } From eaac9f9e930d8959d681b7d03d7411bfb18db3a7 Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 27 Nov 2018 12:26:56 +0100 Subject: [PATCH 14/31] Cluster Manager: code cleanup. 
--- src/redis-cli.c | 128 ++++++++++++++++-------------------------------- 1 file changed, 41 insertions(+), 87 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 9d8c0bcaa..a3eadf364 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2746,10 +2746,23 @@ cleanup: return success; } -static int clusterManagerDelSlot(clusterManagerNode *node, int slot) { +static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER SETSLOT %d %s", slot, "STABLE"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerDelSlot(clusterManagerNode *node, int slot, + int ignore_unassigned_err) +{ redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER DELSLOTS %d", slot); int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + ignore_unassigned_err && + strstr(reply->str, "already unassigned") != NULL) success = 1; if (reply) freeReplyObject(reply); return success; } @@ -3658,24 +3671,18 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(none, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); /* Ensure the slot is not already assigned. 
*/ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + clusterManagerDelSlot(n, s, 1); + if (!clusterManagerAddSlot(n, s)) fixed = -1; + if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ - n->slots[atoi(slot)] = 1; + n->slots[s] = 1; fixed++; } } @@ -3691,6 +3698,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(single, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot); assert(entry != NULL); list *nodes = (list *) dictGetVal(entry); @@ -3700,16 +3708,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); /* Ensure the slot is not already assigned. 
*/ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + clusterManagerDelSlot(n, s, 1); + if (!clusterManagerAddSlot(n, s)) fixed = -1; + if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3744,21 +3745,12 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { "to %s:%d\n", slot, target->ip, target->port); /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + clusterManagerDelSlot(target, s, 1); + if (!clusterManagerAddSlot(target, s)) fixed = -1; if (fixed < 0) goto cleanup; - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER SETSLOT %s %s", slot, "STABLE"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + if (!clusterManagerClearSlotStatus(target, s)) + fixed = -1; + if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1; if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3770,23 +3762,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode 
*src = nln->value; if (src == target) continue; /* Assign the slot to target node in the source node. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "NODE", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerSetSlot(src, target, s, "NODE", NULL)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Set the source node in 'importing' state * (even if we will actually migrate keys away) * in order to avoid receiving redirections * for MIGRATE. */ - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "IMPORTING", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) - fixed = -1; - if (r) freeReplyObject(r); + if (!clusterManagerSetSlot(src, target, s, + "IMPORTING", NULL)) fixed = -1; if (fixed < 0) goto cleanup; int opts = CLUSTER_MANAGER_OPT_VERBOSE | CLUSTER_MANAGER_OPT_COLD; @@ -3794,12 +3778,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { fixed = -1; goto cleanup; } - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s", slot, - "STABLE"); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerClearSlotStatus(src, s)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; } fixed++; @@ -3923,24 +3903,13 @@ static int clusterManagerFixOpenSlot(int slot) { // Use ADDSLOTS to assign the slot. clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n", owner->ip, owner->port); - redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER " - "SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerClearSlotStatus(owner, slot); if (!success) goto cleanup; /* Ensure that the slot is unassigned before assigning it to the * owner. 
*/ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - /* Ignore "already unassigned" error. */ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); + success = clusterManagerDelSlot(owner, slot, 1); if (!success) goto cleanup; - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerAddSlot(owner, slot); if (!success) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3948,9 +3917,7 @@ static int clusterManagerFixOpenSlot(int slot) { /* Make sure this information will propagate. Not strictly needed * since there is no past owner, so all the other nodes will accept * whatever epoch this node will claim the slot with. */ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerBumpEpoch(owner); if (!success) goto cleanup; /* Remove the owner from the list of migrating/importing * nodes. */ @@ -3970,16 +3937,10 @@ static int clusterManagerFixOpenSlot(int slot) { * the owner has been set in the previous condition (owner == NULL). */ assert(owner != NULL); listRewind(owners, &li); - redisReply *reply = NULL; while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n == owner) continue; - reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(n, reply, NULL); - /* Ignore "already unassigned" error. 
*/ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); + success = clusterManagerDelSlot(n, slot, 1); if (!success) goto cleanup; n->slots[slot] = 0; /* Assign the slot to the owner in the node 'n' configuration.' */ @@ -4021,11 +3982,7 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; clusterManagerLogInfo(">>> Setting %d as STABLE in " "%s:%d\n", slot, n->ip, n->port); - - redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerClearSlotStatus(n, slot); if (!success) goto cleanup; } /* Since the slot has been moved in "cold" mode, ensure that all the @@ -4035,10 +3992,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerNode *n = ln->value; if (n == owner) continue; if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL); if (!success) goto cleanup; } } else { @@ -4104,7 +4058,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { * could reply with an "already unassigned" error and if it should fail * for other reasons, it would lead to a failure in the follwing ADDSLOTS * command. 
*/ - clusterManagerDelSlot(owner, slot); + clusterManagerDelSlot(owner, slot, 1); if (!clusterManagerAddSlot(owner, slot)) return 0; if (!clusterManagerBumpEpoch(owner)) return 0; listIter li; @@ -4120,7 +4074,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { int count = clusterManagerCountKeysInSlot(n, slot); success = (count >= 0); if (!success) break; - clusterManagerDelSlot(n, slot); + clusterManagerDelSlot(n, slot, 1); if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0; if (count > 0) { int opts = CLUSTER_MANAGER_OPT_VERBOSE | From d5f7703367c83f46683d220fed785c18504dd5ca Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 28 Nov 2018 16:59:16 +0100 Subject: [PATCH 15/31] Cluster Manager: setting new slot owner is now handled atomically in 'fix' command. --- src/redis-cli.c | 103 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 31 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index a3eadf364..b66ea9e6e 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1929,6 +1929,7 @@ static dictType clusterManagerDictType = { }; typedef int clusterManagerCommandProc(int argc, char **argv); +typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); /* Cluster Manager helper functions */ @@ -2188,6 +2189,38 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n, return 1; } +/* Execute MULTI command on a cluster node. */ +static int clusterManagerStartTransaction(clusterManagerNode *node) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +/* Execute EXEC command on a cluster node. 
*/ +static int clusterManagerExecTransaction(clusterManagerNode *node, + clusterManagerOnReplyError onerror) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (success) { + if (reply->type != REDIS_REPLY_ARRAY) { + success = 0; + goto cleanup; + } + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + success = clusterManagerCheckRedisReply(node, r, NULL); + if (!success && onerror) success = onerror(r, i); + if (!success) break; + } + } +cleanup: + if (reply) freeReplyObject(reply); + return success; +} + static int clusterManagerNodeConnect(clusterManagerNode *node) { if (node->context) redisFree(node->context); node->context = redisConnect(node->ip, node->port); @@ -2794,6 +2827,31 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) { return success; } +static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { + if (bulk_idx == 0 && reply) { + if (reply->type == REDIS_REPLY_ERROR) + return strstr(reply->str, "already unassigned") != NULL; + } + return 0; +} + +static int clusterManagerSetSlotOwner(clusterManagerNode *owner, + int slot, + int do_clear) +{ + int success = clusterManagerStartTransaction(owner); + if (!success) return 0; + /* Ensure the slot is not already assigned. */ + clusterManagerDelSlot(owner, slot, 0); + /* Add the slot and bump epoch. */ + clusterManagerAddSlot(owner, slot); + if (do_clear) clusterManagerClearSlotStatus(owner, slot); + clusterManagerBumpEpoch(owner); + success = clusterManagerExecTransaction(owner, + clusterManagerIgnoreUnassignedErr); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. 
*/ @@ -3675,11 +3733,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - clusterManagerDelSlot(n, s, 1); - if (!clusterManagerAddSlot(n, s)) fixed = -1; - if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ n->slots[s] = 1; @@ -3707,11 +3764,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = fn->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - clusterManagerDelSlot(n, s, 1); - if (!clusterManagerAddSlot(n, s)) fixed = -1; - if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ n->slots[atoi(slot)] = 1; @@ -3744,14 +3800,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s moving keys " "to %s:%d\n", slot, target->ip, target->port); - /* Ensure the slot is not already assigned. 
*/ - clusterManagerDelSlot(target, s, 1); - if (!clusterManagerAddSlot(target, s)) fixed = -1; - if (fixed < 0) goto cleanup; - if (!clusterManagerClearSlotStatus(target, s)) + if (!clusterManagerSetSlotOwner(target, s, 1)) { fixed = -1; - if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1; - if (fixed < 0) goto cleanup; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ target->slots[atoi(slot)] = 1; @@ -3905,11 +3957,7 @@ static int clusterManagerFixOpenSlot(int slot) { owner->ip, owner->port); success = clusterManagerClearSlotStatus(owner, slot); if (!success) goto cleanup; - /* Ensure that the slot is unassigned before assigning it to the - * owner. */ - success = clusterManagerDelSlot(owner, slot, 1); - if (!success) goto cleanup; - success = clusterManagerAddSlot(owner, slot); + success = clusterManagerSetSlotOwner(owner, slot, 0); if (!success) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -4052,15 +4100,8 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { if (!owner) owner = listFirst(owners)->value; clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d", slot, owner->ip, owner->port); - /* Set the owner node by calling DELSLOTS in order to unassign the slot - * in case it's already assigned to another node and by finally calling - * ADDSLOTS and BUMPEPOCH. The call to DELSLOTS is not checked since it - * could reply with an "already unassigned" error and if it should fail - * for other reasons, it would lead to a failure in the follwing ADDSLOTS - * command. */ - clusterManagerDelSlot(owner, slot, 1); - if (!clusterManagerAddSlot(owner, slot)) return 0; - if (!clusterManagerBumpEpoch(owner)) return 0; + /* Set the slot owner. 
*/ + if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0; listIter li; listNode *ln; listRewind(cluster_manager.nodes, &li); From 1a56fc913e70d61449c79b643f4ab241ece3cbec Mon Sep 17 00:00:00 2001 From: artix Date: Fri, 30 Nov 2018 20:48:52 +0100 Subject: [PATCH 16/31] Cluster Manager: 'fix' command now handles open slots with migrating state in one node and importing state in multiple nodes. --- src/redis-cli.c | 80 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index b66ea9e6e..56d2ee0e7 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1846,7 +1846,7 @@ static int evalMode(int argc, char **argv) { if (eval_ldb) { if (!config.eval_ldb) { /* If the debugging session ended immediately, there was an - * error compiling the script. Show it and don't enter + * error compiling the script. Show it and they don't enter * the REPL at all. */ printf("Eval debugging session can't start:\n"); cliReadReply(0); @@ -4043,6 +4043,73 @@ static int clusterManagerFixOpenSlot(int slot) { success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL); if (!success) goto cleanup; } + } + /* Case 3: The slot is in migrating state in one node but multiple + * other nodes claim to be in importing state and don't have any key in + * the slot. We search for the importing node having the same ID as + * the destination node of the migrating node. + * In that case we move the slot from the migrating node to this node and + * we close the importing states on all the other importing nodes. + * If no importing node has the same ID as the destination node of the + * migrating node, the slot's state is closed on both the migrating node + * and the importing nodes. 
*/ + else if (listLength(migrating) == 1 && listLength(importing) > 1) { + int try_to_fix = 1; + clusterManagerNode *src = listFirst(migrating)->value; + clusterManagerNode *dst = NULL; + sds target_id = NULL; + for (int i = 0; i < src->migrating_count; i += 2) { + sds migrating_slot = src->migrating[i]; + if (atoi(migrating_slot) == slot) { + target_id = src->migrating[i + 1]; + break; + } + } + assert(target_id != NULL); + listIter li; + listNode *ln; + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + int count = clusterManagerCountKeysInSlot(n, slot); + if (count > 0) { + try_to_fix = 0; + break; + } + if (strcmp(n->name, target_id) == 0) dst = n; + } + if (!try_to_fix) goto unhandled_case; + if (dst != NULL) { + clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to " + "%s:%d and closing it on all the other " + "importing nodes.\n", + slot, src->ip, src->port, + dst->ip, dst->port); + /* Move the slot to the destination node. */ + success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); + if (!success) goto cleanup; + /* Close slot on all the other importing nodes. */ + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (dst == n) continue; + success = clusterManagerClearSlotStatus(n, slot); + if (!success) goto cleanup; + } + } else { + clusterManagerLogInfo(">>> Case 3: Closing slot %d on both " + "migrating and importing nodes.\n", slot); + /* Close the slot on both the migrating node and the importing + * nodes. 
*/ + success = clusterManagerClearSlotStatus(src, slot); + if (!success) goto cleanup; + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + success = clusterManagerClearSlotStatus(n, slot); + if (!success) goto cleanup; + } + } } else { int try_to_close_slot = (listLength(importing) == 0 && listLength(migrating) == 1); @@ -4059,13 +4126,13 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; } } - /* Case 3: There are no slots claiming to be in importing state, but - * there is a migrating node that actually don't have any key or is the - * slot owner. We can just close the slot, probably a reshard interrupted - * in the middle. */ + /* Case 4: There are no slots claiming to be in importing state, but + * there is a migrating node that actually don't have any key or is the + * slot owner. We can just close the slot, probably a reshard + * interrupted in the middle. */ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; - clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n", + clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n", slot, n->ip, n->port); redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", slot, "STABLE"); @@ -4073,6 +4140,7 @@ static int clusterManagerFixOpenSlot(int slot) { if (r) freeReplyObject(r); if (!success) goto cleanup; } else { +unhandled_case: success = 0; clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot " "yet (work in progress). Slot is set as " From 0c1336caf4ba34ee7a41c21fb12756b542a0d2e8 Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 5 Dec 2018 20:09:09 +0100 Subject: [PATCH 17/31] Cluster Manager: - FixOpenSlot now correctly updates in-memory cluster configuration. - Improved output messages. 
--- src/redis-cli.c | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 56d2ee0e7..eeeaddc6b 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2211,8 +2211,14 @@ static int clusterManagerExecTransaction(clusterManagerNode *node, size_t i; for (i = 0; i < reply->elements; i++) { redisReply *r = reply->element[i]; - success = clusterManagerCheckRedisReply(node, r, NULL); + char *err = NULL; + success = clusterManagerCheckRedisReply(node, r, &err); if (!success && onerror) success = onerror(r, i); + if (err) { + if (!success) + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); + zfree(err); + } if (!success) break; } } @@ -2792,10 +2798,15 @@ static int clusterManagerDelSlot(clusterManagerNode *node, int slot, { redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER DELSLOTS %d", slot); - int success = clusterManagerCheckRedisReply(node, reply, NULL); + char *err = NULL; + int success = clusterManagerCheckRedisReply(node, reply, &err); if (!success && reply && reply->type == REDIS_REPLY_ERROR && ignore_unassigned_err && strstr(reply->str, "already unassigned") != NULL) success = 1; + if (!success && err != NULL) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); + zfree(err); + } if (reply) freeReplyObject(reply); return success; } @@ -2842,7 +2853,7 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner, int success = clusterManagerStartTransaction(owner); if (!success) return 0; /* Ensure the slot is not already assigned. */ - clusterManagerDelSlot(owner, slot, 0); + clusterManagerDelSlot(owner, slot, 1); /* Add the slot and bump epoch. */ clusterManagerAddSlot(owner, slot); if (do_clear) clusterManagerClearSlotStatus(owner, slot); @@ -3747,7 +3758,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { /* Handle case "2": keys only in one node. 
*/ if (listLength(single) > 0) { - printf("The following uncovered slots have keys in just one node:\n"); + printf("The following uncovered slots have keys in just one node:\n"); clusterManagerPrintSlotsList(single); if (confirmWithYes("Fix these slots by covering with those nodes?")){ listIter li; @@ -4012,6 +4023,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogInfo(">>> Case 1: Moving slot %d from " "%s:%d to %s:%d\n", slot, src->ip, src->port, dst->ip, dst->port); + move_opts |= CLUSTER_MANAGER_OPT_UPDATE; success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); } /* Case 2: There are multiple nodes that claim the slot as importing, @@ -4166,7 +4178,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { slot, NULL); if (!owner) owner = listFirst(owners)->value; - clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d", + clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n", slot, owner->ip, owner->port); /* Set the slot owner. */ if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0; From 27ddb2ba3a8759b306501882bd76760640e6705a Mon Sep 17 00:00:00 2001 From: artix Date: Mon, 10 Dec 2018 18:01:13 +0100 Subject: [PATCH 18/31] Cluster Manager: - Multiple owners checking in 'fix'/'check' commands is now optional (using --cluster-search-multiple-owners). - Updated help. 
--- src/redis-cli.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index eeeaddc6b..a3fb065d5 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -117,6 +117,7 @@ #define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6 #define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7 #define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8 +#define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9 #define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0 #define CLUSTER_MANAGER_OPT_COLD 1 << 1 @@ -1378,6 +1379,9 @@ static int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) { config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; + } else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) { + config.cluster_manager_command.flags |= + CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) { sds version = cliVersion(); printf("redis-cli %s\n", version); @@ -1991,14 +1995,17 @@ typedef struct clusterManagerCommandDef { clusterManagerCommandDef clusterManagerCommands[] = { {"create", clusterManagerCommandCreate, -2, "host1:port1 ... 
hostN:portN", "replicas "}, - {"check", clusterManagerCommandCheck, -1, "host:port", NULL}, + {"check", clusterManagerCommandCheck, -1, "host:port", + "search-multiple-owners"}, {"info", clusterManagerCommandInfo, -1, "host:port", NULL}, - {"fix", clusterManagerCommandFix, -1, "host:port", NULL}, + {"fix", clusterManagerCommandFix, -1, "host:port", + "search-multiple-owners"}, {"reshard", clusterManagerCommandReshard, -1, "host:port", - "from ,to ,slots ,yes,timeout ,pipeline "}, + "from ,to ,slots ,yes,timeout ,pipeline ," + "replace"}, {"rebalance", clusterManagerCommandRebalance, -1, "host:port", "weight ,use-empty-masters," - "timeout ,simulate,pipeline ,threshold "}, + "timeout ,simulate,pipeline ,threshold ,replace"}, {"add-node", clusterManagerCommandAddNode, 2, "new_host:new_port existing_host:existing_port", "slave,master-id "}, {"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL}, @@ -4320,7 +4327,9 @@ static int clusterManagerCheckCluster(int quiet) { if (fixed > 0) result = 1; } } - if (!consistent) { + int search_multiple_owners = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; + if (search_multiple_owners) { /* Check whether there are multiple owners, even when slots are * fully covered and there are no open slots. */ clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); From 274531396caedd710595f46cc14010ed68a0c931 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Dec 2018 11:39:18 +0100 Subject: [PATCH 19/31] Reject EXEC containing write commands against RO replica. Thanks to @soloestoy for discovering this issue in #5667. This is an alternative fix in order to avoid both cycling the clients and also disconnecting clients just having valid read-only transactions pending. 
--- src/multi.c | 17 +++++++++++++++++ src/server.h | 3 +++ 2 files changed, 20 insertions(+) diff --git a/src/multi.c b/src/multi.c index 8159adcb3..c2e641879 100644 --- a/src/multi.c +++ b/src/multi.c @@ -35,6 +35,7 @@ void initClientMultiState(client *c) { c->mstate.commands = NULL; c->mstate.count = 0; + c->mstate.cmd_flags = 0; } /* Release all the resources associated with MULTI/EXEC state */ @@ -67,6 +68,7 @@ void queueMultiCommand(client *c) { for (j = 0; j < c->argc; j++) incrRefCount(mc->argv[j]); c->mstate.count++; + c->mstate.cmd_flags |= c->cmd->flags; } void discardTransaction(client *c) { @@ -137,6 +139,21 @@ void execCommand(client *c) { goto handle_monitor; } + /* If there are write commands inside the transaction, and this is a read + * only slave, we want to send an error. This happens when the transaction + * was initiated when the instance was a master or a writable replica and + * then the configuration changed (for example instance was turned into + * a replica). */ + if (server.masterhost && server.repl_slave_ro && + !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE) + { + addReplyError(c, + "Transaction contains write commands but instance " + "is now a read-only slave. EXEC aborted."); + discardTransaction(c); + goto handle_monitor; + } + /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; diff --git a/src/server.h b/src/server.h index f7c348622..a87fd9759 100644 --- a/src/server.h +++ b/src/server.h @@ -654,6 +654,9 @@ typedef struct multiCmd { typedef struct multiState { multiCmd *commands; /* Array of MULTI commands */ int count; /* Total number of MULTI commands */ + int cmd_flags; /* The accumulated command flags OR-ed together. + So if at least a command has a given flag, it + will be set in this field. */ int minreplicas; /* MINREPLICAS for synchronous replication */ time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. 
*/ } multiState; From 1530c0a7dd10f67ff5a1622c107478c6d8275867 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 11 Dec 2018 19:47:36 +0800 Subject: [PATCH 20/31] multi: ignore multiState's cmd_flags when loading AOF --- src/multi.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/multi.c b/src/multi.c index c2e641879..5971f4653 100644 --- a/src/multi.c +++ b/src/multi.c @@ -144,7 +144,7 @@ void execCommand(client *c) { * was initiated when the instance was a master or a writable replica and * then the configuration changed (for example instance was turned into * a replica). */ - if (server.masterhost && server.repl_slave_ro && + if (!server.loading && server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE) { addReplyError(c, From 6100be7d827bc1329b156b165a4e08fd91cbc115 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 11 Dec 2018 19:53:54 +0800 Subject: [PATCH 21/31] fix typo --- src/multi.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/multi.c b/src/multi.c index 5971f4653..d7f7d4ae8 100644 --- a/src/multi.c +++ b/src/multi.c @@ -149,7 +149,7 @@ void execCommand(client *c) { { addReplyError(c, "Transaction contains write commands but instance " - "is now a read-only slave. EXEC aborted."); + "is now a read-only replica. EXEC aborted."); discardTransaction(c); goto handle_monitor; } From c710d4afdccc0c797745bc3264f3f32a4cdd85da Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Dec 2018 13:18:52 +0100 Subject: [PATCH 22/31] Fix stringmatchlen() read past buffer bug. See #5632. 
--- src/util.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util.c b/src/util.c index 430cbe61a..1dc8dd871 100644 --- a/src/util.c +++ b/src/util.c @@ -48,7 +48,7 @@ int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase) { - while(patternLen) { + while(patternLen && stringLen) { switch(pattern[0]) { case '*': while (pattern[1] == '*') { From a31ca8d75375a196bb4a2393db9f9d2307416d8f Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Dec 2018 13:29:30 +0100 Subject: [PATCH 23/31] stringmatchlen() fuzz test added. Verified to be able to trigger at least #5632. Does not report other issues. --- src/debug.c | 5 +++++ src/util.c | 16 ++++++++++++++++ src/util.h | 1 + 3 files changed, 22 insertions(+) diff --git a/src/debug.c b/src/debug.c index e7d6a5cb8..1ec7c4977 100644 --- a/src/debug.c +++ b/src/debug.c @@ -322,6 +322,7 @@ void debugCommand(client *c) { "SLEEP -- Stop the server for . Decimals allowed.", "STRUCTSIZE -- Return the size of different Redis core C structures.", "ZIPLIST -- Show low level info about the ziplist encoding.", +"STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.", NULL }; addReplyHelp(c, help); @@ -619,6 +620,10 @@ NULL changeReplicationId(); clearReplicationId2(); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"stringmatch-test") && c->argc == 2) + { + stringmatchlen_fuzz_test(); + addReplyStatus(c,"Apparently Redis did not crash: test passed"); } else { addReplySubcommandSyntaxError(c); return; diff --git a/src/util.c b/src/util.c index 1dc8dd871..66d599190 100644 --- a/src/util.c +++ b/src/util.c @@ -171,6 +171,22 @@ int stringmatch(const char *pattern, const char *string, int nocase) { return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase); } +/* Fuzz stringmatchlen() trying to crash it with bad input. 
*/ +int stringmatchlen_fuzz_test(void) { + char str[32]; + char pat[32]; + int cycles = 10000000; + int total_matches = 0; + while(cycles--) { + int strlen = rand() % sizeof(str); + int patlen = rand() % sizeof(pat); + for (int j = 0; j < strlen; j++) str[j] = rand() % 128; + for (int j = 0; j < patlen; j++) pat[j] = rand() % 128; + total_matches += stringmatchlen(pat, patlen, str, strlen, 0); + } + return total_matches; +} + /* Convert a string representing an amount of memory into the number of * bytes, so for instance memtoll("1Gb") will return 1073741824 that is * (1024*1024*1024). diff --git a/src/util.h b/src/util.h index cc154d968..b6c01aa59 100644 --- a/src/util.h +++ b/src/util.h @@ -40,6 +40,7 @@ int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase); int stringmatch(const char *p, const char *s, int nocase); +int stringmatchlen_fuzz_test(void); long long memtoll(const char *p, int *err); uint32_t digits10(uint64_t v); uint32_t sdigits10(int64_t v); From b9cd89d1089a488b82517ad8f7899dde10cf7c25 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 12 Dec 2018 00:25:24 +0800 Subject: [PATCH 24/31] evict: don't care about mem if loading When loading data, we call processEventsWhileBlocked to process events and execute commands. But if we are loading AOF it's dangerous, because processCommand would call freeMemoryIfNeeded to evict, and that will break data consistency, see issue #5686. --- src/evict.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/evict.c b/src/evict.c index 39deb65a6..980a91f79 100644 --- a/src/evict.c +++ b/src/evict.c @@ -444,9 +444,15 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * Otehrwise if we are over the memory limit, but not enough memory * was freed to return back under the limit, the function returns C_ERR. */ int freeMemoryIfNeeded(void) { - /* By default slaves should ignore maxmemory and just be masters excat - * copies. 
*/ - if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK; + /* By default replicas should ignore maxmemory + * and just be masters exact copies. + * + * And don't care about mem if loading. */ + if (server.loading || + (server.masterhost && server.repl_slave_ignore_maxmemory)) + { + return C_OK; + } size_t mem_reported, mem_tofree, mem_freed; mstime_t latency, eviction_latency; From 03e2bb0cfd1ad6dba920e72b3d212cc3abb97d98 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Dec 2018 17:50:18 +0100 Subject: [PATCH 25/31] Crashing is too much in addReplyErrorLength(). See #5663. --- src/networking.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/networking.c b/src/networking.c index f19eb86bb..74b857e04 100644 --- a/src/networking.c +++ b/src/networking.c @@ -372,12 +372,6 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " "to its %s: '%s' after processing the command " "'%s'", from, to, s, cmdname); - /* Here we want to panic because when a master is sending an - * error to some slave in the context of replication, this can - * only create some kind of offset or data desynchronization. Better - * to catch it ASAP and crash instead of continuing. */ - if (c->flags & CLIENT_SLAVE) - serverPanic("Continuing is unsafe: replication protocol violation."); } } From 129f2d2746ca80451d8c84b223b568298020b125 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 12 Dec 2018 11:37:15 +0100 Subject: [PATCH 26/31] freeMemoryIfNeeded() small refactoring. Related to issue #5686 and PR #5689. 
--- src/config.c | 2 +- src/evict.c | 21 +++++++++++++-------- src/server.c | 2 +- src/server.h | 1 + 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/config.c b/src/config.c index 8a2690a0c..9f51bba85 100644 --- a/src/config.c +++ b/src/config.c @@ -1248,7 +1248,7 @@ void configSetCommand(client *c) { if (server.maxmemory < zmalloc_used_memory()) { serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy."); } - freeMemoryIfNeeded(); + freeMemoryIfNeededAndSafe(); } } config_set_memory_field( "proto-max-bulk-len",server.proto_max_bulk_len) { diff --git a/src/evict.c b/src/evict.c index 980a91f79..773916ce8 100644 --- a/src/evict.c +++ b/src/evict.c @@ -445,14 +445,8 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * was freed to return back under the limit, the function returns C_ERR. */ int freeMemoryIfNeeded(void) { /* By default replicas should ignore maxmemory - * and just be masters exact copies. - * - * And don't care about mem if loading. */ - if (server.loading || - (server.masterhost && server.repl_slave_ignore_maxmemory)) - { - return C_OK; - } + * and just be masters exact copies. */ + if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK; size_t mem_reported, mem_tofree, mem_freed; mstime_t latency, eviction_latency; @@ -628,3 +622,14 @@ cant_free: return C_ERR; } +/* This is a wrapper for freeMemoryIfNeeded() that only really calls the + * function if right now there are the conditions to do so safely: + * + * - There must be no script in timeout condition. + * - Nor we are loading data right now. 
+ * + */ +int freeMemoryIfNeededAndSafe(void) { + if (server.lua_timedout || server.loading) return C_OK; + return freeMemoryIfNeeded(); +} diff --git a/src/server.c b/src/server.c index 28a794551..9371b2bac 100644 --- a/src/server.c +++ b/src/server.c @@ -2613,7 +2613,7 @@ int processCommand(client *c) { * condition, to avoid mixing the propagation of scripts with the * propagation of DELs due to eviction. */ if (server.maxmemory && !server.lua_timedout) { - int out_of_memory = freeMemoryIfNeeded() == C_ERR; + int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR; /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return C_ERR; diff --git a/src/server.h b/src/server.h index a87fd9759..da4c6d45a 100644 --- a/src/server.h +++ b/src/server.h @@ -1702,6 +1702,7 @@ int zslLexValueLteMax(sds value, zlexrangespec *spec); int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level); size_t freeMemoryGetNotCountedMemory(); int freeMemoryIfNeeded(void); +int freeMemoryIfNeededAndSafe(void); int processCommand(client *c); void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); From 009a9292694491ff9eec78c024d38b0b5ca83f2e Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 12 Dec 2018 11:55:30 +0100 Subject: [PATCH 27/31] Remove debugging printf from replication.tcl test. 
--- tests/integration/replication.tcl | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index b61dfac18..0e50c20a9 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -275,7 +275,6 @@ start_server {tags {"repl"}} { start_server {} { test "Master stream is correctly processed while the replica has a script in -BUSY state" { set slave [srv 0 client] - puts [srv 0 port] $slave config set lua-time-limit 500 $slave slaveof $master_host $master_port From d935cfcb89fe70a0d9c039605d1df38d0be59db7 Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 12 Dec 2018 13:23:08 +0100 Subject: [PATCH 28/31] Cluster Manager: avoid using reply error messages to check slot status. Slot assignment status is now checked by using CLUSTER SLOTS. Furthermore, one memory leak has been fixed. --- src/redis-cli.c | 136 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 111 insertions(+), 25 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index a3fb065d5..a93bd9b10 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1933,7 +1933,8 @@ static dictType clusterManagerDictType = { }; typedef int clusterManagerCommandProc(int argc, char **argv); -typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); +typedef int (*clusterManagerOnReplyError)(redisReply *reply, + clusterManagerNode *n, int bulk_idx); /* Cluster Manager helper functions */ @@ -2196,7 +2197,7 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n, return 1; } -/* Execute MULTI command on a cluster node. */ +/* Call MULTI command on a cluster node. 
*/ static int clusterManagerStartTransaction(clusterManagerNode *node) { redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); int success = clusterManagerCheckRedisReply(node, reply, NULL); @@ -2204,7 +2205,7 @@ static int clusterManagerStartTransaction(clusterManagerNode *node) { return success; } -/* Execute EXEC command on a cluster node. */ +/* Call EXEC command on a cluster node. */ static int clusterManagerExecTransaction(clusterManagerNode *node, clusterManagerOnReplyError onerror) { @@ -2220,7 +2221,7 @@ static int clusterManagerExecTransaction(clusterManagerNode *node, redisReply *r = reply->element[i]; char *err = NULL; success = clusterManagerCheckRedisReply(node, r, &err); - if (!success && onerror) success = onerror(r, i); + if (!success && onerror) success = onerror(r, node, i); if (err) { if (!success) CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); @@ -2768,6 +2769,55 @@ cleanup: return success; } +/* Get the node the slot is assigned to from the point of view of node *n. + * If the slot is unassigned or if the reply is an error, return NULL. + * Use the **err argument in order to check whether the slot is unassigned + * or the reply resulted in an error.
*/ +static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n, + int slot, char **err) +{ + assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS); + clusterManagerNode *owner = NULL; + redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS"); + if (clusterManagerCheckRedisReply(n, reply, err)) { + assert(reply->type == REDIS_REPLY_ARRAY); + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3); + int from, to; + from = r->element[0]->integer; + to = r->element[1]->integer; + if (slot < from || slot > to) continue; + redisReply *nr = r->element[2]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2); + char *name = NULL; + if (nr->elements >= 3) + name = nr->element[2]->str; + if (name != NULL) + owner = clusterManagerNodeByName(name); + else { + char *ip = nr->element[0]->str; + assert(ip != NULL); + int port = (int) nr->element[1]->integer; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *nd = ln->value; + if (strcmp(nd->ip, ip) == 0 && port == nd->port) { + owner = nd; + break; + } + } + } + if (owner) break; + } + } + if (reply) freeReplyObject(reply); + return owner; +} + /* Set slot status to "importing" or "migrating" */ static int clusterManagerSetSlot(clusterManagerNode *node1, clusterManagerNode *node2, @@ -2808,8 +2858,19 @@ static int clusterManagerDelSlot(clusterManagerNode *node, int slot, char *err = NULL; int success = clusterManagerCheckRedisReply(node, reply, &err); if (!success && reply && reply->type == REDIS_REPLY_ERROR && - ignore_unassigned_err && - strstr(reply->str, "already unassigned") != NULL) success = 1; + ignore_unassigned_err) + { + char *get_owner_err = NULL; + clusterManagerNode *assigned_to = + clusterManagerGetSlotOwner(node, slot, &get_owner_err); + if (!assigned_to) { + if (get_owner_err == NULL) success = 1; + else { + 
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err); + zfree(get_owner_err); + } + } + } if (!success && err != NULL) { CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); zfree(err); @@ -2845,12 +2906,16 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) { return success; } -static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { - if (bulk_idx == 0 && reply) { - if (reply->type == REDIS_REPLY_ERROR) - return strstr(reply->str, "already unassigned") != NULL; - } - return 0; +/* Callback used by clusterManagerSetSlotOwner transaction. It should ignore + * errors except for ADDSLOTS errors. + * Return 1 if the error should be ignored. */ +static int clusterManagerOnSetOwnerErr(redisReply *reply, + clusterManagerNode *n, int bulk_idx) +{ + UNUSED(reply); + UNUSED(n); + /* Only raise error when ADDSLOTS fail (bulk_idx == 1). */ + return (bulk_idx != 1); } static int clusterManagerSetSlotOwner(clusterManagerNode *owner, @@ -2865,8 +2930,7 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner, clusterManagerAddSlot(owner, slot); if (do_clear) clusterManagerClearSlotStatus(owner, slot); clusterManagerBumpEpoch(owner); - success = clusterManagerExecTransaction(owner, - clusterManagerIgnoreUnassignedErr); + success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr); return success; } @@ -2950,8 +3014,8 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, char **err) { int success = 1; - int replace_existing_keys = (config.cluster_manager_command.flags & - (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); + int retry = (config.cluster_manager_command.flags & + (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); while (1) { char *dots = NULL; redisReply *reply = NULL, *migrate_reply = NULL; @@ -2983,16 +3047,35 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (migrate_reply == NULL) goto next; if (migrate_reply->type == 
REDIS_REPLY_ERROR) { int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; - int not_served = strstr(migrate_reply->str, "slot not served") != NULL; - if (replace_existing_keys && (is_busy || not_served)) { + int not_served = 0; + if (!is_busy) { + char *get_owner_err = NULL; + clusterManagerNode *served_by = + clusterManagerGetSlotOwner(source, slot, &get_owner_err); + if (!served_by) { + if (get_owner_err == NULL) not_served = 1; + else { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, + get_owner_err); + zfree(get_owner_err); + } + } + } + if (retry && (is_busy || not_served)) { /* If the key already exists, try to migrate keys * adding REPLACE option. * If the key's slot is not served, try to assign slot * to the target node. */ - if (not_served) + if (not_served) { + clusterManagerLogWarn("*** Slot was not served, setting " + "owner to node %s:%d.\n", + target->ip, target->port); clusterManagerSetSlot(source, target, slot, "node", NULL); - clusterManagerLogWarn("*** Target key exists. " - "Replacing it for FIX.\n"); + } + if (is_busy) { + clusterManagerLogWarn("*** Target key exists. " + "Replacing it for FIX.\n"); + } freeReplyObject(migrate_reply); migrate_reply = clusterManagerMigrateKeysInReply(source, target, @@ -4252,7 +4335,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->migrating_count; i += 2) { sds slot = n->migrating[i]; - dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4270,7 +4353,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->importing_count; i += 2) { sds slot = n->importing[i]; - dictAdd(open_slots, slot, sdsdup(n->importing[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->importing[i + 1])); char *fmt = (i > 0 ? 
",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4333,7 +4416,7 @@ static int clusterManagerCheckCluster(int quiet) { /* Check whether there are multiple owners, even when slots are * fully covered and there are no open slots. */ clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); - int slot = 0; + int slot = 0, slots_with_multiple_owners = 0; for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { listIter li; listNode *ln; @@ -4359,6 +4442,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerNode *n = ln->value; clusterManagerLogErr(" %s:%d\n", n->ip, n->port); } + slots_with_multiple_owners++; if (do_fix) { result = clusterManagerFixMultipleSlotOwners(slot, owners); if (!result) { @@ -4366,11 +4450,13 @@ static int clusterManagerCheckCluster(int quiet) { "for slot %d\n", slot); listRelease(owners); break; - } + } else slots_with_multiple_owners--; } } listRelease(owners); } + if (slots_with_multiple_owners == 0) + clusterManagerLogOk("[OK] No multiple owners found.\n"); } return result; } From 143bfa1e6e65cf8be1eaad0b8169e2d95ca62f9a Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 18 Dec 2018 17:38:57 +0100 Subject: [PATCH 29/31] Cluster Manager: compare key values after BUSYKEY error (migration). If a key exists in the target node during a migration (BUSYKEY), the value of the key on both nodes (source and target) will be compared. If the key has the same value on both nodes, the migration will be automatically retried with the REPLACE argument in order to override the target's key. If the key has different values, the behaviour depends on the case: - In case of 'fix' command, the migration will stop and the user will be warned to manually check the key(s). - In other cases (e.g. reshard), if the user launched the command with the --cluster-replace option, the migration will be retried with the REPLACE argument, otherwise the migration will stop and the user will be warned.
--- src/redis-cli.c | 133 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 124 insertions(+), 9 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index a93bd9b10..b0a12ebb9 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2934,6 +2934,68 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner, return success; } +/* Get the hash for the values of the specified keys in *keys_reply for the + * specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command + * on both nodes. Every key with same name on both nodes but having different + * values will be added to the *diffs list. Return 0 in case of reply + * error. */ +static int clusterManagerCompareKeysValues(clusterManagerNode *n1, + clusterManagerNode *n2, + redisReply *keys_reply, + list *diffs) +{ + size_t i, argc = keys_reply->elements + 2; + static const char *hash_zero = "0000000000000000000000000000000000000000"; + char **argv = zcalloc(argc * sizeof(char *)); + size_t *argv_len = zcalloc(argc * sizeof(size_t)); + argv[0] = "DEBUG"; + argv_len[0] = 5; + argv[1] = "DIGEST-VALUE"; + argv_len[1] = 12; + for (i = 0; i < keys_reply->elements; i++) { + redisReply *entry = keys_reply->element[i]; + int idx = i + 2; + argv[idx] = entry->str; + argv_len[idx] = entry->len; + } + int success = 0; + void *_reply1 = NULL, *_reply2 = NULL; + redisReply *r1 = NULL, *r2 = NULL; + redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n1->context, &_reply1) == REDIS_OK); + if (!success) goto cleanup; + r1 = (redisReply *) _reply1; + redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n2->context, &_reply2) == REDIS_OK); + if (!success) goto cleanup; + r2 = (redisReply *) _reply2; + success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR); + if (r1->type == REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str); + success = 0; + } + if (r2->type == 
REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str); + success = 0; + } + if (!success) goto cleanup; + assert(keys_reply->elements == r1->elements && + keys_reply->elements == r2->elements); + for (i = 0; i < keys_reply->elements; i++) { + char *key = keys_reply->element[i]->str; + char *hash1 = r1->element[i]->str; + char *hash2 = r2->element[i]->str; + /* Ignore keys that don't exist in both nodes. */ + if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0) + continue; + if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key); + } +cleanup: + if (r1) freeReplyObject(r1); + if (r2) freeReplyObject(r2); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. */ @@ -3014,8 +3076,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, char **err) { int success = 1; - int retry = (config.cluster_manager_command.flags & - (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); + int do_fix = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_FIX; + int do_replace = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_REPLACE; while (1) { char *dots = NULL; redisReply *reply = NULL, *migrate_reply = NULL; @@ -3049,6 +3113,8 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; int not_served = 0; if (!is_busy) { + /* Check if the slot is unassigned (not served) in the + * source node's configuration. 
*/ char *get_owner_err = NULL; clusterManagerNode *served_by = clusterManagerGetSlotOwner(source, slot, &get_owner_err); @@ -3061,20 +3127,69 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, } } } - if (retry && (is_busy || not_served)) { - /* If the key already exists, try to migrate keys - * adding REPLACE option. - * If the key's slot is not served, try to assign slot + /* Try to handle errors. */ + if (is_busy || not_served) { + /* If the key's slot is not served, try to assign slot * to the target node. */ - if (not_served) { + if (do_fix && not_served) { clusterManagerLogWarn("*** Slot was not served, setting " "owner to node %s:%d.\n", target->ip, target->port); clusterManagerSetSlot(source, target, slot, "node", NULL); } + /* If the key already exists in the target node (BUSYKEY), + * check whether its value is the same in both nodes. + * In case of equal values, retry migration with the + * REPLACE option. + * In case of different values: + * - If the migration is requested by the fix command, stop + * and warn the user. + * - In other cases (ie. reshard), proceed only if the user + * launched the command with the --cluster-replace option.*/ if (is_busy) { - clusterManagerLogWarn("*** Target key exists. 
" - "Replacing it for FIX.\n"); + clusterManagerLogWarn("\n*** Target key exists, " + "checking values...\n"); + list *diffs = listCreate(); + success = clusterManagerCompareKeysValues(source, + target, reply, diffs); + if (!success && (do_fix || !do_replace)) { + listRelease(diffs); + clusterManagerLogErr("*** Value check failed!\n"); + goto next; + } + if (listLength(diffs) > 0 && (do_fix || !do_replace)) { + success = 0; + clusterManagerLogErr( + "*** Found %d key(s) in both source node and " + "target node having different values.\n" + " Source node: %s:%d\n" + " Target node: %s:%d\n" + " Keys(s):\n", + listLength(diffs), + source->ip, source->port, + target->ip, target->port); + listIter dli; + listNode *dln; + listRewind(diffs, &dli); + while((dln = listNext(&dli)) != NULL) { + char *k = dln->value; + clusterManagerLogErr(" - %s\n", k); + } + clusterManagerLogErr("Please fix the above key(s) " + "manually "); + if (do_fix) + clusterManagerLogErr("and try again!\n"); + else { + clusterManagerLogErr("or relaunch the command " + "with --cluster-replace " + "option to force key " + "overriding.\n"); + } + listRelease(diffs); + goto next; + } + listRelease(diffs); + clusterManagerLogWarn("*** Replacing target keys...\n"); } freeReplyObject(migrate_reply); migrate_reply = clusterManagerMigrateKeysInReply(source, From cc29590188a22eb73cfbbef39fce73c7467b1edf Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 18 Dec 2018 18:39:21 +0100 Subject: [PATCH 30/31] Fixed memory leak in clusterManagerCompareKeysValues. 
--- src/redis-cli.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/redis-cli.c b/src/redis-cli.c index b0a12ebb9..705c7483f 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2993,6 +2993,8 @@ static int clusterManagerCompareKeysValues(clusterManagerNode *n1, cleanup: if (r1) freeReplyObject(r1); if (r2) freeReplyObject(r2); + zfree(argv); + zfree(argv_len); return success; } From 503fd229e4181e932ba74b3ca8a222712d80ebca Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 19 Dec 2018 17:27:58 +0100 Subject: [PATCH 31/31] Cluster Manager: enable --cluster-replace also for 'fix' command. --- src/redis-cli.c | 69 ++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 705c7483f..6fe93e660 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3149,48 +3149,47 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, * - In other cases (ie. reshard), proceed only if the user * launched the command with the --cluster-replace option.*/ if (is_busy) { - clusterManagerLogWarn("\n*** Target key exists, " - "checking values...\n"); - list *diffs = listCreate(); - success = clusterManagerCompareKeysValues(source, - target, reply, diffs); - if (!success && (do_fix || !do_replace)) { - listRelease(diffs); - clusterManagerLogErr("*** Value check failed!\n"); - goto next; - } - if (listLength(diffs) > 0 && (do_fix || !do_replace)) { - success = 0; - clusterManagerLogErr( - "*** Found %d key(s) in both source node and " - "target node having different values.\n" - " Source node: %s:%d\n" - " Target node: %s:%d\n" - " Keys(s):\n", - listLength(diffs), - source->ip, source->port, - target->ip, target->port); - listIter dli; - listNode *dln; - listRewind(diffs, &dli); - while((dln = listNext(&dli)) != NULL) { - char *k = dln->value; - clusterManagerLogErr(" - %s\n", k); + clusterManagerLogWarn("\n*** Target key exists\n"); + if (!do_replace) { + 
clusterManagerLogWarn("*** Checking key values on " + "both nodes...\n"); + list *diffs = listCreate(); + success = clusterManagerCompareKeysValues(source, + target, reply, diffs); + if (!success) { + clusterManagerLogErr("*** Value check failed!\n"); + listRelease(diffs); + goto next; } - clusterManagerLogErr("Please fix the above key(s) " - "manually "); - if (do_fix) - clusterManagerLogErr("and try again!\n"); - else { - clusterManagerLogErr("or relaunch the command " + if (listLength(diffs) > 0) { + success = 0; + clusterManagerLogErr( + "*** Found %d key(s) in both source node and " + "target node having different values.\n" + " Source node: %s:%d\n" + " Target node: %s:%d\n" + " Keys(s):\n", + listLength(diffs), + source->ip, source->port, + target->ip, target->port); + listIter dli; + listNode *dln; + listRewind(diffs, &dli); + while((dln = listNext(&dli)) != NULL) { + char *k = dln->value; + clusterManagerLogErr(" - %s\n", k); + } + clusterManagerLogErr("Please fix the above key(s) " + "manually and try again " + "or relaunch the command \n" "with --cluster-replace " "option to force key " "overriding.\n"); + listRelease(diffs); + goto next; } listRelease(diffs); - goto next; } - listRelease(diffs); clusterManagerLogWarn("*** Replacing target keys...\n"); } freeReplyObject(migrate_reply);