diff --git a/src/config.c b/src/config.c index 8a2690a0c..9f51bba85 100644 --- a/src/config.c +++ b/src/config.c @@ -1248,7 +1248,7 @@ void configSetCommand(client *c) { if (server.maxmemory < zmalloc_used_memory()) { serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy."); } - freeMemoryIfNeeded(); + freeMemoryIfNeededAndSafe(); } } config_set_memory_field( "proto-max-bulk-len",server.proto_max_bulk_len) { diff --git a/src/db.c b/src/db.c index d7233869f..62c8aa131 100644 --- a/src/db.c +++ b/src/db.c @@ -212,7 +212,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { * 2) clients WATCHing for the destination key notified. * 3) The expire time of the key is reset (the key is made persistent). * - * All the new keys in the database should be creted via this interface. */ + * All the new keys in the database should be created via this interface. */ void setKey(redisDb *db, robj *key, robj *val) { if (lookupKeyWrite(db,key) == NULL) { dbAdd(db,key,val); diff --git a/src/debug.c b/src/debug.c index 3cb567520..1ec7c4977 100644 --- a/src/debug.c +++ b/src/debug.c @@ -74,7 +74,7 @@ void xorDigest(unsigned char *digest, void *ptr, size_t len) { digest[j] ^= hash[j]; } -void xorObjectDigest(unsigned char *digest, robj *o) { +void xorStringObjectDigest(unsigned char *digest, robj *o) { o = getDecodedObject(o); xorDigest(digest,o->ptr,sdslen(o->ptr)); decrRefCount(o); @@ -104,12 +104,151 @@ void mixDigest(unsigned char *digest, void *ptr, size_t len) { SHA1Final(digest,&ctx); } -void mixObjectDigest(unsigned char *digest, robj *o) { +void mixStringObjectDigest(unsigned char *digest, robj *o) { o = getDecodedObject(o); mixDigest(digest,o->ptr,sdslen(o->ptr)); decrRefCount(o); } +/* This function computes the digest of a data structure stored in the + * object 'o'. It is the core of the DEBUG DIGEST command: when taking the + * digest of a whole dataset, we take the digest of the key and the value + * pair, and xor all those together. + * + * Note that this function does not reset the initial 'digest' passed, it + * will continue mixing this object digest to anything that was already + * present. */ +void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) { + uint32_t aux = htonl(o->type); + mixDigest(digest,&aux,sizeof(aux)); + long long expiretime = getExpire(db,keyobj); + char buf[128]; + + /* Save the key and associated value */ + if (o->type == OBJ_STRING) { + mixStringObjectDigest(digest,o); + } else if (o->type == OBJ_LIST) { + listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + robj *eleobj = listTypeGet(&entry); + mixStringObjectDigest(digest,eleobj); + decrRefCount(eleobj); + } + listTypeReleaseIterator(li); + } else if (o->type == OBJ_SET) { + setTypeIterator *si = setTypeInitIterator(o); + sds sdsele; + while((sdsele = setTypeNextObject(si)) != NULL) { + xorDigest(digest,sdsele,sdslen(sdsele)); + sdsfree(sdsele); + } + setTypeReleaseIterator(si); + } else if (o->type == OBJ_ZSET) { + unsigned char eledigest[20]; + + if (o->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *zl = o->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vll; + double score; + + eptr = ziplistIndex(zl,0); + serverAssert(eptr != NULL); + sptr = ziplistNext(zl,eptr); + serverAssert(sptr != NULL); + + while (eptr != NULL) { + serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll)); + score = zzlGetScore(sptr); + + memset(eledigest,0,20); + if (vstr != NULL) { + mixDigest(eledigest,vstr,vlen); + } else { + ll2string(buf,sizeof(buf),vll); + mixDigest(eledigest,buf,strlen(buf)); + } + + snprintf(buf,sizeof(buf),"%.17g",score); + mixDigest(eledigest,buf,strlen(buf)); + xorDigest(digest,eledigest,20); + zzlNext(zl,&eptr,&sptr); + } + } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = o->ptr; + dictIterator *di = dictGetIterator(zs->dict); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + sds sdsele = dictGetKey(de); + double *score = dictGetVal(de); + + snprintf(buf,sizeof(buf),"%.17g",*score); + memset(eledigest,0,20); + mixDigest(eledigest,sdsele,sdslen(sdsele)); + mixDigest(eledigest,buf,strlen(buf)); + xorDigest(digest,eledigest,20); + } + dictReleaseIterator(di); + } else { + serverPanic("Unknown sorted set encoding"); + } + } else if (o->type == OBJ_HASH) { + hashTypeIterator *hi = hashTypeInitIterator(o); + while (hashTypeNext(hi) != C_ERR) { + unsigned char eledigest[20]; + sds sdsele; + + memset(eledigest,0,20); + sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY); + mixDigest(eledigest,sdsele,sdslen(sdsele)); + sdsfree(sdsele); + sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE); + mixDigest(eledigest,sdsele,sdslen(sdsele)); + sdsfree(sdsele); + xorDigest(digest,eledigest,20); + } + hashTypeReleaseIterator(hi); + } else if (o->type == OBJ_STREAM) { + streamIterator si; + streamIteratorStart(&si,o->ptr,NULL,NULL,0); + streamID id; + int64_t numfields; + + while(streamIteratorGetID(&si,&id,&numfields)) { + sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq); + mixDigest(digest,itemid,sdslen(itemid)); + sdsfree(itemid); + + while(numfields--) { + unsigned char *field, *value; + int64_t field_len, value_len; + streamIteratorGetField(&si,&field,&value, + &field_len,&value_len); + mixDigest(digest,field,field_len); + mixDigest(digest,value,value_len); + } + } + streamIteratorStop(&si); + } else if (o->type == OBJ_MODULE) { + RedisModuleDigest md; + moduleValue *mv = o->ptr; + moduleType *mt = mv->type; + moduleInitDigestContext(md); + if (mt->digest) { + mt->digest(&md,mv->value); + xorDigest(digest,md.x,sizeof(md.x)); + } + } else { + serverPanic("Unknown object type"); + } + /* If the key has an expire, add it to the mix */ + if (expiretime != -1) xorDigest(digest,"!!expire!!",10); +} + /* Compute the dataset digest. Since keys, sets elements, hashes elements * are not ordered, we use a trick: every aggregate digest is the xor * of the digests of their elements. This way the order will not change @@ -118,7 +257,6 @@ void mixObjectDigest(unsigned char *digest, robj *o) { * a different digest. */ void computeDatasetDigest(unsigned char *final) { unsigned char digest[20]; - char buf[128]; dictIterator *di = NULL; dictEntry *de; int j; @@ -141,7 +279,6 @@ void computeDatasetDigest(unsigned char *final) { while((de = dictNext(di)) != NULL) { sds key; robj *keyobj, *o; - long long expiretime; memset(digest,0,20); /* This key-val digest */ key = dictGetKey(de); @@ -150,134 +287,8 @@ void computeDatasetDigest(unsigned char *final) { mixDigest(digest,key,sdslen(key)); o = dictGetVal(de); + xorObjectDigest(db,keyobj,digest,o); - aux = htonl(o->type); - mixDigest(digest,&aux,sizeof(aux)); - expiretime = getExpire(db,keyobj); - - /* Save the key and associated value */ - if (o->type == OBJ_STRING) { - mixObjectDigest(digest,o); - } else if (o->type == OBJ_LIST) { - listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL); - listTypeEntry entry; - while(listTypeNext(li,&entry)) { - robj *eleobj = listTypeGet(&entry); - mixObjectDigest(digest,eleobj); - decrRefCount(eleobj); - } - listTypeReleaseIterator(li); - } else if (o->type == OBJ_SET) { - setTypeIterator *si = setTypeInitIterator(o); - sds sdsele; - while((sdsele = setTypeNextObject(si)) != NULL) { - xorDigest(digest,sdsele,sdslen(sdsele)); - sdsfree(sdsele); - } - setTypeReleaseIterator(si); - } else if (o->type == OBJ_ZSET) { - unsigned char eledigest[20]; - - if (o->encoding == OBJ_ENCODING_ZIPLIST) { - unsigned char *zl = o->ptr; - unsigned char *eptr, *sptr; - unsigned char *vstr; - unsigned int vlen; - long long vll; - double score; - - eptr = ziplistIndex(zl,0); - serverAssert(eptr != NULL); - sptr = ziplistNext(zl,eptr); - serverAssert(sptr != NULL); - - while (eptr != NULL) { - serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll)); - score = zzlGetScore(sptr); - - memset(eledigest,0,20); - if (vstr != NULL) { - mixDigest(eledigest,vstr,vlen); - } else { - ll2string(buf,sizeof(buf),vll); - mixDigest(eledigest,buf,strlen(buf)); - } - - snprintf(buf,sizeof(buf),"%.17g",score); - mixDigest(eledigest,buf,strlen(buf)); - xorDigest(digest,eledigest,20); - zzlNext(zl,&eptr,&sptr); - } - } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = o->ptr; - dictIterator *di = dictGetIterator(zs->dict); - dictEntry *de; - - while((de = dictNext(di)) != NULL) { - sds sdsele = dictGetKey(de); - double *score = dictGetVal(de); - - snprintf(buf,sizeof(buf),"%.17g",*score); - memset(eledigest,0,20); - mixDigest(eledigest,sdsele,sdslen(sdsele)); - mixDigest(eledigest,buf,strlen(buf)); - xorDigest(digest,eledigest,20); - } - dictReleaseIterator(di); - } else { - serverPanic("Unknown sorted set encoding"); - } - } else if (o->type == OBJ_HASH) { - hashTypeIterator *hi = hashTypeInitIterator(o); - while (hashTypeNext(hi) != C_ERR) { - unsigned char eledigest[20]; - sds sdsele; - - memset(eledigest,0,20); - sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY); - mixDigest(eledigest,sdsele,sdslen(sdsele)); - sdsfree(sdsele); - sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE); - mixDigest(eledigest,sdsele,sdslen(sdsele)); - sdsfree(sdsele); - xorDigest(digest,eledigest,20); - } - hashTypeReleaseIterator(hi); - } else if (o->type == OBJ_STREAM) { - streamIterator si; - streamIteratorStart(&si,o->ptr,NULL,NULL,0); - streamID id; - int64_t numfields; - - while(streamIteratorGetID(&si,&id,&numfields)) { - sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq); - mixDigest(digest,itemid,sdslen(itemid)); - sdsfree(itemid); - - while(numfields--) { - unsigned char *field, *value; - int64_t field_len, value_len; - streamIteratorGetField(&si,&field,&value, - &field_len,&value_len); - mixDigest(digest,field,field_len); - mixDigest(digest,value,value_len); - } - } - streamIteratorStop(&si); - } else if (o->type == OBJ_MODULE) { - RedisModuleDigest md; - moduleValue *mv = o->ptr; - moduleType *mt = mv->type; - moduleInitDigestContext(md); - if (mt->digest) { - mt->digest(&md,mv->value); - xorDigest(digest,md.x,sizeof(md.x)); - } - } else { - serverPanic("Unknown object type"); - } - /* If the key has an expire, add it to the mix */ - if (expiretime != -1) xorDigest(digest,"!!expire!!",10); /* We can finally xor the key-val digest to the final digest */ xorDigest(final,digest,20); decrRefCount(keyobj); @@ -293,6 +304,7 @@ void debugCommand(client *c) { "CHANGE-REPL-ID -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.", "CRASH-AND-RECOVER -- Hard crash and restart after delay.", "DIGEST -- Output a hex signature representing the current DB content.", +"DIGEST-VALUE ... -- Output a hex signature of the values of all the specified keys.", "ERROR -- Return a Redis protocol error with as message. Useful for clients unit tests to simulate Redis errors.", "LOG -- write message to the server log.", "HTSTATS -- Return hash table statistics of the specified Redis database.", @@ -310,6 +322,7 @@ void debugCommand(client *c) { "SLEEP -- Stop the server for . Decimals allowed.", "STRUCTSIZE -- Return the size of different Redis core C structures.", "ZIPLIST -- Show low level info about the ziplist encoding.", +"STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.", NULL }; addReplyHelp(c, help); @@ -336,7 +349,6 @@ NULL zfree(ptr); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"assert")) { - if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]); serverAssertWithInfo(c,c->argv[0],1 == 2); } else if (!strcasecmp(c->argv[1]->ptr,"log") && c->argc == 3) { serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)c->argv[2]->ptr); @@ -495,15 +507,28 @@ NULL } addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) { + /* DEBUG DIGEST (form without keys specified) */ unsigned char digest[20]; sds d = sdsempty(); - int j; computeDatasetDigest(digest); - for (j = 0; j < 20; j++) - d = sdscatprintf(d, "%02x",digest[j]); + for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]); addReplyStatus(c,d); sdsfree(d); + } else if (!strcasecmp(c->argv[1]->ptr,"digest-value") && c->argc >= 2) { + /* DEBUG DIGEST-VALUE key key key ... key. */ + addReplyMultiBulkLen(c,c->argc-2); + for (int j = 2; j < c->argc; j++) { + unsigned char digest[20]; + memset(digest,0,20); /* Start with a clean result */ + robj *o = lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH); + if (o) xorObjectDigest(c->db,c->argv[j],digest,o); + + sds d = sdsempty(); + for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]); + addReplyStatus(c,d); + sdsfree(d); + } } else if (!strcasecmp(c->argv[1]->ptr,"sleep") && c->argc == 3) { double dtime = strtod(c->argv[2]->ptr,NULL); long long utime = dtime*1000000; @@ -595,6 +620,10 @@ NULL changeReplicationId(); clearReplicationId2(); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"stringmatch-test") && c->argc == 2) + { + stringmatchlen_fuzz_test(); + addReplyStatus(c,"Apparently Redis did not crash: test passed"); } else { addReplySubcommandSyntaxError(c); return; diff --git a/src/evict.c b/src/evict.c index 39deb65a6..773916ce8 100644 --- a/src/evict.c +++ b/src/evict.c @@ -444,8 +444,8 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * Otehrwise if we are over the memory limit, but not enough memory * was freed to return back under the limit, the function returns C_ERR. */ int freeMemoryIfNeeded(void) { - /* By default slaves should ignore maxmemory and just be masters excat - * copies. */ + /* By default replicas should ignore maxmemory + * and just be masters exact copies. */ if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK; size_t mem_reported, mem_tofree, mem_freed; @@ -622,3 +622,14 @@ cant_free: return C_ERR; } +/* This is a wrapper for freeMemoryIfNeeded() that only really calls the + * function if right now there are the conditions to do so safely: + * + * - There must be no script in timeout condition. + * - Nor we are loading data right now. + * + */ +int freeMemoryIfNeededAndSafe(void) { + if (server.lua_timedout || server.loading) return C_OK; + return freeMemoryIfNeeded(); +} diff --git a/src/multi.c b/src/multi.c index 8159adcb3..d7f7d4ae8 100644 --- a/src/multi.c +++ b/src/multi.c @@ -35,6 +35,7 @@ void initClientMultiState(client *c) { c->mstate.commands = NULL; c->mstate.count = 0; + c->mstate.cmd_flags = 0; } /* Release all the resources associated with MULTI/EXEC state */ @@ -67,6 +68,7 @@ void queueMultiCommand(client *c) { for (j = 0; j < c->argc; j++) incrRefCount(mc->argv[j]); c->mstate.count++; + c->mstate.cmd_flags |= c->cmd->flags; } void discardTransaction(client *c) { @@ -137,6 +139,21 @@ void execCommand(client *c) { goto handle_monitor; } + /* If there are write commands inside the transaction, and this is a read + * only slave, we want to send an error. This happens when the transaction + * was initiated when the instance was a master or a writable replica and + * then the configuration changed (for example instance was turned into + * a replica). */ + if (!server.loading && server.masterhost && server.repl_slave_ro && + !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE) + { + addReplyError(c, + "Transaction contains write commands but instance " + "is now a read-only replica. EXEC aborted."); + discardTransaction(c); + goto handle_monitor; + } + /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; diff --git a/src/networking.c b/src/networking.c index 7d387dabc..74b857e04 100644 --- a/src/networking.c +++ b/src/networking.c @@ -365,19 +365,13 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { * Where the master must propagate the first change even if the second * will produce an error. However it is useful to log such events since * they are rare and may hint at errors in a script or a bug in Redis. */ - if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) { + if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { char* to = c->flags & CLIENT_MASTER? "master": "replica"; char* from = c->flags & CLIENT_MASTER? "replica": "master"; char *cmdname = c->lastcmd ? c->lastcmd->name : ""; serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " "to its %s: '%s' after processing the command " "'%s'", from, to, s, cmdname); - /* Here we want to panic because when a master is sending an - * error to some slave in the context of replication, this can - * only create some kind of offset or data desynchronization. Better - * to catch it ASAP and crash instead of continuing. */ - if (c->flags & CLIENT_SLAVE) - serverPanic("Continuing is unsafe: replication protocol violation."); } } @@ -1470,7 +1464,7 @@ void processInputBuffer(client *c) { } /* Trim to pos */ - if (c->qb_pos) { + if (server.current_client != NULL && c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } diff --git a/src/redis-cli.c b/src/redis-cli.c index f307d31cf..6fe93e660 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -67,6 +67,7 @@ #define REDIS_CLI_HISTFILE_DEFAULT ".rediscli_history" #define REDIS_CLI_RCFILE_ENV "REDISCLI_RCFILE" #define REDIS_CLI_RCFILE_DEFAULT ".redisclirc" +#define REDIS_CLI_AUTH_ENV "REDISCLI_AUTH" #define CLUSTER_MANAGER_SLOTS 16384 #define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000 @@ -116,6 +117,7 @@ #define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6 #define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7 #define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8 +#define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9 #define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0 #define CLUSTER_MANAGER_OPT_COLD 1 << 1 @@ -1377,6 +1379,9 @@ static int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) { config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; + } else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) { + config.cluster_manager_command.flags |= + CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) { sds version = cliVersion(); printf("redis-cli %s\n", version); @@ -1419,6 +1424,14 @@ static int parseOptions(int argc, char **argv) { return i; } +static void parseEnv() { + /* Set auth from env, but do not overwrite CLI arguments if passed */ + char *auth = getenv(REDIS_CLI_AUTH_ENV); + if (auth != NULL && config.auth == NULL) { + config.auth = auth; + } +} + static sds readArgFromStdin(void) { char buf[1024]; sds arg = sdsempty(); @@ -1446,6 +1459,9 @@ static void usage(void) { " -p Server port (default: 6379).\n" " -s Server socket (overrides hostname and port).\n" " -a Password to use when connecting to the server.\n" +" You can also use the " REDIS_CLI_AUTH_ENV " environment\n" +" variable to pass this password more safely\n" +" (if both are used, this argument takes predecence).\n" " -u Server URI.\n" " -r Execute specified command N times.\n" " -i When -r is used, waits seconds per command.\n" @@ -1834,7 +1850,7 @@ static int evalMode(int argc, char **argv) { if (eval_ldb) { if (!config.eval_ldb) { /* If the debugging session ended immediately, there was an - * error compiling the script. Show it and don't enter + * error compiling the script. Show it and they don't enter * the REPL at all. */ printf("Eval debugging session can't start:\n"); cliReadReply(0); @@ -1917,6 +1933,8 @@ static dictType clusterManagerDictType = { }; typedef int clusterManagerCommandProc(int argc, char **argv); +typedef int (*clusterManagerOnReplyError)(redisReply *reply, + clusterManagerNode *n, int bulk_idx); /* Cluster Manager helper functions */ @@ -1978,14 +1996,17 @@ typedef struct clusterManagerCommandDef { clusterManagerCommandDef clusterManagerCommands[] = { {"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN", "replicas "}, - {"check", clusterManagerCommandCheck, -1, "host:port", NULL}, + {"check", clusterManagerCommandCheck, -1, "host:port", + "search-multiple-owners"}, {"info", clusterManagerCommandInfo, -1, "host:port", NULL}, - {"fix", clusterManagerCommandFix, -1, "host:port", NULL}, + {"fix", clusterManagerCommandFix, -1, "host:port", + "search-multiple-owners"}, {"reshard", clusterManagerCommandReshard, -1, "host:port", - "from ,to ,slots ,yes,timeout ,pipeline "}, + "from ,to ,slots ,yes,timeout ,pipeline ," + "replace"}, {"rebalance", clusterManagerCommandRebalance, -1, "host:port", "weight ,use-empty-masters," - "timeout ,simulate,pipeline ,threshold "}, + "timeout ,simulate,pipeline ,threshold ,replace"}, {"add-node", clusterManagerCommandAddNode, 2, "new_host:new_port existing_host:existing_port", "slave,master-id "}, {"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL}, @@ -2176,6 +2197,44 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n, return 1; } +/* Call MULTI command on a cluster node. */ +static int clusterManagerStartTransaction(clusterManagerNode *node) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +/* Call EXEC command on a cluster node. */ +static int clusterManagerExecTransaction(clusterManagerNode *node, + clusterManagerOnReplyError onerror) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (success) { + if (reply->type != REDIS_REPLY_ARRAY) { + success = 0; + goto cleanup; + } + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + char *err = NULL; + success = clusterManagerCheckRedisReply(node, r, &err); + if (!success && onerror) success = onerror(r, node, i); + if (err) { + if (!success) + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); + zfree(err); + } + if (!success) break; + } + } +cleanup: + if (reply) freeReplyObject(reply); + return success; +} + static int clusterManagerNodeConnect(clusterManagerNode *node) { if (node->context) redisFree(node->context); node->context = redisConnect(node->ip, node->port); @@ -2710,6 +2769,55 @@ cleanup: return success; } +/* Get the node the slot is assigned to from the point of view of node *n. + * If the slot is unassigned or if the reply is an error, return NULL. + * Use the **err argument in order to check wether the slot is unassigned + * or the reply resulted in an error. */ +static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n, + int slot, char **err) +{ + assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS); + clusterManagerNode *owner = NULL; + redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS"); + if (clusterManagerCheckRedisReply(n, reply, err)) { + assert(reply->type == REDIS_REPLY_ARRAY); + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3); + int from, to; + from = r->element[0]->integer; + to = r->element[1]->integer; + if (slot < from || slot > to) continue; + redisReply *nr = r->element[2]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2); + char *name = NULL; + if (nr->elements >= 3) + name = nr->element[2]->str; + if (name != NULL) + owner = clusterManagerNodeByName(name); + else { + char *ip = nr->element[0]->str; + assert(ip != NULL); + int port = (int) nr->element[1]->integer; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *nd = ln->value; + if (strcmp(nd->ip, ip) == 0 && port == nd->port) { + owner = nd; + break; + } + } + } + if (owner) break; + } + } + if (reply) freeReplyObject(reply); + return owner; +} + /* Set slot status to "importing" or "migrating" */ static int clusterManagerSetSlot(clusterManagerNode *node1, clusterManagerNode *node2, @@ -2734,6 +2842,162 @@ cleanup: return success; } +static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER SETSLOT %d %s", slot, "STABLE"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerDelSlot(clusterManagerNode *node, int slot, + int ignore_unassigned_err) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER DELSLOTS %d", slot); + char *err = NULL; + int success = clusterManagerCheckRedisReply(node, reply, &err); + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + ignore_unassigned_err) + { + char *get_owner_err = NULL; + clusterManagerNode *assigned_to = + clusterManagerGetSlotOwner(node, slot, &get_owner_err); + if (!assigned_to) { + if (get_owner_err == NULL) success = 1; + else { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err); + zfree(get_owner_err); + } + } + } + if (!success && err != NULL) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); + zfree(err); + } + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerAddSlot(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER ADDSLOTS %d", slot); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node, + int slot) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER COUNTKEYSINSLOT %d", slot); + int count = -1; + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer; + if (reply) freeReplyObject(reply); + return count; +} + +static int clusterManagerBumpEpoch(clusterManagerNode *node) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +/* Callback used by clusterManagerSetSlotOwner transaction. It should ignore + * errors except for ADDSLOTS errors. + * Return 1 if the error should be ignored. */ +static int clusterManagerOnSetOwnerErr(redisReply *reply, + clusterManagerNode *n, int bulk_idx) +{ + UNUSED(reply); + UNUSED(n); + /* Only raise error when ADDSLOTS fail (bulk_idx == 1). */ + return (bulk_idx != 1); +} + +static int clusterManagerSetSlotOwner(clusterManagerNode *owner, + int slot, + int do_clear) +{ + int success = clusterManagerStartTransaction(owner); + if (!success) return 0; + /* Ensure the slot is not already assigned. */ + clusterManagerDelSlot(owner, slot, 1); + /* Add the slot and bump epoch. */ + clusterManagerAddSlot(owner, slot); + if (do_clear) clusterManagerClearSlotStatus(owner, slot); + clusterManagerBumpEpoch(owner); + success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr); + return success; +} + +/* Get the hash for the values of the specified keys in *keys_reply for the + * specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command + * on both nodes. Every key with same name on both nodes but having different + * values will be added to the *diffs list. Return 0 in case of reply + * error. */ +static int clusterManagerCompareKeysValues(clusterManagerNode *n1, + clusterManagerNode *n2, + redisReply *keys_reply, + list *diffs) +{ + size_t i, argc = keys_reply->elements + 2; + static const char *hash_zero = "0000000000000000000000000000000000000000"; + char **argv = zcalloc(argc * sizeof(char *)); + size_t *argv_len = zcalloc(argc * sizeof(size_t)); + argv[0] = "DEBUG"; + argv_len[0] = 5; + argv[1] = "DIGEST-VALUE"; + argv_len[1] = 12; + for (i = 0; i < keys_reply->elements; i++) { + redisReply *entry = keys_reply->element[i]; + int idx = i + 2; + argv[idx] = entry->str; + argv_len[idx] = entry->len; + } + int success = 0; + void *_reply1 = NULL, *_reply2 = NULL; + redisReply *r1 = NULL, *r2 = NULL; + redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n1->context, &_reply1) == REDIS_OK); + if (!success) goto cleanup; + r1 = (redisReply *) _reply1; + redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n2->context, &_reply2) == REDIS_OK); + if (!success) goto cleanup; + r2 = (redisReply *) _reply2; + success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR); + if (r1->type == REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str); + success = 0; + } + if (r2->type == REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str); + success = 0; + } + if (!success) goto cleanup; + assert(keys_reply->elements == r1->elements && + keys_reply->elements == r2->elements); + for (i = 0; i < keys_reply->elements; i++) { + char *key = keys_reply->element[i]->str; + char *hash1 = r1->element[i]->str; + char *hash2 = r2->element[i]->str; + /* Ignore keys that don't exist in both nodes. */ + if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0) + continue; + if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key); + } +cleanup: + if (r1) freeReplyObject(r1); + if (r2) freeReplyObject(r2); + zfree(argv); + zfree(argv_len); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. */ @@ -2814,8 +3078,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, char **err) { int success = 1; - int do_fix = (config.cluster_manager_command.flags & - CLUSTER_MANAGER_CMD_FLAG_FIX); + int do_fix = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_FIX; + int do_replace = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_REPLACE; while (1) { char *dots = NULL; redisReply *reply = NULL, *migrate_reply = NULL; @@ -2846,16 +3112,86 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, dots); if (migrate_reply == NULL) goto next; if (migrate_reply->type == REDIS_REPLY_ERROR) { - if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) { - /* If the key already exists, try to migrate keys - * adding REPLACE option. - * If the key's slot is not served, try to assign slot + int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; + int not_served = 0; + if (!is_busy) { + /* Check if the slot is unassigned (not served) in the + * source node's configuration. */ + char *get_owner_err = NULL; + clusterManagerNode *served_by = + clusterManagerGetSlotOwner(source, slot, &get_owner_err); + if (!served_by) { + if (get_owner_err == NULL) not_served = 1; + else { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, + get_owner_err); + zfree(get_owner_err); + } + } + } + /* Try to handle errors. */ + if (is_busy || not_served) { + /* If the key's slot is not served, try to assign slot * to the target node. */ - int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL); - if (strstr(migrate_reply->str, "slot not served") != NULL) + if (do_fix && not_served) { + clusterManagerLogWarn("*** Slot was not served, setting " + "owner to node %s:%d.\n", + target->ip, target->port); clusterManagerSetSlot(source, target, slot, "node", NULL); - clusterManagerLogWarn("*** Target key exists. " - "Replacing it for FIX.\n"); + } + /* If the key already exists in the target node (BUSYKEY), + * check whether its value is the same in both nodes. + * In case of equal values, retry migration with the + * REPLACE option. + * In case of different values: + * - If the migration is requested by the fix command, stop + * and warn the user. + * - In other cases (ie. reshard), proceed only if the user + * launched the command with the --cluster-replace option.*/ + if (is_busy) { + clusterManagerLogWarn("\n*** Target key exists\n"); + if (!do_replace) { + clusterManagerLogWarn("*** Checking key values on " + "both nodes...\n"); + list *diffs = listCreate(); + success = clusterManagerCompareKeysValues(source, + target, reply, diffs); + if (!success) { + clusterManagerLogErr("*** Value check failed!\n"); + listRelease(diffs); + goto next; + } + if (listLength(diffs) > 0) { + success = 0; + clusterManagerLogErr( + "*** Found %d key(s) in both source node and " + "target node having different values.\n" + " Source node: %s:%d\n" + " Target node: %s:%d\n" + " Keys(s):\n", + listLength(diffs), + source->ip, source->port, + target->ip, target->port); + listIter dli; + listNode *dln; + listRewind(diffs, &dli); + while((dln = listNext(&dli)) != NULL) { + char *k = dln->value; + clusterManagerLogErr(" - %s\n", k); + } + clusterManagerLogErr("Please fix the above key(s) " + "manually and try again " + "or relaunch the command \n" + "with --cluster-replace " + "option to force key " + "overriding.\n"); + listRelease(diffs); + goto next; + } + listRelease(diffs); + } + clusterManagerLogWarn("*** Replacing target keys...\n"); + } freeReplyObject(migrate_reply); migrate_reply = clusterManagerMigrateKeysInReply(source, target, @@ -3610,24 +3946,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(none, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ - n->slots[atoi(slot)] = 1; + n->slots[s] = 1; fixed++; } } @@ -3635,7 +3964,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { /* Handle case "2": keys only in one node. */ if (listLength(single) > 0) { - printf("The following uncovered slots have keys in just one node:\n"); + printf("The following uncovered slots have keys in just one node:\n"); clusterManagerPrintSlotsList(single); if (confirmWithYes("Fix these slots by covering with those nodes?")){ listIter li; @@ -3643,6 +3972,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(single, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot); assert(entry != NULL); list *nodes = (list *) dictGetVal(entry); @@ -3651,18 +3981,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = fn->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ n->slots[atoi(slot)] = 1; @@ -3695,23 +4017,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s moving keys " "to %s:%d\n", slot, target->ip, target->port); - /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - if (fixed < 0) goto cleanup; - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER SETSLOT %s %s", slot, "STABLE"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(target, s, 1)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ target->slots[atoi(slot)] = 1; @@ -3722,23 +4031,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *src = nln->value; if (src == target) continue; /* Assign the slot to target node in the source node. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "NODE", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerSetSlot(src, target, s, "NODE", NULL)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Set the source node in 'importing' state * (even if we will actually migrate keys away) * in order to avoid receiving redirections * for MIGRATE. */ - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "IMPORTING", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) - fixed = -1; - if (r) freeReplyObject(r); + if (!clusterManagerSetSlot(src, target, s, + "IMPORTING", NULL)) fixed = -1; if (fixed < 0) goto cleanup; int opts = CLUSTER_MANAGER_OPT_VERBOSE | CLUSTER_MANAGER_OPT_COLD; @@ -3746,12 +4047,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { fixed = -1; goto cleanup; } - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s", slot, - "STABLE"); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerClearSlotStatus(src, s)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; } fixed++; @@ -3875,24 +4172,9 @@ static int clusterManagerFixOpenSlot(int slot) { // Use ADDSLOTS to assign the slot. clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n", owner->ip, owner->port); - redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER " - "SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerClearSlotStatus(owner, slot); if (!success) goto cleanup; - /* Ensure that the slot is unassigned before assigning it to the - * owner. */ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - /* Ignore "already unassigned" error. */ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); - if (!success) goto cleanup; - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerSetSlotOwner(owner, slot, 0); if (!success) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3900,9 +4182,7 @@ static int clusterManagerFixOpenSlot(int slot) { /* Make sure this information will propagate. Not strictly needed * since there is no past owner, so all the other nodes will accept * whatever epoch this node will claim the slot with. */ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerBumpEpoch(owner); if (!success) goto cleanup; /* Remove the owner from the list of migrating/importing * nodes. */ @@ -3922,16 +4202,10 @@ static int clusterManagerFixOpenSlot(int slot) { * the owner has been set in the previous condition (owner == NULL). */ assert(owner != NULL); listRewind(owners, &li); - redisReply *reply = NULL; while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n == owner) continue; - reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(n, reply, NULL); - /* Ignore "already unassigned" error. */ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); + success = clusterManagerDelSlot(n, slot, 1); if (!success) goto cleanup; n->slots[slot] = 0; /* Assign the slot to the owner in the node 'n' configuration.' */ @@ -3955,6 +4229,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogInfo(">>> Case 1: Moving slot %d from " "%s:%d to %s:%d\n", slot, src->ip, src->port, dst->ip, dst->port); + move_opts |= CLUSTER_MANAGER_OPT_UPDATE; success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); } /* Case 2: There are multiple nodes that claim the slot as importing, @@ -3973,11 +4248,7 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; clusterManagerLogInfo(">>> Setting %d as STABLE in " "%s:%d\n", slot, n->ip, n->port); - - redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerClearSlotStatus(n, slot); if (!success) goto cleanup; } /* Since the slot has been moved in "cold" mode, ensure that all the @@ -3987,12 +4258,76 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerNode *n = ln->value; if (n == owner) continue; if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL); if (!success) goto cleanup; } + } + /* Case 3: The slot is in migrating state in one node but multiple + * other nodes claim to be in importing state and don't have any key in + * the slot. We search for the importing node having the same ID as + * the destination node of the migrating node. + * In that case we move the slot from the migrating node to this node and + * we close the importing states on all the other importing nodes. + * If no importing node has the same ID as the destination node of the + * migrating node, the slot's state is closed on both the migrating node + * and the importing nodes. */ + else if (listLength(migrating) == 1 && listLength(importing) > 1) { + int try_to_fix = 1; + clusterManagerNode *src = listFirst(migrating)->value; + clusterManagerNode *dst = NULL; + sds target_id = NULL; + for (int i = 0; i < src->migrating_count; i += 2) { + sds migrating_slot = src->migrating[i]; + if (atoi(migrating_slot) == slot) { + target_id = src->migrating[i + 1]; + break; + } + } + assert(target_id != NULL); + listIter li; + listNode *ln; + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + int count = clusterManagerCountKeysInSlot(n, slot); + if (count > 0) { + try_to_fix = 0; + break; + } + if (strcmp(n->name, target_id) == 0) dst = n; + } + if (!try_to_fix) goto unhandled_case; + if (dst != NULL) { + clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to " + "%s:%d and closing it on all the other " + "importing nodes.\n", + slot, src->ip, src->port, + dst->ip, dst->port); + /* Move the slot to the destination node. */ + success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); + if (!success) goto cleanup; + /* Close slot on all the other importing nodes. */ + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (dst == n) continue; + success = clusterManagerClearSlotStatus(n, slot); + if (!success) goto cleanup; + } + } else { + clusterManagerLogInfo(">>> Case 3: Closing slot %d on both " + "migrating and importing nodes.\n", slot); + /* Close the slot on both the migrating node and the importing + * nodes. */ + success = clusterManagerClearSlotStatus(src, slot); + if (!success) goto cleanup; + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + success = clusterManagerClearSlotStatus(n, slot); + if (!success) goto cleanup; + } + } } else { int try_to_close_slot = (listLength(importing) == 0 && listLength(migrating) == 1); @@ -4009,13 +4344,13 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; } } - /* Case 3: There are no slots claiming to be in importing state, but - * there is a migrating node that actually don't have any key or is the - * slot owner. We can just close the slot, probably a reshard interrupted - * in the middle. */ + /* Case 4: There are no slots claiming to be in importing state, but + * there is a migrating node that actually don't have any key or is the + * slot owner. We can just close the slot, probably a reshard + * interrupted in the middle. */ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; - clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n", + clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n", slot, n->ip, n->port); redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", slot, "STABLE"); @@ -4023,6 +4358,7 @@ static int clusterManagerFixOpenSlot(int slot) { if (r) freeReplyObject(r); if (!success) goto cleanup; } else { +unhandled_case: success = 0; clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot " "yet (work in progress). Slot is set as " @@ -4040,17 +4376,55 @@ cleanup: return success; } +static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { + clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot); + int success = 0; + assert(listLength(owners) > 1); + clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners, + slot, + NULL); + if (!owner) owner = listFirst(owners)->value; + clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n", + slot, owner->ip, owner->port); + /* Set the slot owner. */ + if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + /* Update configuration in all the other master nodes by assigning the slot + * itself to the new owner, and by eventually migrating keys if the node + * has keys for the slot. */ + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n == owner) continue; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + int count = clusterManagerCountKeysInSlot(n, slot); + success = (count >= 0); + if (!success) break; + clusterManagerDelSlot(n, slot, 1); + if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0; + if (count > 0) { + int opts = CLUSTER_MANAGER_OPT_VERBOSE | + CLUSTER_MANAGER_OPT_COLD; + success = clusterManagerMoveSlot(n, owner, slot, opts, NULL); + if (!success) break; + } + } + return success; +} + static int clusterManagerCheckCluster(int quiet) { listNode *ln = listFirst(cluster_manager.nodes); if (!ln) return 0; - int result = 1; - int do_fix = config.cluster_manager_command.flags & - CLUSTER_MANAGER_CMD_FLAG_FIX; clusterManagerNode *node = ln->value; clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n", node->ip, node->port); + int result = 1, consistent = 0; + int do_fix = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_FIX; if (!quiet) clusterManagerShowNodes(); - if (!clusterManagerIsConfigConsistent()) { + consistent = clusterManagerIsConfigConsistent(); + if (!consistent) { sds err = sdsnew("[ERR] Nodes don't agree about configuration!"); clusterManagerOnError(err); result = 0; @@ -4058,7 +4432,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerLogOk("[OK] All nodes agree about slots " "configuration.\n"); } - // Check open slots + /* Check open slots */ clusterManagerLogInfo(">>> Check for open slots...\n"); listIter li; listRewind(cluster_manager.nodes, &li); @@ -4077,7 +4451,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->migrating_count; i += 2) { sds slot = n->migrating[i]; - dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4095,7 +4469,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->importing_count; i += 2) { sds slot = n->importing[i]; - dictAdd(open_slots, slot, sdsdup(n->importing[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->importing[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4117,7 +4491,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerLogErr("%s.\n", (char *) errstr); sdsfree(errstr); if (do_fix) { - // Fix open slots. + /* Fix open slots. */ dictReleaseIterator(iter); iter = dictGetIterator(open_slots); while ((entry = dictNext(iter)) != NULL) { @@ -4152,6 +4526,54 @@ static int clusterManagerCheckCluster(int quiet) { if (fixed > 0) result = 1; } } + int search_multiple_owners = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; + if (search_multiple_owners) { + /* Check whether there are multiple owners, even when slots are + * fully covered and there are no open slots. */ + clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); + int slot = 0, slots_with_multiple_owners = 0; + for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + list *owners = listCreate(); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + if (n->slots[slot]) listAddNodeTail(owners, n); + else { + /* Nodes having keys for the slot will be considered + * owners too. */ + int count = clusterManagerCountKeysInSlot(n, slot); + if (count > 0) listAddNodeTail(owners, n); + } + } + if (listLength(owners) > 1) { + result = 0; + clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n", + slot, listLength(owners)); + listRewind(owners, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + clusterManagerLogErr(" %s:%d\n", n->ip, n->port); + } + slots_with_multiple_owners++; + if (do_fix) { + result = clusterManagerFixMultipleSlotOwners(slot, owners); + if (!result) { + clusterManagerLogErr("Failed to fix multiple owners " + "for slot %d\n", slot); + listRelease(owners); + break; + } else slots_with_multiple_owners--; + } + } + listRelease(owners); + } + if (slots_with_multiple_owners == 0) + clusterManagerLogOk("[OK] No multiple owners found.\n"); + } return result; } @@ -5404,7 +5826,7 @@ static int clusterManagerCommandCall(int argc, char **argv) { if (status != REDIS_OK || reply == NULL ) printf("%s:%d: Failed!\n", n->ip, n->port); else { - sds formatted_reply = cliFormatReplyTTY(reply, ""); + sds formatted_reply = cliFormatReplyRaw(reply); printf("%s:%d: %s\n", n->ip, n->port, (char *) formatted_reply); sdsfree(formatted_reply); } @@ -6781,6 +7203,8 @@ int main(int argc, char **argv) { argc -= firstarg; argv += firstarg; + parseEnv(); + /* Cluster Manager mode */ if (CLUSTER_MANAGER_MODE()) { clusterManagerCommandProc *proc = validateClusterManagerCommand(); diff --git a/src/sds.c b/src/sds.c index 39ad595ed..330c955e8 100644 --- a/src/sds.c +++ b/src/sds.c @@ -695,7 +695,7 @@ sds sdscatfmt(sds s, char const *fmt, ...) { * s = sdstrim(s,"Aa. :"); * printf("%s\n", s); * - * Output will be just "Hello World". + * Output will be just "HelloWorld". */ sds sdstrim(sds s, const char *cset) { char *start, *end, *sp, *ep; diff --git a/src/server.c b/src/server.c index cc335ebdc..9371b2bac 100644 --- a/src/server.c +++ b/src/server.c @@ -2607,17 +2607,13 @@ int processCommand(client *c) { } /* Handle the maxmemory directive. - * - * First we try to free some memory if possible (if there are volatile - * keys in the dataset). If there are not the only thing we can do - * is returning an error. * * Note that we do not want to reclaim memory if we are here re-entering * the event loop since there is a busy Lua script running in timeout - * condition, to avoid mixing the propagation of scripts with the propagation - * of DELs due to eviction. */ + * condition, to avoid mixing the propagation of scripts with the + * propagation of DELs due to eviction. */ if (server.maxmemory && !server.lua_timedout) { - int out_of_memory = freeMemoryIfNeeded() == C_ERR; + int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR; /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return C_ERR; @@ -3247,11 +3243,11 @@ sds genRedisInfoString(char *section) { "allocator_frag_ratio:%.2f\r\n" "allocator_frag_bytes:%zu\r\n" "allocator_rss_ratio:%.2f\r\n" - "allocator_rss_bytes:%zu\r\n" + "allocator_rss_bytes:%zd\r\n" "rss_overhead_ratio:%.2f\r\n" - "rss_overhead_bytes:%zu\r\n" + "rss_overhead_bytes:%zd\r\n" "mem_fragmentation_ratio:%.2f\r\n" - "mem_fragmentation_bytes:%zu\r\n" + "mem_fragmentation_bytes:%zd\r\n" "mem_not_counted_for_evict:%zu\r\n" "mem_replication_backlog:%zu\r\n" "mem_clients_slaves:%zu\r\n" diff --git a/src/server.h b/src/server.h index 86d6e2e4b..da4c6d45a 100644 --- a/src/server.h +++ b/src/server.h @@ -654,6 +654,9 @@ typedef struct multiCmd { typedef struct multiState { multiCmd *commands; /* Array of MULTI commands */ int count; /* Total number of MULTI commands */ + int cmd_flags; /* The accumulated command flags OR-ed together. + So if at least a command has a given flag, it + will be set in this field. */ int minreplicas; /* MINREPLICAS for synchronous replication */ time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */ } multiState; @@ -864,11 +867,11 @@ struct redisMemOverhead { float dataset_perc; float peak_perc; float total_frag; - size_t total_frag_bytes; + ssize_t total_frag_bytes; float allocator_frag; - size_t allocator_frag_bytes; + ssize_t allocator_frag_bytes; float allocator_rss; - size_t allocator_rss_bytes; + ssize_t allocator_rss_bytes; float rss_extra; size_t rss_extra_bytes; size_t num_dbs; @@ -1699,6 +1702,7 @@ int zslLexValueLteMax(sds value, zlexrangespec *spec); int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level); size_t freeMemoryGetNotCountedMemory(); int freeMemoryIfNeeded(void); +int freeMemoryIfNeededAndSafe(void); int processCommand(client *c); void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); diff --git a/src/util.c b/src/util.c index 430cbe61a..66d599190 100644 --- a/src/util.c +++ b/src/util.c @@ -48,7 +48,7 @@ int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase) { - while(patternLen) { + while(patternLen && stringLen) { switch(pattern[0]) { case '*': while (pattern[1] == '*') { @@ -171,6 +171,22 @@ int stringmatch(const char *pattern, const char *string, int nocase) { return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase); } +/* Fuzz stringmatchlen() trying to crash it with bad input. */ +int stringmatchlen_fuzz_test(void) { + char str[32]; + char pat[32]; + int cycles = 10000000; + int total_matches = 0; + while(cycles--) { + int strlen = rand() % sizeof(str); + int patlen = rand() % sizeof(pat); + for (int j = 0; j < strlen; j++) str[j] = rand() % 128; + for (int j = 0; j < patlen; j++) pat[j] = rand() % 128; + total_matches += stringmatchlen(pat, patlen, str, strlen, 0); + } + return total_matches; +} + /* Convert a string representing an amount of memory into the number of * bytes, so for instance memtoll("1Gb") will return 1073741824 that is * (1024*1024*1024). diff --git a/src/util.h b/src/util.h index cc154d968..b6c01aa59 100644 --- a/src/util.h +++ b/src/util.h @@ -40,6 +40,7 @@ int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase); int stringmatch(const char *p, const char *s, int nocase); +int stringmatchlen_fuzz_test(void); long long memtoll(const char *p, int *err); uint32_t digits10(uint64_t v); uint32_t sdigits10(int64_t v); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index b61dfac18..0e50c20a9 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -275,7 +275,6 @@ start_server {tags {"repl"}} { start_server {} { test "Master stream is correctly processed while the replica has a script in -BUSY state" { set slave [srv 0 client] - puts [srv 0 port] $slave config set lua-time-limit 500 $slave slaveof $master_host $master_port