From 4317e2131f4355046fd44141c8b6515b81dc79c0 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Sun, 10 Dec 2017 17:54:56 +0200 Subject: [PATCH 01/51] Standardizes `MEMORY HELP` subcommand --- src/object.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/object.c b/src/object.c index 43ab6b5f0..52d2d4662 100644 --- a/src/object.c +++ b/src/object.c @@ -1120,7 +1120,18 @@ NULL void memoryCommand(client *c) { robj *o; - if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) { + if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { + + const char *help[] = { +"doctor - Return memory problems reports.", +"malloc-stats -- Return internal statistics report from the memory allocator.", +"purge -- Attempt to purge dirty pages for reclamation by the allocator.", +"stats -- Return information about the memory usage of the server.", +"usage [samples ] -- Return memory in bytes used by and its value. Nested values are sampled up to times (default: 5).", +NULL + }; + addReplyHelp(c, help); + } else if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) { long long samples = OBJ_COMPUTE_SIZE_DEF_SAMPLES; for (int j = 3; j < c->argc; j++) { if (!strcasecmp(c->argv[j]->ptr,"samples") && @@ -1234,19 +1245,7 @@ void memoryCommand(client *c) { addReply(c, shared.ok); /* Nothing to do for other allocators. */ #endif - } else if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) { - addReplyMultiBulkLen(c,5); - addReplyBulkCString(c, -"MEMORY DOCTOR - Outputs memory problems report"); - addReplyBulkCString(c, -"MEMORY USAGE [SAMPLES ] - Estimate memory usage of key"); - addReplyBulkCString(c, -"MEMORY STATS - Show memory usage details"); - addReplyBulkCString(c, -"MEMORY PURGE - Ask the allocator to release memory"); - addReplyBulkCString(c, -"MEMORY MALLOC-STATS - Show allocator internal stats"); } else { - addReplyError(c,"Syntax error. Try MEMORY HELP"); + addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)c->argv[1]->ptr); } } From 7820377d0084ac400345047ab67c0752e98897d8 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 15 Dec 2017 21:19:41 +0200 Subject: [PATCH 02/51] Uppercases subcommands in MEMORY HELP --- src/object.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/object.c b/src/object.c index 52d2d4662..606cdd7e5 100644 --- a/src/object.c +++ b/src/object.c @@ -1123,11 +1123,11 @@ void memoryCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"doctor - Return memory problems reports.", -"malloc-stats -- Return internal statistics report from the memory allocator.", -"purge -- Attempt to purge dirty pages for reclamation by the allocator.", -"stats -- Return information about the memory usage of the server.", -"usage [samples ] -- Return memory in bytes used by and its value. Nested values are sampled up to times (default: 5).", +"DOCTOR - Return memory problems reports.", +"MALLOC-STATS -- Return internal statistics report from the memory allocator.", +"PURGE -- Attempt to purge dirty pages for reclamation by the allocator.", +"STATS -- Return information about the memory usage of the server.", +"USAGE [SAMPLES ] -- Return memory in bytes used by and its value. Nested values are sampled up to times (default: 5).", NULL }; addReplyHelp(c, help); From d49bfc40808652389e3e3c3a0db3667153c3c14f Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 15 Dec 2017 21:21:12 +0200 Subject: [PATCH 03/51] Uppercases subcommands in OBJECT HELP --- src/object.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/object.c b/src/object.c index 606cdd7e5..b979b458a 100644 --- a/src/object.c +++ b/src/object.c @@ -1073,10 +1073,10 @@ void objectCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"encoding -- Return the kind of internal representation used in order to store the value associated with a key.", -"freq -- Return the access frequency index of the key. The returned integer is proportional to the logarithm of the recent access frequency of the key.", -"idletime -- Return the idle time of the key, that is the approximated number of seconds elapsed since the last access to the key.", -"refcount -- Return the number of references of the value associated with the specified key.", +"ENCODING -- Return the kind of internal representation used in order to store the value associated with a key.", +"FREQ -- Return the access frequency index of the key. The returned integer is proportional to the logarithm of the recent access frequency of the key.", +"IDLETIME -- Return the idle time of the key, that is the approximated number of seconds elapsed since the last access to the key.", +"REFCOUNT -- Return the number of references of the value associated with the specified key.", NULL }; addReplyHelp(c, help); From 290a63dc54f9cd2a61681be3849f1d9d481aa060 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Tue, 6 Mar 2018 19:34:44 +0700 Subject: [PATCH 04/51] Don't call sdscmp() with shared.maxstring or shared.minstring --- src/t_zset.c | 14 ++++++-------- tests/unit/type/zset.tcl | 18 +++++++++++++++++- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/t_zset.c b/src/t_zset.c index f7f4c6eb2..50652244b 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -519,12 +519,12 @@ int zslParseLexRangeItem(robj *item, sds *dest, int *ex) { switch(c[0]) { case '+': if (c[1] != '\0') return C_ERR; - *ex = 0; + *ex = 1; *dest = shared.maxstring; return C_OK; case '-': if (c[1] != '\0') return C_ERR; - *ex = 0; + *ex = 1; *dest = shared.minstring; return C_OK; case '(': @@ -597,9 +597,8 @@ int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) { zskiplistNode *x; /* Test for ranges that will always be empty. */ - if (sdscmplex(range->min,range->max) > 1 || - (sdscmp(range->min,range->max) == 0 && - (range->minex || range->maxex))) + int cmp = sdscmplex(range->min,range->max); + if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex))) return 0; x = zsl->tail; if (x == NULL || !zslLexValueGteMin(x->ele,range)) @@ -872,9 +871,8 @@ int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) { unsigned char *p; /* Test for ranges that will always be empty. */ - if (sdscmplex(range->min,range->max) > 1 || - (sdscmp(range->min,range->max) == 0 && - (range->minex || range->maxex))) + int cmp = sdscmplex(range->min,range->max); + if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex))) return 0; p = ziplistIndex(zl,-2); /* Last element. */ diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 564825ae9..d8f3cfa53 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -388,7 +388,7 @@ start_server {tags {"zset"}} { 0 omega} } - test "ZRANGEBYLEX/ZREVRANGEBYLEX/ZCOUNT basics" { + test "ZRANGEBYLEX/ZREVRANGEBYLEX/ZLEXCOUNT basics" { create_default_lex_zset # inclusive range @@ -416,6 +416,22 @@ start_server {tags {"zset"}} { assert_equal {} [r zrevrangebylex zset \[elez \[elex] assert_equal {} [r zrevrangebylex zset (hill (omega] } + + test "ZLEXCOUNT advanced" { + create_default_lex_zset + + assert_equal 9 [r zlexcount zset - +] + assert_equal 0 [r zlexcount zset + -] + assert_equal 0 [r zlexcount zset + \[c] + assert_equal 0 [r zlexcount zset \[c -] + assert_equal 8 [r zlexcount zset \[bar +] + assert_equal 5 [r zlexcount zset \[bar \[foo] + assert_equal 4 [r zlexcount zset \[bar (foo] + assert_equal 4 [r zlexcount zset (bar \[foo] + assert_equal 3 [r zlexcount zset (bar (foo] + assert_equal 5 [r zlexcount zset - (foo] + assert_equal 1 [r zlexcount zset (maxstring +] + } test "ZRANGEBYSLEX with LIMIT" { create_default_lex_zset From 98a64523c451d9f6519342b78a857a4aa729cf58 Mon Sep 17 00:00:00 2001 From: Andrey Bugaevskiy Date: Wed, 19 Sep 2018 19:58:39 +0300 Subject: [PATCH 05/51] Prevent RDB autosave from overwriting full resync results During the full database resync we may still have unsaved changes on the receiving side. This causes a race condition between synced data rename/load and the rename of rdbSave tempfile. --- src/replication.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/replication.c b/src/replication.c index 38b1d1614..72d7b8925 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1814,6 +1814,13 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } } + /* Stop background saving for obsolete database state. */ + server.dirty = 0; + if (server.rdb_child_pid != -1) { + kill(server.rdb_child_pid,SIGUSR1); + rdbRemoveTempFile(server.rdb_child_pid); + } + /* Prepare a suitable temp file for bulk transfer */ while(maxtries--) { snprintf(tmpfile,256, From 466c277b4fedefd7fda42959e3e68177a6de254b Mon Sep 17 00:00:00 2001 From: Andrey Bugaevskiy Date: Thu, 27 Sep 2018 19:38:58 +0300 Subject: [PATCH 06/51] Move child termination to readSyncBulkPayload --- src/replication.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/replication.c b/src/replication.c index 72d7b8925..a40d26fbe 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1245,6 +1245,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (eof_reached) { int aof_is_enabled = server.aof_state != AOF_OFF; + /* Ensure background save doesn't overwrite synced data */ + if (server.rdb_child_pid != -1) { + kill(server.rdb_child_pid,SIGUSR1); + rdbRemoveTempFile(server.rdb_child_pid); + } + if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno)); cancelReplicationHandshake(); @@ -1814,13 +1820,6 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } } - /* Stop background saving for obsolete database state. */ - server.dirty = 0; - if (server.rdb_child_pid != -1) { - kill(server.rdb_child_pid,SIGUSR1); - rdbRemoveTempFile(server.rdb_child_pid); - } - /* Prepare a suitable temp file for bulk transfer */ while(maxtries--) { snprintf(tmpfile,256, From 0b73d0a8d2b3cb36127ffa33ec2d37f403de6944 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Fri, 26 Oct 2018 14:02:09 +0000 Subject: [PATCH 07/51] Fix non Linux build. timezone global is a linux-ism whereas it is a function under BSD. Here a helper to get the timezone value in a more portable manner. --- src/server.c | 2 +- src/util.c | 18 ++++++++++++++++++ src/util.h | 1 + 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/server.c b/src/server.c index d6c6dc3a8..f92a75b5e 100644 --- a/src/server.c +++ b/src/server.c @@ -1526,7 +1526,7 @@ void initServerConfig(void) { server.runid[CONFIG_RUN_ID_SIZE] = '\0'; changeReplicationId(); clearReplicationId2(); - server.timezone = timezone; /* Initialized by tzset(). */ + server.timezone = getTimeZone(); /* Initialized by tzset(). */ server.configfile = NULL; server.executable = NULL; server.config_hz = CONFIG_DEFAULT_HZ; diff --git a/src/util.c b/src/util.c index 3fa6c9244..c2d32df8c 100644 --- a/src/util.c +++ b/src/util.c @@ -652,6 +652,24 @@ sds getAbsolutePath(char *filename) { return abspath; } +/* + * Gets the proper timezone in a more portable fashion + * i.e timezone variables are linux specific. + */ + +unsigned long getTimeZone(void) { +#ifdef __linux__ + return timezone; +#else + struct timeval tv; + struct timezone tz; + + gettimeofday(&tv, &tz); + + return tz.tz_minuteswest * 60UL; +#endif +} + /* Return true if the specified path is just a file basename without any * relative or absolute path. This function just checks that no / or \ * character exists inside the specified path, that's enough in the diff --git a/src/util.h b/src/util.h index 91acde047..cc154d968 100644 --- a/src/util.h +++ b/src/util.h @@ -50,6 +50,7 @@ int string2ld(const char *s, size_t slen, long double *dp); int d2string(char *buf, size_t len, double value); int ld2string(char *buf, size_t len, long double value, int humanfriendly); sds getAbsolutePath(char *filename); +unsigned long getTimeZone(void); int pathIsBaseName(char *path); #ifdef REDIS_TEST From ae3bfe583e2f6c0c0cab02cd6fbbcaee86a3ba5b Mon Sep 17 00:00:00 2001 From: David Carlier Date: Fri, 26 Oct 2018 14:12:47 +0000 Subject: [PATCH 08/51] needs it for the global --- src/util.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/util.c b/src/util.c index c2d32df8c..80b460bc2 100644 --- a/src/util.c +++ b/src/util.c @@ -39,6 +39,7 @@ #include #include #include +#include #include "util.h" #include "sha1.h" From 0c875c7751e3672f20225ab3b20fdb0322a8e4ad Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 30 Oct 2018 13:38:41 +0100 Subject: [PATCH 09/51] asyncCloseClientOnOutputBufferLimitReached(): don't free fake clients. Fake clients are used in special situations and are not linked to the normal clients list, freeing them will always result in Redis crashing in one way or the other. It's not common to send replies to fake clients, but we have one usage in the modules API. When a client is blocked, we associate to the blocked client object (that is safe to manipulate in a thread), a fake client that accumulates replies. So because of this bug there was the problem described in issue #5443. The fix was verified to work with the provided example module. To write a regression is very hard and unlikely to be triggered in the future. --- src/networking.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/networking.c b/src/networking.c index e255e64df..7d387dabc 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2109,6 +2109,7 @@ int checkClientOutputBufferLimits(client *c) { * called from contexts where the client can't be freed safely, i.e. from the * lower level functions pushing data inside the client output buffers. */ void asyncCloseClientOnOutputBufferLimitReached(client *c) { + if (c->fd == -1) return; /* It is unsafe to free fake clients. */ serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return; if (checkClientOutputBufferLimits(c)) { From 6534b3e09ed23324e22a4b57382707f52e22ee81 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Tue, 30 Oct 2018 13:23:43 +0000 Subject: [PATCH 10/51] Fix clang build. Some math functions require c11 standard. --- src/Makefile | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index 912cbc19f..eec172d0e 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,7 +20,11 @@ DEPENDENCY_TARGETS=hiredis linenoise lua NODEPS:=clean distclean # Default settings -STD=-std=c99 -pedantic -DREDIS_STATIC='' +ifneq ($(CC),clang) + STD=-std=c99 -pedantic -DREDIS_STATIC='' +else + STD=-std=c11 -pedantic -DREDIS_STATIC='' +endif WARN=-Wall -W -Wno-missing-field-initializers OPT=$(OPTIMIZATION) From a21d1522c79363d54794016d29a7b8af60c30960 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Tue, 30 Oct 2018 14:34:45 +0000 Subject: [PATCH 11/51] allow flavors --- src/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index eec172d0e..471d4fab4 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua NODEPS:=clean distclean # Default settings -ifneq ($(CC),clang) +ifeq (,$(findstring clang,$(CC))) STD=-std=c99 -pedantic -DREDIS_STATIC='' else STD=-std=c11 -pedantic -DREDIS_STATIC='' From 9d6fbf0e0019e7a2f6e11375bf01f7a02208a735 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Tue, 30 Oct 2018 21:41:49 +0000 Subject: [PATCH 12/51] allow flavors --- src/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index eec172d0e..471d4fab4 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua NODEPS:=clean distclean # Default settings -ifneq ($(CC),clang) +ifeq (,$(findstring clang,$(CC))) STD=-std=c99 -pedantic -DREDIS_STATIC='' else STD=-std=c11 -pedantic -DREDIS_STATIC='' From cf2f5e19d9e3579a37ec82e2953fb1e606ddbc59 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Wed, 31 Oct 2018 09:53:07 +0000 Subject: [PATCH 13/51] tweak form feedback --- src/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index 471d4fab4..321118dbb 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,7 +23,7 @@ NODEPS:=clean distclean ifeq (,$(findstring clang,$(CC))) STD=-std=c99 -pedantic -DREDIS_STATIC='' else - STD=-std=c11 -pedantic -DREDIS_STATIC='' + STD=-std=c99 -Wno-c11-extensions -pedantic -DREDIS_STATIC='' endif WARN=-Wall -W -Wno-missing-field-initializers OPT=$(OPTIMIZATION) From 06a4acb7d36a51206507b88984678d91b5db2645 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 31 Oct 2018 11:46:28 +0100 Subject: [PATCH 14/51] When replica kills a pending RDB save during SYNC, log it. This logs what happens in the context of the fix in PR #5367. --- src/replication.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/replication.c b/src/replication.c index 00a6f8c25..a3110661e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1247,6 +1247,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { /* Ensure background save doesn't overwrite synced data */ if (server.rdb_child_pid != -1) { + serverLog(LL_NOTICE, + "Replica is about to load the RDB file received from the " + "master, but there is a pending RDB child running. " + "Killing process %ld and removing its temp file to avoid " + "any race", + (long) server.rdb_child_pid); kill(server.rdb_child_pid,SIGUSR1); rdbRemoveTempFile(server.rdb_child_pid); } From 666b3437e684ae4ebe382006031d920c70c11275 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 31 Oct 2018 12:37:48 +0100 Subject: [PATCH 15/51] Disable protected mode in Sentinel mode. Sentinel must be exposed, so protected mode is just an issue for users in case Redis was started in Sentinel mode. Related to #3279 and #3329. --- src/sentinel.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentinel.c b/src/sentinel.c index 7b703aa33..f0901eeb6 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -459,6 +459,7 @@ struct redisCommand sentinelcmds[] = { * specific defaults. */ void initSentinelConfig(void) { server.port = REDIS_SENTINEL_PORT; + server.protected_mode = 0; /* Sentinel must be exposed. */ } /* Perform the Sentinel mode initialization. */ From fa675256c127963c74ea68f8bab22ef105bada02 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 31 Oct 2018 12:56:47 +0100 Subject: [PATCH 16/51] Add support for Sentinel authentication. So far it was not possible to setup Sentinel with authentication enabled. This commit introduces this feature: every Sentinel will try to authenticate with other sentinels using the same password it is configured to accept clients with. So for instance if a Sentinel has a "requirepass" configuration statemnet set to "foo", it will use the "foo" password to authenticate with every other Sentinel it connects to. So basically to add the "requirepass" to all the Sentinels configurations is enough in order to make sure that: 1) Clients will require the password to access the Sentinels instances. 2) Each Sentinel will use the same password to connect and authenticate with every other Sentinel in the group. Related to #3279 and #3329. --- src/sentinel.c | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/sentinel.c b/src/sentinel.c index f0901eeb6..adff9d4fa 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -452,7 +452,8 @@ struct redisCommand sentinelcmds[] = { {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}, {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0}, {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0}, - {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0} + {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}, + {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0} }; /* This function overwrites a few normal Redis config default with Sentinel @@ -1942,12 +1943,25 @@ werr: /* Send the AUTH command with the specified master password if needed. * Note that for slaves the password set for the master is used. * + * In case this Sentinel requires a password as well, via the "requirepass" + * configuration directive, we assume we should use the local password in + * order to authenticate when connecting with the other Sentinels as well. + * So basically all the Sentinels share the same password and use it to + * authenticate reciprocally. + * * We don't check at all if the command was successfully transmitted * to the instance as if it fails Sentinel will detect the instance down, * will disconnect and reconnect the link and so forth. */ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { - char *auth_pass = (ri->flags & SRI_MASTER) ? ri->auth_pass : - ri->master->auth_pass; + char *auth_pass = NULL; + + if (ri->flags & SRI_MASTER) { + auth_pass = ri->auth_pass; + } else if (ri->flags & SRI_SLAVE) { + auth_pass = ri->master->auth_pass; + } else if (ri->flags & SRI_SENTINEL) { + if (server.requirepass) auth_pass = server.requirepass; + } if (auth_pass) { if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s", From 48d8b3d8ac8de4db033d8836986065b8a41e01dc Mon Sep 17 00:00:00 2001 From: Guy Korland Date: Wed, 31 Oct 2018 17:33:53 +0200 Subject: [PATCH 17/51] Fix some typos --- src/t_stream.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index e37b6582a..3952ec1ab 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -37,7 +37,7 @@ * mark the entry as deleted, or having the same field as the "master" * entry at the start of the listpack> */ #define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */ -#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */ +#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ void streamFreeCG(streamCG *cg); @@ -165,7 +165,7 @@ int streamCompareID(streamID *a, streamID *b) { * Returns the new entry ID populating the 'added_id' structure. * * If 'use_id' is not NULL, the ID is not auto-generated by the function, - * but instead the passed ID is uesd to add the new entry. In this case + * but instead the passed ID is used to add the new entry. In this case * adding the entry may fail as specified later in this comment. * * The function returns C_OK if the item was added, this is always true @@ -223,13 +223,13 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ * * count and deleted just represent respectively the total number of * entries inside the listpack that are valid, and marked as deleted - * (delted flag in the entry flags set). So the total number of items + * (deleted flag in the entry flags set). So the total number of items * actually inside the listpack (both deleted and not) is count+deleted. * * The real entries will be encoded with an ID that is just the * millisecond and sequence difference compared to the key stored at * the radix tree node containing the listpack (delta encoding), and - * if the fields of the entry are the same as the master enty fields, the + * if the fields of the entry are the same as the master entry fields, the * entry flags will specify this fact and the entry fields and number * of fields will be omitted (see later in the code of this function). * @@ -486,7 +486,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { * } * streamIteratorStop(&myiterator); */ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) { - /* Intialize the iterator and translates the iteration start/stop + /* Initialize the iterator and translates the iteration start/stop * elements into a 128 big big-endian number. */ if (start) { streamEncodeID(si->start_key,start); @@ -564,7 +564,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { si->lp_ele = lpLast(si->lp); } } else if (si->rev) { - /* If we are itereating in the reverse order, and this is not + /* If we are iterating in the reverse order, and this is not * the first entry emitted for this listpack, then we already * emitted the current entry, and have to go back to the previous * one. */ @@ -751,7 +751,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { } /* Stop the stream iterator. The only cleanup we need is to free the rax - * itereator, since the stream iterator itself is supposed to be stack + * iterator, since the stream iterator itself is supposed to be stack * allocated. */ void streamIteratorStop(streamIterator *si) { raxStop(&si->ri); @@ -863,15 +863,15 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna * * The behavior may be modified passing non-zero flags: * - * STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above + * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above * is not performed. * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries, * and return the number of entries emitted as usually. * This is used when the function is just used in order * to emit data and there is some higher level logic. * - * The final argument 'spi' (stream propagatino info pointer) is a structure - * filled with information needed to propagte the command execution to AOF + * The final argument 'spi' (stream propagation info pointer) is a structure + * filled with information needed to propagate the command execution to AOF * and slaves, in the case a consumer group was passed: we need to generate * XCLAIM commands to create the pending list into AOF/slaves in that case. * @@ -1136,7 +1136,7 @@ invalid: } /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to - * 0, to be used when - and + are accetable IDs. */ + * 0, to be used when - and + are acceptable IDs. */ int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { return streamGenericParseIDOrReply(c,o,id,missing_seq,0); } From e039c85bb42d7c9c9bb35a9993191a3fae90c907 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 2 Nov 2018 22:58:16 +0200 Subject: [PATCH 18/51] Adds HELP to LATENCY Signed-off-by: Itamar Haber --- src/latency.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/latency.c b/src/latency.c index d89c48db1..97e6a702e 100644 --- a/src/latency.c +++ b/src/latency.c @@ -562,11 +562,21 @@ sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts) { * * LATENCY HISTORY: return time-latency samples for the specified event. * LATENCY LATEST: return the latest latency for all the events classes. - * LATENCY DOCTOR: returns an human readable analysis of instance latency. + * LATENCY DOCTOR: returns a human readable analysis of instance latency. * LATENCY GRAPH: provide an ASCII graph of the latency of the specified event. * LATENCY RESET: reset data of a specified event or all the data if no event provided. */ void latencyCommand(client *c) { + const char *help[] = { +"DOCTOR -- Returns a human readable latency analysis report.", +"GRAPH -- Returns an ASCII latency graph for the event class.", +"HISTORY -- Returns time-latency samples for the event class.", +"LATEST -- Returns the latest latency samples for all events.", +"RESET [event ...] -- Resets latency data of one or more event classes.", +" (default: reset all data for all event classes)", +"HELP -- Prints this help.", +NULL + }; struct latencyTimeSeries *ts; if (!strcasecmp(c->argv[1]->ptr,"history") && c->argc == 3) { @@ -611,8 +621,10 @@ void latencyCommand(client *c) { resets += latencyResetEvent(c->argv[j]->ptr); addReplyLongLong(c,resets); } + } else if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc >= 2) { + addReplyHelp(c, help); } else { - addReply(c,shared.syntaxerr); + addReplySubcommandSyntaxError(c); } return; From 5fa41e0c84e46bb1fa417d6bfb60d9429734fd5f Mon Sep 17 00:00:00 2001 From: michael-grunder Date: Sat, 3 Nov 2018 15:13:28 -0700 Subject: [PATCH 19/51] Use typedef'd mstime_t instead of time_t This fixes an overflow on 32-bit systems. --- src/t_stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 3952ec1ab..9faee76d4 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2155,7 +2155,7 @@ void xclaimCommand(client *c) { /* If we stopped because some IDs cannot be parsed, perhaps they * are trailing options. */ - time_t now = mstime(); + mstime_t now = mstime(); streamID last_id = {0,0}; int propagate_last_id = 0; for (; j < c->argc; j++) { From eae8d0582638dcc5ffd686e3165faa05424b3f33 Mon Sep 17 00:00:00 2001 From: valentino Date: Mon, 5 Nov 2018 11:13:02 +0200 Subject: [PATCH 20/51] fix short period of server.hz being uninitialized server.hz was uninitialized between initServerConfig and initServer. this can lead to someone (e.g. queued modules) doing createObject, and accessing an uninitialized variable, that can potentially be 0, and lead to a crash. --- src/server.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.c b/src/server.c index f92a75b5e..d03873060 100644 --- a/src/server.c +++ b/src/server.c @@ -1529,7 +1529,7 @@ void initServerConfig(void) { server.timezone = getTimeZone(); /* Initialized by tzset(). */ server.configfile = NULL; server.executable = NULL; - server.config_hz = CONFIG_DEFAULT_HZ; + server.hz = server.config_hz = CONFIG_DEFAULT_HZ; server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ; server.arch_bits = (sizeof(long) == 8) ? 64 : 32; server.port = CONFIG_DEFAULT_SERVER_PORT; From e7c579e1fe0fdcdf631e27a5fd17e36f621ddef3 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 5 Nov 2018 13:06:01 +0100 Subject: [PATCH 21/51] Improve streamReplyWithRange() top comment. --- src/t_stream.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index e37b6582a..cb7e25944 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -847,11 +847,15 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna decrRefCount(argv[4]); } -/* Send the specified range to the client 'c'. The range the client will - * receive is between start and end inclusive, if 'count' is non zero, no more - * than 'count' elements are sent. The 'end' pointer can be NULL to mean that - * we want all the elements from 'start' till the end of the stream. If 'rev' - * is non zero, elements are produced in reversed order from end to start. +/* Send the stream items in the specified range to the client 'c'. The range + * the client will receive is between start and end inclusive, if 'count' is + * non zero, no more than 'count' elements are sent. + * + * The 'end' pointer can be NULL to mean that we want all the elements from + * 'start' till the end of the stream. If 'rev' is non zero, elements are + * produced in reversed order from end to start. + * + * The function returns the number of entries emitted. * * If group and consumer are not NULL, the function performs additional work: * 1. It updates the last delivered ID in the group in case we are From 6ba50784b5ce2e4eae74da00536ebbc1f81984ae Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 5 Nov 2018 13:16:00 +0100 Subject: [PATCH 22/51] Fix XCLAIM missing entry bug. This bug had a double effect: 1. Sometimes entries may not be emitted, producing broken protocol where the array length was greater than the emitted entires, blocking the client waiting for more data. 2. Some other time the right entry was claimed, but a wrong entry was returned to the client. This fix should correct both the instances. --- src/t_stream.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 78f89b7a7..73f7fca0e 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2278,8 +2278,9 @@ void xclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, + NULL,NULL,STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) addReply(c,shared.nullbulk); } arraylen++; From ab270a977785df58461efb4a58f127193e387b13 Mon Sep 17 00:00:00 2001 From: artix Date: Mon, 8 Oct 2018 16:21:41 +0200 Subject: [PATCH 23/51] Cluster Manager: fixed 'DELSLOT' subcommand typo. --- src/redis-cli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 57f812b90..2b0aaea6f 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3833,7 +3833,7 @@ static int clusterManagerFixOpenSlot(int slot) { while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n == owner) continue; - reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOT %d", slot); + reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); success = clusterManagerCheckRedisReply(n, reply, NULL); if (reply) freeReplyObject(reply); if (!success) goto cleanup; From be3a9dbb6fede6ab6b9bd6b1d2efae07543df1e8 Mon Sep 17 00:00:00 2001 From: artix Date: Mon, 8 Oct 2018 16:55:56 +0200 Subject: [PATCH 24/51] Cluster Manager: fixed typos in comments. --- src/redis-cli.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 2b0aaea6f..ac021e855 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3775,7 +3775,7 @@ static int clusterManagerFixOpenSlot(int slot) { } printf("Set as migrating in: %s\n", migrating_str); printf("Set as importing in: %s\n", importing_str); - /* If there is no slot owner, set as owner the slot with the biggest + /* If there is no slot owner, set as owner the node with the biggest * number of keys, among the set of migrating / importing nodes. */ if (owner == NULL) { clusterManagerLogInfo(">>> Nobody claims ownership, " @@ -3848,8 +3848,8 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; } int move_opts = CLUSTER_MANAGER_OPT_VERBOSE; - /* Case 1: The slot is in migrating state in one slot, and in - * importing state in 1 slot. That's trivial to address. */ + /* Case 1: The slot is in migrating state in one node, and in + * importing state in 1 node. That's trivial to address. */ if (listLength(migrating) == 1 && listLength(importing) == 1) { clusterManagerNode *src = listFirst(migrating)->value; clusterManagerNode *dst = listFirst(importing)->value; From 2e9859cbfc204b8a76275e0dcc0ff87c10db2792 Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 23 Oct 2018 16:14:45 +0200 Subject: [PATCH 25/51] Cluster Manager: better fix subcommand. --- src/redis-cli.c | 98 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 78 insertions(+), 20 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index ac021e855..e6e20b6fe 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2726,8 +2726,7 @@ static int clusterManagerSetSlot(clusterManagerNode *node1, if (err != NULL) { *err = zmalloc((reply->len + 1) * sizeof(char)); strcpy(*err, reply->str); - CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, err); - } + } else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str); goto cleanup; } cleanup: @@ -2848,14 +2847,21 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (migrate_reply == NULL) goto next; if (migrate_reply->type == REDIS_REPLY_ERROR) { if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) { + /* If the key already exists, try to migrate keys + * adding REPLACE option. + * If the key's slot is not served, try to assign slot + * to the target node. */ + int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL); + if (strstr(migrate_reply->str, "slot not served") != NULL) + clusterManagerSetSlot(source, target, slot, "node", NULL); clusterManagerLogWarn("*** Target key exists. " "Replacing it for FIX.\n"); freeReplyObject(migrate_reply); - /* Try to migrate keys adding REPLACE option. */ migrate_reply = clusterManagerMigrateKeysInReply(source, target, reply, - 1, timeout, + is_busy, + timeout, NULL); success = (migrate_reply != NULL && migrate_reply->type != REDIS_REPLY_ERROR); @@ -3670,14 +3676,22 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { while ((nln = listNext(&nli)) != NULL) { clusterManagerNode *src = nln->value; if (src == target) continue; + /* Assign the slot to target node in the source node. */ + redisReply *r = CLUSTER_MANAGER_COMMAND(src, + "CLUSTER SETSLOT %s %s %s", slot, + "NODE", target->name); + if (!clusterManagerCheckRedisReply(src, r, NULL)) + fixed = -1; + if (r) freeReplyObject(r); + if (fixed < 0) goto cleanup; /* Set the source node in 'importing' state * (even if we will actually migrate keys away) * in order to avoid receiving redirections * for MIGRATE. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(src, + r = CLUSTER_MANAGER_COMMAND(src, "CLUSTER SETSLOT %s %s %s", slot, "IMPORTING", target->name); - if (!clusterManagerCheckRedisReply(target, r, NULL)) + if (!clusterManagerCheckRedisReply(src, r, NULL)) fixed = -1; if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; @@ -3687,6 +3701,13 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { fixed = -1; goto cleanup; } + r = CLUSTER_MANAGER_COMMAND(src, + "CLUSTER SETSLOT %s %s", slot, + "STABLE"); + if (!clusterManagerCheckRedisReply(src, r, NULL)) + fixed = -1; + if (r) freeReplyObject(r); + if (fixed < 0) goto cleanup; } fixed++; } @@ -3707,7 +3728,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogInfo(">>> Fixing open slot %d\n", slot); /* Try to obtain the current slot owner, according to the current * nodes configuration. */ - int success = 1; + int success = 1, keys_in_multiple_nodes = 0; list *owners = listCreate(); list *migrating = listCreate(); list *importing = listCreate(); @@ -3720,11 +3741,23 @@ static int clusterManagerFixOpenSlot(int slot) { while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - if (n->slots[slot]) { - if (owner == NULL) owner = n; - listAddNodeTail(owners, n); + if (n->slots[slot]) listAddNodeTail(owners, n); + else { + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER COUNTKEYSINSLOT %d", slot); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (success && r->integer > 0) { + clusterManagerLogWarn("*** Found keys about slot %d " + "in non-owner node %s:%d!\n", slot, + n->ip, n->port); + listAddNodeTail(owners, n); + keys_in_multiple_nodes = 1; + } + if (r) freeReplyObject(r); + if (!success) goto cleanup; } } + if (listLength(owners) == 1) owner = listFirst(owners)->value; listRewind(cluster_manager.nodes, &li); while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; @@ -3767,6 +3800,9 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogWarn("*** Found keys about slot %d " "in node %s:%d!\n", slot, n->ip, n->port); + char *sep = (listLength(importing) == 0 ? "" : ","); + importing_str = sdscatfmt(importing_str, "%s%S:%u", + sep, n->ip, n->port); listAddNodeTail(importing, n); } if (r) freeReplyObject(r); @@ -3799,6 +3835,15 @@ static int clusterManagerFixOpenSlot(int slot) { success = clusterManagerCheckRedisReply(owner, reply, NULL); if (reply) freeReplyObject(reply); if (!success) goto cleanup; + /* Ensure that the slot is unassigned before assigning it to the + * owner. */ + reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot); + success = clusterManagerCheckRedisReply(owner, reply, NULL); + /* Ignore "already unassigned" error. */ + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + strstr(reply->str, "already unassigned") != NULL) success = 1; + if (reply) freeReplyObject(reply); + if (!success) goto cleanup; reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot); success = clusterManagerCheckRedisReply(owner, reply, NULL); if (reply) freeReplyObject(reply); @@ -3835,12 +3880,22 @@ static int clusterManagerFixOpenSlot(int slot) { if (n == owner) continue; reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); success = clusterManagerCheckRedisReply(n, reply, NULL); + /* Ignore "already unassigned" error. */ + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + strstr(reply->str, "already unassigned") != NULL) success = 1; if (reply) freeReplyObject(reply); if (!success) goto cleanup; + n->slots[slot] = 0; + /* Assign the slot to the owner in the node 'n' configuration.' */ + success = clusterManagerSetSlot(n, owner, slot, "node", NULL); + if (!success) goto cleanup; success = clusterManagerSetSlot(n, owner, slot, "importing", NULL); if (!success) goto cleanup; - clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates + /* Avoid duplicates. */ + clusterManagerRemoveNodeFromList(importing, n); listAddNodeTail(importing, n); + /* Ensure that the node is not in the migrating list. */ + clusterManagerRemoveNodeFromList(migrating, n); } reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); success = clusterManagerCheckRedisReply(owner, reply, NULL); @@ -3883,18 +3938,21 @@ static int clusterManagerFixOpenSlot(int slot) { listLength(migrating) == 1); if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER GETKEYSINSLOT %d %d", slot, 10); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) { - if (success) try_to_close_slot = (r->elements == 0); - freeReplyObject(r); + if (!owner || owner != n) { + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER GETKEYSINSLOT %d %d", slot, 10); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (r) { + if (success) try_to_close_slot = (r->elements == 0); + freeReplyObject(r); + } + if (!success) goto cleanup; } - if (!success) goto cleanup; } /* Case 3: There are no slots claiming to be in importing state, but - * there is a migrating node that actually don't have any key. We - * can just close the slot, probably a reshard interrupted in the middle. */ + * there is a migrating node that actually don't have any key or is the + * slot owner. We can just close the slot, probably a reshard interrupted + * in the middle. */ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", From d6f0a9ac72034d95e6005662eb992eefe161bab2 Mon Sep 17 00:00:00 2001 From: artix Date: Fri, 2 Nov 2018 17:08:25 +0100 Subject: [PATCH 26/51] Cluster Manager: fixed string parsing issue in clusterManagerGetConfigSignature --- src/redis-cli.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index e6e20b6fe..3db212f4a 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3307,8 +3307,8 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { nodename = token; tot_size = (p - token); name_len = tot_size++; // Make room for ':' in tot_size - } else if (i == 8) break; - i++; + } + if (++i == 8) break; } if (i != 8) continue; if (nodename == NULL) continue; @@ -3347,7 +3347,7 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { char *sp = cfg + name_len; *(sp++) = ':'; for (i = 0; i < c; i++) { - if (i > 0) *(sp++) = '|'; + if (i > 0) *(sp++) = ','; int slen = strlen(slots[i]); memcpy(sp, slots[i], slen); sp += slen; From 18ddbf0352cc1a5ae3248f06abed9a0ee59a4edf Mon Sep 17 00:00:00 2001 From: artix Date: Sat, 3 Nov 2018 12:04:59 +0100 Subject: [PATCH 27/51] Cluster Manager: further improvements to "fix": - clusterManagerFixOpenSlot: ensure that the slot is unassigned before ADDSLOTS - clusterManagerFixSlotsCoverage: after cold migration, the slot configuration is now updated on all the nodes. --- src/redis-cli.c | 59 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 3db212f4a..c9e306fa8 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3589,10 +3589,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = node_n->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3620,10 +3627,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = fn->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3657,7 +3671,11 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s moving keys " "to %s:%d\n", slot, target->ip, target->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(target, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; if (r) freeReplyObject(r); @@ -3666,6 +3684,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { "CLUSTER SETSLOT %s %s", slot, "STABLE"); if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3768,7 +3789,7 @@ static int clusterManagerFixOpenSlot(int slot) { sds migrating_slot = n->migrating[i]; if (atoi(migrating_slot) == slot) { char *sep = (listLength(migrating) == 0 ? "" : ","); - migrating_str = sdscatfmt(migrating_str, "%s%S:%u", + migrating_str = sdscatfmt(migrating_str, "%s%s:%u", sep, n->ip, n->port); listAddNodeTail(migrating, n); is_migrating = 1; @@ -3781,7 +3802,7 @@ static int clusterManagerFixOpenSlot(int slot) { sds importing_slot = n->importing[i]; if (atoi(importing_slot) == slot) { char *sep = (listLength(importing) == 0 ? "" : ","); - importing_str = sdscatfmt(importing_str, "%s%S:%u", + importing_str = sdscatfmt(importing_str, "%s%s:%u", sep, n->ip, n->port); listAddNodeTail(importing, n); is_importing = 1; @@ -3809,8 +3830,10 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; } } - printf("Set as migrating in: %s\n", migrating_str); - printf("Set as importing in: %s\n", importing_str); + if (sdslen(migrating_str) > 0) + printf("Set as migrating in: %s\n", migrating_str); + if (sdslen(importing_str) > 0) + printf("Set as importing in: %s\n", importing_str); /* If there is no slot owner, set as owner the node with the biggest * number of keys, among the set of migrating / importing nodes. */ if (owner == NULL) { @@ -3872,7 +3895,9 @@ static int clusterManagerFixOpenSlot(int slot) { * in migrating state, since migrating is a valid state only for * slot owners. */ if (listLength(owners) > 1) { - owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL); + /* Owner cannot be NULL at this point, since if there are more owners, + * the owner has been set in the previous condition (owner == NULL). */ + assert(owner != NULL); listRewind(owners, &li); redisReply *reply = NULL; while ((ln = listNext(&li)) != NULL) { @@ -3897,10 +3922,6 @@ static int clusterManagerFixOpenSlot(int slot) { /* Ensure that the node is not in the migrating list. */ clusterManagerRemoveNodeFromList(migrating, n); } - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); - if (!success) goto cleanup; } int move_opts = CLUSTER_MANAGER_OPT_VERBOSE; /* Case 1: The slot is in migrating state in one node, and in @@ -3908,6 +3929,9 @@ static int clusterManagerFixOpenSlot(int slot) { if (listLength(migrating) == 1 && listLength(importing) == 1) { clusterManagerNode *src = listFirst(migrating)->value; clusterManagerNode *dst = listFirst(importing)->value; + clusterManagerLogInfo(">>> Case 1: Moving slot %d from " + "%s:%d to %s:%d\n", slot, + src->ip, src->port, dst->ip, dst->port); success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); } /* Case 2: There are multiple nodes that claim the slot as importing, @@ -3915,7 +3939,7 @@ static int clusterManagerFixOpenSlot(int slot) { * the slot. In this case we just move all the keys to the owner * according to the configuration. */ else if (listLength(migrating) == 0 && listLength(importing) > 0) { - clusterManagerLogInfo(">>> Moving all the %d slot keys to its " + clusterManagerLogInfo(">>> Case 2: Moving all the %d slot keys to its " "owner %s:%d\n", slot, owner->ip, owner->port); move_opts |= CLUSTER_MANAGER_OPT_COLD; listRewind(importing, &li); @@ -3933,6 +3957,18 @@ static int clusterManagerFixOpenSlot(int slot) { if (r) freeReplyObject(r); if (!success) goto cleanup; } + /* Since the slot has been moved in "cold" mode, ensure that all the + * other nodes update their own configuration about the slot itself. */ + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n == owner) continue; + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (r) freeReplyObject(r); + if (!success) goto cleanup; + } } else { int try_to_close_slot = (listLength(importing) == 0 && listLength(migrating) == 1); @@ -3955,6 +3991,8 @@ static int clusterManagerFixOpenSlot(int slot) { * in the middle. */ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; + clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n", + slot, n->ip, n->port); redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", slot, "STABLE"); success = clusterManagerCheckRedisReply(n, r, NULL); @@ -4083,6 +4121,7 @@ static int clusterManagerCheckCluster(int quiet) { result = 0; if (do_fix/* && result*/) { dictType dtype = clusterManagerDictType; + dtype.keyDestructor = dictSdsDestructor; dtype.valDestructor = dictListDestructor; clusterManagerUncoveredSlots = dictCreate(&dtype, NULL); int fixed = clusterManagerFixSlotsCoverage(slots); From 3a2d82ae8ef4f9b088bf8a2d43fec11ca8639729 Mon Sep 17 00:00:00 2001 From: artix Date: Mon, 5 Nov 2018 14:19:45 +0100 Subject: [PATCH 28/51] Cluster Manager: removed unused var. --- src/redis-cli.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index c9e306fa8..61776c72b 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3749,7 +3749,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogInfo(">>> Fixing open slot %d\n", slot); /* Try to obtain the current slot owner, according to the current * nodes configuration. */ - int success = 1, keys_in_multiple_nodes = 0; + int success = 1; list *owners = listCreate(); list *migrating = listCreate(); list *importing = listCreate(); @@ -3772,7 +3772,6 @@ static int clusterManagerFixOpenSlot(int slot) { "in non-owner node %s:%d!\n", slot, n->ip, n->port); listAddNodeTail(owners, n); - keys_in_multiple_nodes = 1; } if (r) freeReplyObject(r); if (!success) goto cleanup; From 4e74d9cf558918f682adbf2be750998bfca7e950 Mon Sep 17 00:00:00 2001 From: yongman Date: Tue, 6 Nov 2018 10:51:19 +0800 Subject: [PATCH 29/51] fix malloc in clusterManagerComputeReshardTable --- src/redis-cli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 57f812b90..727038f4e 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -4061,7 +4061,7 @@ static clusterManagerNode *clusterNodeForResharding(char *id, static list *clusterManagerComputeReshardTable(list *sources, int numslots) { list *moved = listCreate(); int src_count = listLength(sources), i = 0, tot_slots = 0, j; - clusterManagerNode **sorted = zmalloc(src_count * sizeof(**sorted)); + clusterManagerNode **sorted = zmalloc(src_count * sizeof(*sorted)); listIter li; listNode *ln; listRewind(sources, &li); From 560cdf359f8a600358a7221c469d7085751d054b Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 6 Nov 2018 18:15:51 +0100 Subject: [PATCH 30/51] MEMORY command: make strcasecmp() conditional like the following. --- src/object.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/object.c b/src/object.c index ce2217392..87b32f428 100644 --- a/src/object.c +++ b/src/object.c @@ -1287,8 +1287,7 @@ NULL void memoryCommand(client *c) { robj *o; - if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { - + if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) { const char *help[] = { "DOCTOR - Return memory problems reports.", "MALLOC-STATS -- Return internal statistics report from the memory allocator.", From 0cb798ea2b3ccb163c0add8137276dba31916293 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 7 Nov 2018 12:54:46 +0100 Subject: [PATCH 31/51] Fix cluster-replica-no-failover option name. Thanks to @NicolasLM, see issue #5537. --- src/config.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.c b/src/config.c index d1bfaa3f7..8a2690a0c 100644 --- a/src/config.c +++ b/src/config.c @@ -684,7 +684,7 @@ void loadServerConfigFromString(char *config) { goto loaderr; } } else if ((!strcasecmp(argv[0],"cluster-slave-no-failover") || - !strcasecmp(argv[0],"cluster-replica-no-failiver")) && + !strcasecmp(argv[0],"cluster-replica-no-failover")) && argc == 2) { server.cluster_slave_no_failover = yesnotoi(argv[1]); From 1d9866632709e7d4256e8050e7b4ba81a61bcf71 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Thu, 8 Nov 2018 10:13:52 +0000 Subject: [PATCH 32/51] only FreeBSD change/little warning addressing --- src/Makefile | 9 +++++---- src/debug.c | 2 ++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Makefile b/src/Makefile index 321118dbb..2d279775d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,10 +20,11 @@ DEPENDENCY_TARGETS=hiredis linenoise lua NODEPS:=clean distclean # Default settings -ifeq (,$(findstring clang,$(CC))) - STD=-std=c99 -pedantic -DREDIS_STATIC='' -else - STD=-std=c99 -Wno-c11-extensions -pedantic -DREDIS_STATIC='' +STD=-std=c99 -pedantic -DREDIS_STATIC='' +ifneq (,$(findstring clang,$(CC))) +ifneq (,$(findstring FreeBSD,$(uname_S))) + STD+=-Wno-c11-extensions +endif endif WARN=-Wall -W -Wno-missing-field-initializers OPT=$(OPTIMIZATION) diff --git a/src/debug.c b/src/debug.c index 8cc53d92f..70def3b30 100644 --- a/src/debug.c +++ b/src/debug.c @@ -1189,6 +1189,8 @@ void serverLogHexDump(int level, char *descr, void *value, size_t len) { void watchdogSignalHandler(int sig, siginfo_t *info, void *secret) { #ifdef HAVE_BACKTRACE ucontext_t *uc = (ucontext_t*) secret; +#else + (void)secret; #endif UNUSED(info); UNUSED(sig); From 4e0af5efd42edacda09f6569f063cbfb1fe56fe9 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Sun, 11 Nov 2018 18:49:55 +0000 Subject: [PATCH 33/51] DragonFlyBSD little build fix --- src/Makefile | 5 +++++ src/setproctitle.c | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index 2d279775d..0de0a1c61 100644 --- a/src/Makefile +++ b/src/Makefile @@ -106,6 +106,10 @@ else ifeq ($(uname_S),FreeBSD) # FreeBSD FINAL_LIBS+= -lpthread +else +ifeq ($(uname_S),DragonFly) + # FreeBSD + FINAL_LIBS+= -lpthread else # All the other OSes (notably Linux) FINAL_LDFLAGS+= -rdynamic @@ -115,6 +119,7 @@ endif endif endif endif +endif # Include paths to dependencies FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src diff --git a/src/setproctitle.c b/src/setproctitle.c index 6563242de..5f91d7bfe 100644 --- a/src/setproctitle.c +++ b/src/setproctitle.c @@ -39,7 +39,7 @@ #include /* errno program_invocation_name program_invocation_short_name */ #if !defined(HAVE_SETPROCTITLE) -#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__) +#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__ || defined __DragonFly__) #define HAVE_SETPROCTITLE 1 #else #define HAVE_SETPROCTITLE 0 From d0850369c4a06d6362dbaf12d873e54d6ce931cc Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 11 Nov 2018 09:22:42 +0200 Subject: [PATCH 34/51] fix small test suite race conditions --- tests/support/util.tcl | 8 ++++++++ tests/unit/memefficiency.tcl | 1 + tests/unit/type/stream-cgroups.tcl | 2 ++ 3 files changed, 11 insertions(+) diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 181c865fc..74f491e48 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -91,6 +91,14 @@ proc wait_for_sync r { } } +proc wait_for_ofs_sync {r1 r2} { + wait_for_condition 50 100 { + [status $r1 master_repl_offset] eq [status $r2 master_repl_offset] + } else { + fail "replica didn't sync in time" + } +} + # Random integer between 0 and max (excluded). proc randomInt {max} { expr {int(rand()*$max)} diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 8972d577a..d152e212c 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -90,6 +90,7 @@ start_server {tags {"defrag"}} { test "Active defrag big keys" { r flushdb r config resetstat + r config set save "" ;# prevent bgsave from interfereing with save below r config set activedefrag no r config set active-defrag-max-scan-fields 1000 r config set active-defrag-threshold-lower 5 diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index d2e0d6539..b96bc8e98 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -144,6 +144,8 @@ start_server { } } + wait_for_ofs_sync $master $slave + # Turn slave into master $slave slaveof no one From cabc06076bcdb51d01be0e9596c2bc3241535b94 Mon Sep 17 00:00:00 2001 From: Weiliang Li Date: Thu, 15 Nov 2018 16:55:40 +0800 Subject: [PATCH 35/51] fix comment typo in util.c fix comment typo in util.c --- src/util.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util.c b/src/util.c index 80b460bc2..430cbe61a 100644 --- a/src/util.c +++ b/src/util.c @@ -606,7 +606,7 @@ void getRandomHexChars(char *p, size_t len) { * already, this will be detected and handled correctly. * * The function does not try to normalize everything, but only the obvious - * case of one or more "../" appearning at the start of "filename" + * case of one or more "../" appearing at the start of "filename" * relative path. */ sds getAbsolutePath(char *filename) { char cwd[1024]; From 64324901f22946a93226e0af56a6e59b2290be09 Mon Sep 17 00:00:00 2001 From: yongman Date: Fri, 16 Nov 2018 17:20:53 +0800 Subject: [PATCH 36/51] Fix pointer access and memory leak in redis-cli. --- src/redis-cli.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 1aa6751f7..4f68be200 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2829,7 +2829,7 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (err != NULL) { *err = zmalloc((reply->len + 1) * sizeof(char)); strcpy(*err, reply->str); - CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err); + CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, *err); } goto next; } @@ -2947,7 +2947,7 @@ static int clusterManagerMoveSlot(clusterManagerNode *source, if (err != NULL) { *err = zmalloc((r->len + 1) * sizeof(char)); strcpy(*err, r->str); - CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err); + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, *err); } } freeReplyObject(r); @@ -5196,10 +5196,13 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv) { n->port); ok_count++; continue; -reply_err: +reply_err:; + int need_free = 0; if (err == NULL) err = ""; + else need_free = 1; clusterManagerLogErr("ERR setting node-timeot for %s:%d: %s\n", n->ip, n->port, err); + if (need_free) zfree(err); err_count++; } clusterManagerLogInfo(">>> New node timeout set. %d OK, %d ERR.\n", From 3830ef2483c6adab646f847f96b93f76701f1d43 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 19 Nov 2018 16:26:02 +0100 Subject: [PATCH 37/51] t_stream.c comment resized to 80 cols. --- src/t_stream.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 73f7fca0e..66f3295e0 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1475,7 +1475,8 @@ void xreadCommand(client *c) { streamID *gt = ids+i; /* ID must be greater than this. */ int serve_synchronously = 0; - /* Check if there are the conditions to serve the client synchronously. */ + /* Check if there are the conditions to serve the client + * synchronously. */ if (groups) { /* If the consumer is blocked on a group, we always serve it * synchronously (serving its local history) if the ID specified From 29251f58e279738da6d45c5af3d12f35259f3d45 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 19 Nov 2018 16:41:09 +0100 Subject: [PATCH 38/51] Streams: fix XREADGROUP history reading when CG last_id is low. This fixes the issue reported in #5570. This was fixed the hard way, that is, propagating more information to the lower level API about this being a request to read just the history, so that the code is simpler and less likely to regress. --- src/t_stream.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 66f3295e0..dd83f2b1d 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -894,6 +894,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna #define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array boundaries, just the entries. */ +#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { void *arraylen_ptr = NULL; size_t arraylen = 0; @@ -902,15 +903,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamID id; int propagate_last_id = 0; - /* If a group was passed, we check if the request is about messages - * never delivered so far (normally this happens when ">" ID is passed). - * - * If instead the client is asking for some history, we serve it - * using a different function, so that we return entries *solely* - * from its own PEL. This ensures each consumer will always and only - * see the history of messages delivered to it and not yet confirmed + /* If the client is asking for some history, we serve it using a + * different function, so that we return entries *solely* from its + * own PEL. This ensures each consumer will always and only see + * the history of messages delivered to it and not yet confirmed * as delivered. */ - if (group && streamCompareID(start,&group->last_id) <= 0) { + if (group && (flags & STREAM_RWR_HISTORY)) { return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, consumer); } @@ -1474,6 +1472,7 @@ void xreadCommand(client *c) { stream *s = o->ptr; streamID *gt = ids+i; /* ID must be greater than this. */ int serve_synchronously = 0; + int serve_history = 0; /* True for XREADGROUP with ID != ">". */ /* Check if there are the conditions to serve the client * synchronously. */ @@ -1485,6 +1484,7 @@ void xreadCommand(client *c) { gt->seq != UINT64_MAX) { serve_synchronously = 1; + serve_history = 1; } else { /* We also want to serve a consumer in a consumer group * synchronously in case the group top item delivered is smaller @@ -1520,9 +1520,12 @@ void xreadCommand(client *c) { if (groups) consumer = streamLookupConsumer(groups[i], consumername->ptr,1); streamPropInfo spi = {c->argv[i+streams_arg],groupname}; + int flags = 0; + if (noack) flags |= STREAM_RWR_NOACK; + if (serve_history) flags |= STREAM_RWR_HISTORY; streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, - consumer, noack, &spi); + consumer, flags, &spi); if (groups) server.dirty++; } } From db8d7d3dc4912cce0b64432e9282be7b2b596794 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 19 Nov 2018 16:50:54 +0100 Subject: [PATCH 39/51] Test: regression test for #5577. --- tests/unit/type/stream-cgroups.tcl | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index d2e0d6539..e1358e51e 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -108,6 +108,30 @@ start_server { assert {$c == 5} } + test {XREADGROUP will not report data on empty history. Bug #5577} { + r del events + r xadd events * a 1 + r xadd events * b 2 + r xadd events * c 3 + r xgroup create events mygroup 0 + + # Current local PEL should be empty + set res [r xpending events mygroup - + 10] + assert {[llength $res] == 0} + + # So XREADGROUP should read an empty history as well + set res [r xreadgroup group mygroup myconsumer count 3 streams events 0] + assert {[llength [lindex $res 0 1]] == 0} + + # We should fetch all the elements in the stream asking for > + set res [r xreadgroup group mygroup myconsumer count 3 streams events >] + assert {[llength [lindex $res 0 1]] == 3} + + # Now the history is populated with three not acked entries + set res [r xreadgroup group mygroup myconsumer count 3 streams events 0] + assert {[llength [lindex $res 0 1]] == 3} + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host] From 2bd6802fa1bd48260022435e9a040b8233d6cdfc Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 19 Nov 2018 17:00:30 +0100 Subject: [PATCH 40/51] Stream: fix XREADGROUP history reading of deleted messages. This commit fixes #5570. It is a similar bug to one fixed a few weeks ago and is due to the range API to be called with NULL as "end ID" parameter instead of repeating again the start ID, to be sure that we selectively issue the entry with a given ID, or we get zero returned (and we know we should emit a NULL reply). --- src/t_stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index dd83f2b1d..f51f6c46b 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1025,7 +1025,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start if (end && memcmp(ri.key,end,ri.key_len) > 0) break; streamID thisid; streamDecodeID(ri.key,&thisid); - if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL, + if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL, STREAM_RWR_RAWENTRIES,NULL) == 0) { /* Note that we may have a not acknowledged entry in the PEL From 30a455f14a55f99a1df17035fedcceb1e8b60b9a Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 19 Nov 2018 17:19:23 +0100 Subject: [PATCH 41/51] Test: regression test for #5570. --- tests/unit/type/stream-cgroups.tcl | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index e1358e51e..4bb8f7ca0 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -132,6 +132,21 @@ start_server { assert {[llength [lindex $res 0 1]] == 3} } + test {XREADGROUP history reporting of deleted entries. Bug #5570} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 1 field1 A + r XREADGROUP GROUP mygroup myconsumer STREAMS mystream > + r XADD mystream MAXLEN 1 2 field1 B + r XREADGROUP GROUP mygroup myconsumer STREAMS mystream > + + # Now we have two pending entries, however one should be deleted + # and one should be ok (we should only see "B") + set res [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1] + assert {[lindex $res 0 1 0] == {1-0 {}}} + assert {[lindex $res 0 1 1] == {2-0 {field1 B}}} + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host] From 2f76829dc7ed1c76a2ac17bd6dc9cf7c3b5d7cb5 Mon Sep 17 00:00:00 2001 From: yongman Date: Wed, 21 Nov 2018 23:01:35 +0800 Subject: [PATCH 42/51] skip slave nodes when sending cluster setslot command --- src/redis-cli.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/redis-cli.c b/src/redis-cli.c index 4f68be200..28c842fdb 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3962,6 +3962,7 @@ static int clusterManagerFixOpenSlot(int slot) { while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n == owner) continue; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); success = clusterManagerCheckRedisReply(n, r, NULL); From 2961c891616a6d60e28aea748c3c2422cb4f90b6 Mon Sep 17 00:00:00 2001 From: yongman Date: Fri, 23 Nov 2018 16:58:55 +0800 Subject: [PATCH 43/51] Fix choose a random master node for slot assignment --- src/redis-cli.c | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 28c842fdb..a7a4d685f 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3505,6 +3505,34 @@ static clusterManagerNode *clusterManagerNodeWithLeastReplicas() { return node; } +/* This fucntion returns a random master node, return NULL if none */ + +static clusterManagerNode *clusterManagerNodeMasterRandom() { + int master_count = 0; + int idx; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + master_count++; + } + + srand(time(NULL)); + idx = rand() % master_count; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + if (!idx--) { + return n; + } + } + /* Can not be reached */ + return NULL; +} + static int clusterManagerFixSlotsCoverage(char *all_slots) { int i, fixed = 0; list *none = NULL, *single = NULL, *multi = NULL; @@ -3577,16 +3605,12 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { "across the cluster:\n"); clusterManagerPrintSlotsList(none); if (confirmWithYes("Fix these slots by covering with a random node?")){ - srand(time(NULL)); listIter li; listNode *ln; listRewind(none, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; - long idx = (long) (rand() % listLength(cluster_manager.nodes)); - listNode *node_n = listIndex(cluster_manager.nodes, idx); - assert(node_n != NULL); - clusterManagerNode *n = node_n->value; + clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); /* Ensure the slot is not already assigned. */ From 221dfbd322227ac35b43f0a10855b02f9095ed72 Mon Sep 17 00:00:00 2001 From: yongman Date: Fri, 23 Nov 2018 23:51:16 +0800 Subject: [PATCH 44/51] fix typo --- src/redis-cli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index a7a4d685f..f307d31cf 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3505,7 +3505,7 @@ static clusterManagerNode *clusterManagerNodeWithLeastReplicas() { return node; } -/* This fucntion returns a random master node, return NULL if none */ +/* This function returns a random master node, return NULL if none */ static clusterManagerNode *clusterManagerNodeMasterRandom() { int master_count = 0; From 9cfcf37968c9890e6f183461f382528cfbe4d375 Mon Sep 17 00:00:00 2001 From: Chris Lamb Date: Fri, 23 Nov 2018 17:57:01 +0100 Subject: [PATCH 45/51] Clarify the "Creating Server TCP listening socket" error. This really helps spot it in the logs, otherwise it does not look like a warning/error. For example: Creating Server TCP listening socket ::1:6379: bind: Cannot assign requested address ... is not nearly as clear as: Could not create server TCP listening listening socket ::1:6379: bind: Cannot assign requested address --- src/server.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.c b/src/server.c index d03873060..bb0063ce6 100644 --- a/src/server.c +++ b/src/server.c @@ -1958,7 +1958,7 @@ int listenToPort(int port, int *fds, int *count) { } if (fds[*count] == ANET_ERR) { serverLog(LL_WARNING, - "Creating Server TCP listening socket %s:%d: %s", + "Could not create server TCP listening socket %s:%d: %s", server.bindaddr[j] ? server.bindaddr[j] : "*", port, server.neterr); return C_ERR; From fbff351406ec924f48898badc7a25576a2c1b7b3 Mon Sep 17 00:00:00 2001 From: Chris Lamb Date: Fri, 23 Nov 2018 17:43:01 +0100 Subject: [PATCH 46/51] Don't treat unsupported protocols as fatal errors If we encounter an unsupported protocol in the "bind" list, don't ipso-facto consider it a fatal error. We continue to abort startup if there are no listening sockets at all. This ensures that the lack of IPv6 support does not prevent Redis from starting on Debian where we try to bind to the ::1 interface by default (via "bind 127.0.0.1 ::1"). A machine with IPv6 disabled (such as some container systems) would simply fail to start Redis after the initiall call to apt(8). This is similar to the case for where "bind" is not specified: https://github.com/antirez/redis/issues/3894 ... and was based on the corresponding PR: https://github.com/antirez/redis/pull/4108 ... but also adds EADDRNOTAVAIL to the list of errors to catch which I believe is missing from there. This issue was raised in Debian as both & . --- src/server.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/server.c b/src/server.c index d03873060..dbbc35db4 100644 --- a/src/server.c +++ b/src/server.c @@ -1961,6 +1961,10 @@ int listenToPort(int port, int *fds, int *count) { "Creating Server TCP listening socket %s:%d: %s", server.bindaddr[j] ? server.bindaddr[j] : "*", port, server.neterr); + if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT || + errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT || + errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL) + continue; return C_ERR; } anetNonBlock(NULL,fds[*count]); From 69ca9078684f66366c461092bdb46172c27b845d Mon Sep 17 00:00:00 2001 From: David Carlier Date: Sat, 24 Nov 2018 15:49:45 +0000 Subject: [PATCH 47/51] Backtrace/register dump on BSD. FreeBSD/DragonFlyBSD does have backtrace only it does not belong to libc. --- src/Makefile | 4 +-- src/config.h | 3 +- src/debug.c | 93 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 3 deletions(-) diff --git a/src/Makefile b/src/Makefile index 0de0a1c61..188dac3b9 100644 --- a/src/Makefile +++ b/src/Makefile @@ -105,11 +105,11 @@ ifeq ($(uname_S),OpenBSD) else ifeq ($(uname_S),FreeBSD) # FreeBSD - FINAL_LIBS+= -lpthread + FINAL_LIBS+= -lpthread -lexecinfo else ifeq ($(uname_S),DragonFly) # FreeBSD - FINAL_LIBS+= -lpthread + FINAL_LIBS+= -lpthread -lexecinfo else # All the other OSes (notably Linux) FINAL_LDFLAGS+= -rdynamic diff --git a/src/config.h b/src/config.h index ee3ad508e..0a74a7609 100644 --- a/src/config.h +++ b/src/config.h @@ -62,7 +62,8 @@ #endif /* Test for backtrace() */ -#if defined(__APPLE__) || (defined(__linux__) && defined(__GLIBC__)) +#if defined(__APPLE__) || (defined(__linux__) && defined(__GLIBC__)) || \ + defined(__FreeBSD__) || defined(__DragonFly__) #define HAVE_BACKTRACE 1 #endif diff --git a/src/debug.c b/src/debug.c index 70def3b30..ff0df7568 100644 --- a/src/debug.c +++ b/src/debug.c @@ -729,6 +729,15 @@ static void *getMcontextEip(ucontext_t *uc) { #elif defined(__aarch64__) /* Linux AArch64 */ return (void*) uc->uc_mcontext.pc; #endif +#elif defined(__FreeBSD__) + /* FreeBSD */ + #if defined(__i386__) + return (void*) uc->uc_mcontext.mc_eip; + #elif defined(__x86_64__) + return (void*) uc->uc_mcontext.mc_rip; + #endif +#elif defined(__DragonFly__) + return (void*) uc->uc_mcontext.mc_rip; #else return NULL; #endif @@ -870,6 +879,90 @@ void logRegisters(ucontext_t *uc) { ); logStackContent((void**)uc->uc_mcontext.gregs[15]); #endif +#elif defined(__FreeBSD__) + #if defined(__x86_64__) + serverLog(LL_WARNING, + "\n" + "RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n" + "RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n" + "R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n" + "R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n" + "RIP:%016lx EFL:%016lx\nCSGSFS:%016lx", + (unsigned long) uc->uc_mcontext.mc_rax, + (unsigned long) uc->uc_mcontext.mc_rbx, + (unsigned long) uc->uc_mcontext.mc_rcx, + (unsigned long) uc->uc_mcontext.mc_rdx, + (unsigned long) uc->uc_mcontext.mc_rdi, + (unsigned long) uc->uc_mcontext.mc_rsi, + (unsigned long) uc->uc_mcontext.mc_rbp, + (unsigned long) uc->uc_mcontext.mc_rsp, + (unsigned long) uc->uc_mcontext.mc_r8, + (unsigned long) uc->uc_mcontext.mc_r9, + (unsigned long) uc->uc_mcontext.mc_r10, + (unsigned long) uc->uc_mcontext.mc_r11, + (unsigned long) uc->uc_mcontext.mc_r12, + (unsigned long) uc->uc_mcontext.mc_r13, + (unsigned long) uc->uc_mcontext.mc_r14, + (unsigned long) uc->uc_mcontext.mc_r15, + (unsigned long) uc->uc_mcontext.mc_rip, + (unsigned long) uc->uc_mcontext.mc_rflags, + (unsigned long) uc->uc_mcontext.mc_cs + ); + logStackContent((void**)uc->uc_mcontext.mc_rsp); + #elif defined(__i386__) + serverLog(LL_WARNING, + "\n" + "EAX:%08lx EBX:%08lx ECX:%08lx EDX:%08lx\n" + "EDI:%08lx ESI:%08lx EBP:%08lx ESP:%08lx\n" + "SS :%08lx EFL:%08lx EIP:%08lx CS:%08lx\n" + "DS :%08lx ES :%08lx FS :%08lx GS:%08lx", + (unsigned long) uc->uc_mcontext.mc_eax, + (unsigned long) uc->uc_mcontext.mc_ebx, + (unsigned long) uc->uc_mcontext.mc_ebx, + (unsigned long) uc->uc_mcontext.mc_edx, + (unsigned long) uc->uc_mcontext.mc_edi, + (unsigned long) uc->uc_mcontext.mc_esi, + (unsigned long) uc->uc_mcontext.mc_ebp, + (unsigned long) uc->uc_mcontext.mc_esp, + (unsigned long) uc->uc_mcontext.mc_ss, + (unsigned long) uc->uc_mcontext.mc_eflags, + (unsigned long) uc->uc_mcontext.mc_eip, + (unsigned long) uc->uc_mcontext.mc_cs, + (unsigned long) uc->uc_mcontext.mc_es, + (unsigned long) uc->uc_mcontext.mc_fs, + (unsigned long) uc->uc_mcontext.mc_gs + ); + logStackContent((void**)uc->uc_mcontext.mc_esp); + #endif +#elif defined(__DragonFly__) + serverLog(LL_WARNING, + "\n" + "RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n" + "RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n" + "R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n" + "R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n" + "RIP:%016lx EFL:%016lx\nCSGSFS:%016lx", + (unsigned long) uc->uc_mcontext.mc_rax, + (unsigned long) uc->uc_mcontext.mc_rbx, + (unsigned long) uc->uc_mcontext.mc_rcx, + (unsigned long) uc->uc_mcontext.mc_rdx, + (unsigned long) uc->uc_mcontext.mc_rdi, + (unsigned long) uc->uc_mcontext.mc_rsi, + (unsigned long) uc->uc_mcontext.mc_rbp, + (unsigned long) uc->uc_mcontext.mc_rsp, + (unsigned long) uc->uc_mcontext.mc_r8, + (unsigned long) uc->uc_mcontext.mc_r9, + (unsigned long) uc->uc_mcontext.mc_r10, + (unsigned long) uc->uc_mcontext.mc_r11, + (unsigned long) uc->uc_mcontext.mc_r12, + (unsigned long) uc->uc_mcontext.mc_r13, + (unsigned long) uc->uc_mcontext.mc_r14, + (unsigned long) uc->uc_mcontext.mc_r15, + (unsigned long) uc->uc_mcontext.mc_rip, + (unsigned long) uc->uc_mcontext.mc_rflags, + (unsigned long) uc->uc_mcontext.mc_cs + ); + logStackContent((void**)uc->uc_mcontext.mc_rsp); #else serverLog(LL_WARNING, " Dumping of registers not supported for this OS/arch"); From ac086b1932eae5bc47c30c502c56e87b60525e10 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Sun, 25 Nov 2018 08:10:26 +0000 Subject: [PATCH 48/51] OpenBSD support. Special treatment here as backtrace support is optional, cannot be found via pkg-config and similar neither. --- src/Makefile | 6 +++++ src/config.h | 3 ++- src/debug.c | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index 188dac3b9..9edbb4581 100644 --- a/src/Makefile +++ b/src/Makefile @@ -102,6 +102,12 @@ else ifeq ($(uname_S),OpenBSD) # OpenBSD FINAL_LIBS+= -lpthread + ifeq ($(USE_BACKTRACE),yes) + FINAL_CFLAGS+= -DUSE_BACKTRACE -I/usr/local/include + FINAL_LDFLAGS+= -L/usr/local/lib + FINAL_LIBS+= -lexecinfo + endif + else ifeq ($(uname_S),FreeBSD) # FreeBSD diff --git a/src/config.h b/src/config.h index 0a74a7609..efa9d11f2 100644 --- a/src/config.h +++ b/src/config.h @@ -63,7 +63,8 @@ /* Test for backtrace() */ #if defined(__APPLE__) || (defined(__linux__) && defined(__GLIBC__)) || \ - defined(__FreeBSD__) || defined(__DragonFly__) + defined(__FreeBSD__) || (defined(__OpenBSD__) && defined(USE_BACKTRACE))\ + || defined(__DragonFly__) #define HAVE_BACKTRACE 1 #endif diff --git a/src/debug.c b/src/debug.c index ff0df7568..3cb567520 100644 --- a/src/debug.c +++ b/src/debug.c @@ -37,7 +37,11 @@ #ifdef HAVE_BACKTRACE #include +#ifndef __OpenBSD__ #include +#else +typedef ucontext_t sigcontext_t; +#endif #include #include "bio.h" #include @@ -736,6 +740,13 @@ static void *getMcontextEip(ucontext_t *uc) { #elif defined(__x86_64__) return (void*) uc->uc_mcontext.mc_rip; #endif +#elif defined(__OpenBSD__) + /* OpenBSD */ + #if defined(__i386__) + return (void*) uc->sc_eip; + #elif defined(__x86_64__) + return (void*) uc->sc_rip; + #endif #elif defined(__DragonFly__) return (void*) uc->uc_mcontext.mc_rip; #else @@ -934,6 +945,61 @@ void logRegisters(ucontext_t *uc) { ); logStackContent((void**)uc->uc_mcontext.mc_esp); #endif +#elif defined(__OpenBSD__) + #if defined(__x86_64__) + serverLog(LL_WARNING, + "\n" + "RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n" + "RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n" + "R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n" + "R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n" + "RIP:%016lx EFL:%016lx\nCSGSFS:%016lx", + (unsigned long) uc->sc_rax, + (unsigned long) uc->sc_rbx, + (unsigned long) uc->sc_rcx, + (unsigned long) uc->sc_rdx, + (unsigned long) uc->sc_rdi, + (unsigned long) uc->sc_rsi, + (unsigned long) uc->sc_rbp, + (unsigned long) uc->sc_rsp, + (unsigned long) uc->sc_r8, + (unsigned long) uc->sc_r9, + (unsigned long) uc->sc_r10, + (unsigned long) uc->sc_r11, + (unsigned long) uc->sc_r12, + (unsigned long) uc->sc_r13, + (unsigned long) uc->sc_r14, + (unsigned long) uc->sc_r15, + (unsigned long) uc->sc_rip, + (unsigned long) uc->sc_rflags, + (unsigned long) uc->sc_cs + ); + logStackContent((void**)uc->sc_rsp); + #elif defined(__i386__) + serverLog(LL_WARNING, + "\n" + "EAX:%08lx EBX:%08lx ECX:%08lx EDX:%08lx\n" + "EDI:%08lx ESI:%08lx EBP:%08lx ESP:%08lx\n" + "SS :%08lx EFL:%08lx EIP:%08lx CS:%08lx\n" + "DS :%08lx ES :%08lx FS :%08lx GS:%08lx", + (unsigned long) uc->sc_eax, + (unsigned long) uc->sc_ebx, + (unsigned long) uc->sc_ebx, + (unsigned long) uc->sc_edx, + (unsigned long) uc->sc_edi, + (unsigned long) uc->sc_esi, + (unsigned long) uc->sc_ebp, + (unsigned long) uc->sc_esp, + (unsigned long) uc->sc_ss, + (unsigned long) uc->sc_eflags, + (unsigned long) uc->sc_eip, + (unsigned long) uc->sc_cs, + (unsigned long) uc->sc_es, + (unsigned long) uc->sc_fs, + (unsigned long) uc->sc_gs + ); + logStackContent((void**)uc->sc_esp); + #endif #elif defined(__DragonFly__) serverLog(LL_WARNING, "\n" From edd3939bef97ada7943d9093308fdb68800c0cc3 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 28 Nov 2018 16:24:50 +0100 Subject: [PATCH 49/51] Abort instead of crashing when loading bad stream master key. See #5612. --- src/rdb.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/rdb.c b/src/rdb.c index 3e43cb4e4..47555101e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1645,6 +1645,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { * node: the entries inside the listpack itself are delta-encoded * relatively to this ID. */ sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); + if (nodekey == NULL) { + rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error."); + } if (sdslen(nodekey) != sizeof(streamID)) { rdbExitReportCorruptRDB("Stream node key entry is not the " "size of a stream ID"); From c99f1206b7f30c642868be1ff902d56ceb0321ef Mon Sep 17 00:00:00 2001 From: Qu Chen Date: Mon, 12 Nov 2018 15:28:39 -0800 Subject: [PATCH 50/51] Add unit test for stream XCLAIM command. --- tests/unit/type/stream-cgroups.tcl | 48 ++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 4bb8f7ca0..9dfb3f0eb 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -147,6 +147,54 @@ start_server { assert {[lindex $res 0 1 1] == {2-0 {field1 B}}} } + test {XCLAIM can claim PEL items from another consumer} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Client 1 reads item 1 from the stream without acknowledgements. + # Client 2 then claims pending item 1 from the PEL of client 1 + set reply [ + r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + ] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + r debug sleep 0.2 + set reply [ + r XCLAIM mystream mygroup client2 10 $id1 + ] + assert {[llength [lindex $reply 0 1]] == 2} + assert {[lindex $reply 0 1] eq {a 1}} + + # Client 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream > + r debug sleep 0.2 + + # Delete item 2 from the stream. Now client 1 has PEL that contains + # only item 3. Try to use client 2 to claim the deleted item 2 + # from the PEL of client 1, this should return nil + r XDEL mystream $id2 + set reply [ + r XCLAIM mystream mygroup client2 10 $id2 + ] + assert {[llength $reply] == 1} + assert_equal "" [lindex $reply 0] + + # Delete item 3 from the stream. Now client 1 has PEL that is empty. + # Try to use client 2 to claim the deleted item 3 from the PEL + # of client 1, this should return nil + r debug sleep 0.2 + r XDEL mystream $id3 + set reply [ + r XCLAIM mystream mygroup client2 10 $id3 + ] + assert {[llength $reply] == 1} + assert_equal "" [lindex $reply 0] + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host] From d56c63134340ce155a49c03364819b9266a67a52 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Thu, 29 Nov 2018 01:01:47 +0800 Subject: [PATCH 51/51] MEMORY command: make USAGE more accurate In MEMORY USAGE command, we count the key argv[2] into usage, but the argument in command may contains free spaces because of sdsMakeRoomFor. But the key in db never contains free spaces because we use sdsdup when dbAdd, so using the real key to count the usage is more accurate. --- src/object.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/object.c b/src/object.c index 87b32f428..48ffa42b9 100644 --- a/src/object.c +++ b/src/object.c @@ -1285,8 +1285,6 @@ NULL * * Usage: MEMORY usage */ void memoryCommand(client *c) { - robj *o; - if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) { const char *help[] = { "DOCTOR - Return memory problems reports.", @@ -1298,6 +1296,7 @@ NULL }; addReplyHelp(c, help); } else if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) { + dictEntry *de; long long samples = OBJ_COMPUTE_SIZE_DEF_SAMPLES; for (int j = 3; j < c->argc; j++) { if (!strcasecmp(c->argv[j]->ptr,"samples") && @@ -1316,10 +1315,12 @@ NULL return; } } - if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk)) - == NULL) return; - size_t usage = objectComputeSize(o,samples); - usage += sdsAllocSize(c->argv[2]->ptr); + if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) { + addReply(c, shared.nullbulk); + return; + } + size_t usage = objectComputeSize(dictGetVal(de),samples); + usage += sdsAllocSize(dictGetKey(de)); usage += sizeof(dictEntry); addReplyLongLong(c,usage); } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) {