diff --git a/modules/Makefile b/modules/Makefile index 9ee4c8a8d7..cba9782699 100644 --- a/modules/Makefile +++ b/modules/Makefile @@ -32,7 +32,7 @@ clean_environment: uninstall-rust # Keep all of the Rust stuff in one place install-rust: ifeq ($(INSTALL_RUST_TOOLCHAIN),yes) - @RUST_VERSION=1.93.1; \ + @RUST_VERSION=1.94.0; \ ARCH="$$(uname -m)"; \ if ldd --version 2>&1 | grep -q musl; then LIBC_TYPE="musl"; else LIBC_TYPE="gnu"; fi; \ echo "Detected architecture: $${ARCH} and libc: $${LIBC_TYPE}"; \ @@ -40,18 +40,18 @@ ifeq ($(INSTALL_RUST_TOOLCHAIN),yes) 'x86_64') \ if [ "$${LIBC_TYPE}" = "musl" ]; then \ RUST_INSTALLER="rust-$${RUST_VERSION}-x86_64-unknown-linux-musl"; \ - RUST_SHA256="6a57ddfffa77bfa97ab325586b08fc1e96c3acb82ecc9f554cdb2ef748466ef2"; \ + RUST_SHA256="9a358120ce1491a4d5b7f71a41e4e97b380b5db5d4ec31f7110f5b3090bd3d55"; \ else \ RUST_INSTALLER="rust-$${RUST_VERSION}-x86_64-unknown-linux-gnu"; \ - RUST_SHA256="518872275a3648f735471d7add854d39171c5a6b17f423c4d7f931f52120c2af"; \ + RUST_SHA256="e8fa4185f3ef6ae32725ff638b1ecdbff28f5d651dc0b3111e2539350d03b15a"; \ fi ;; \ 'aarch64') \ if [ "$${LIBC_TYPE}" = "musl" ]; then \ RUST_INSTALLER="rust-$${RUST_VERSION}-aarch64-unknown-linux-musl"; \ - RUST_SHA256="34f0ffb0cd3e334aeae344daae09984f951eeb842386406d4bdf11cd0b4c2b36"; \ + RUST_SHA256="008b3f0fc4175c956ecbfa4e0c48865ec3f953741b2926e75e8ded7e3adfdb19"; \ else \ RUST_INSTALLER="rust-$${RUST_VERSION}-aarch64-unknown-linux-gnu"; \ - RUST_SHA256="701d55b62286bed013ceb2393ff7687d0953205605afaa15c62e2cd18024c32c"; \ + RUST_SHA256="c6fd6d1c925ed986df3b2c0b89bbc90ce15afb62e4d522a054e7d50c856b3c1a"; \ fi ;; \ *) echo >&2 "Unsupported architecture: '$${ARCH}'"; exit 1 ;; \ esac; \ diff --git a/modules/vector-sets/tests/vsim_duplicate_filter.py b/modules/vector-sets/tests/vsim_duplicate_filter.py new file mode 100644 index 0000000000..522a631254 --- /dev/null +++ b/modules/vector-sets/tests/vsim_duplicate_filter.py @@ -0,0 +1,19 @@ +from test import TestCase + +class VSIMDuplicateFilterLeak(TestCase): + def getname(self): + return "[regression] VSIM duplicate FILTER should not leak memory" + + def test(self): + self.redis.execute_command('VADD', self.test_key, 'VALUES', 3, 0.5774, 0.5774, 0.5774, 'elem1') + self.redis.execute_command('VADD', self.test_key, 'VALUES', 3, 0.7071, 0.7071, 0.0, 'elem2') + self.redis.execute_command('VSETATTR', self.test_key, 'elem1', '{"a": 1, "b": 2}') + self.redis.execute_command('VSETATTR', self.test_key, 'elem2', '{"a": 2, "b": 3}') + + # Duplicate FILTER: before the fix the first exprstate was + # overwritten without exprFree(), leaking ~760 bytes per call. + # Under ASAN/valgrind this shows up as a leak at server exit. + for _ in range(100): + self.redis.execute_command( + 'VSIM', self.test_key, 'VALUES', 3, 0.5774, 0.5774, 0.5774, + 'FILTER', '.a == 1', 'FILTER', '.b >= 1') diff --git a/modules/vector-sets/tests/vsim_filter_error_leak.py b/modules/vector-sets/tests/vsim_filter_error_leak.py new file mode 100644 index 0000000000..aaba68a282 --- /dev/null +++ b/modules/vector-sets/tests/vsim_filter_error_leak.py @@ -0,0 +1,32 @@ +from test import TestCase +import redis as redis_module + +class VSIMFilterLeakOnOptionError(TestCase): + def getname(self): + return "[regression] VSIM FILTER expr freed on option parse error" + + def test(self): + self.redis.execute_command('VADD', self.test_key, 'VALUES', 3, 1, 0, 0, 'elem1') + + # Valid FILTER followed by invalid option values. Before the fix, + # error paths freed vec but not filter_expr, leaking the compiled + # exprstate. Under ASAN/valgrind this shows up at server exit. + error_cmds = [ + # invalid COUNT (0) + ['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'COUNT', 0], + # invalid EF (0) + ['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'EF', 0], + # invalid EPSILON (0) + ['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'EPSILON', 0], + # invalid FILTER-EF (0) + ['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'FILTER-EF', 0], + # unknown option + ['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'BADOPT', 1], + ] + + for cmd in error_cmds: + for _ in range(20): + try: + self.redis.execute_command(*cmd) + except redis_module.exceptions.ResponseError: + pass diff --git a/modules/vector-sets/vset.c b/modules/vector-sets/vset.c index 500f8e99e1..618723e91f 100644 --- a/modules/vector-sets/vset.c +++ b/modules/vector-sets/vset.c @@ -1064,6 +1064,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { != REDISMODULE_OK || count <= 0) { RedisModule_Free(vec); + if (filter_expr) exprFree(filter_expr); return RedisModule_ReplyWithError(ctx, "ERR invalid COUNT"); } j += 2; @@ -1072,6 +1073,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_OK || epsilon <= 0) { RedisModule_Free(vec); + if (filter_expr) exprFree(filter_expr); return RedisModule_ReplyWithError(ctx, "ERR invalid EPSILON"); } j += 2; @@ -1080,6 +1082,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_OK || ef <= 0 || ef > 1000000) { RedisModule_Free(vec); + if (filter_expr) exprFree(filter_expr); return RedisModule_ReplyWithError(ctx, "ERR invalid EF"); } j += 2; @@ -1088,6 +1091,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_OK || filter_ef <= 0) { RedisModule_Free(vec); + if (filter_expr) exprFree(filter_expr); return RedisModule_ReplyWithError(ctx, "ERR invalid FILTER-EF"); } j += 2; @@ -1096,6 +1100,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { size_t exprlen; char *exprstr = (char*)RedisModule_StringPtrLen(exprarg,&exprlen); int errpos; + if (filter_expr) exprFree(filter_expr); filter_expr = exprCompile(exprstr,&errpos); if (filter_expr == NULL) { if ((size_t)errpos >= exprlen) errpos = 0; @@ -1107,6 +1112,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { j += 2; } else { RedisModule_Free(vec); + if (filter_expr) exprFree(filter_expr); return RedisModule_ReplyWithError(ctx, "ERR syntax error in VSIM command"); } diff --git a/runtest-moduleapi b/runtest-moduleapi index e6581afedb..368d6eca56 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -60,4 +60,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/internalsecret \ --single unit/moduleapi/configaccess \ --single unit/moduleapi/keymeta \ +--single unit/moduleapi/ksn_notify_side_effect \ "${@}" diff --git a/src/cluster.c b/src/cluster.c index e9b1a8b5eb..04ba146471 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -306,13 +306,8 @@ void restoreCommand(client *c) { estoreAdd(c->db->subexpires, getKeySlot(key->ptr), kv, minExpiredField); } - if (kvtype == OBJ_STREAM) { - stream *s = kv->ptr; - if (s->idmp_producers != NULL) { - if (dictAdd(c->db->stream_idmp_keys, key, NULL) == DICT_OK) - incrRefCount(key); - } - } + if (kvtype == OBJ_STREAM) + streamKeyLoaded(c->db, key, kv); if (ttl) { if (!absttl) { diff --git a/src/commands.def b/src/commands.def index 179c16b9a2..3980365d46 100644 --- a/src/commands.def +++ b/src/commands.def @@ -11062,9 +11062,9 @@ keySpec GCRA_Keyspecs[1] = { struct COMMAND_ARG GCRA_Args[] = { {MAKE_ARG("key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, {MAKE_ARG("max-burst",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, -{MAKE_ARG("requests-per-period",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("tokens-per-period",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, {MAKE_ARG("period",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, -{MAKE_ARG("count",ARG_TYPE_INTEGER,-1,"NUM_REQUESTS",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)}, +{MAKE_ARG("count",ARG_TYPE_INTEGER,-1,"TOKENS",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)}, }; /********** GET ********************/ @@ -11839,7 +11839,7 @@ struct COMMAND_STRUCT redisCommandTable[] = { {MAKE_CMD("pfadd","Adds elements to a HyperLogLog key. Creates the key if it doesn't exist.","O(1) to add every element.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFADD_History,0,PFADD_Tips,0,pfaddCommand,-2,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_HYPERLOGLOG,PFADD_Keyspecs,1,NULL,2),.args=PFADD_Args}, {MAKE_CMD("pfcount","Returns the approximated cardinality of the set(s) observed by the HyperLogLog key(s).","O(1) with a very small average constant time when called with a single key. O(N) with N being the number of keys, and much bigger constant times, when called with multiple keys.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFCOUNT_History,0,PFCOUNT_Tips,0,pfcountCommand,-2,CMD_READONLY|CMD_MAY_REPLICATE,ACL_CATEGORY_HYPERLOGLOG,PFCOUNT_Keyspecs,1,NULL,1),.args=PFCOUNT_Args}, {MAKE_CMD("pfdebug","Internal commands for debugging HyperLogLog values.","N/A","2.8.9",CMD_DOC_SYSCMD,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFDEBUG_History,0,PFDEBUG_Tips,0,pfdebugCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_ADMIN,ACL_CATEGORY_HYPERLOGLOG,PFDEBUG_Keyspecs,1,NULL,2),.args=PFDEBUG_Args}, -{MAKE_CMD("pfmerge","Merges one or more HyperLogLog values into a single key.","O(N) to merge N HyperLogLogs, but with high constant times.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFMERGE_History,0,PFMERGE_Tips,0,pfmergeCommand,-2,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_HYPERLOGLOG,PFMERGE_Keyspecs,2,NULL,2),.args=PFMERGE_Args}, +{MAKE_CMD("pfmerge","Merges one or more HyperLogLog values into a single key.","O(N) to merge N HyperLogLogs, but with high constant times.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFMERGE_History,0,PFMERGE_Tips,0,pfmergeCommand,-2,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_HYPERLOGLOG,PFMERGE_Keyspecs,2,pfmergeGetKeys,2),.args=PFMERGE_Args}, {MAKE_CMD("pfselftest","An internal command for testing HyperLogLog values.","N/A","2.8.9",CMD_DOC_SYSCMD,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFSELFTEST_History,0,PFSELFTEST_Tips,0,pfselftestCommand,1,CMD_ADMIN,ACL_CATEGORY_HYPERLOGLOG,PFSELFTEST_Keyspecs,0,NULL,0)}, /* list */ {MAKE_CMD("blmove","Pops an element from a list, pushes it to another list and returns it. Blocks until an element is available otherwise. Deletes the list if the last element was moved.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"list",COMMAND_GROUP_LIST,BLMOVE_History,0,BLMOVE_Tips,0,blmoveCommand,6,CMD_WRITE|CMD_DENYOOM|CMD_BLOCKING,ACL_CATEGORY_LIST,BLMOVE_Keyspecs,2,NULL,5),.args=BLMOVE_Args}, diff --git a/src/commands/gcra.json b/src/commands/gcra.json index d6091fcfb4..cc0e029c22 100644 --- a/src/commands/gcra.json +++ b/src/commands/gcra.json @@ -47,15 +47,15 @@ }, { "type": "integer", - "description": "Max request number: always equal to max_burst+1" + "description": "Max request tokens: always equal to max_burst+1" }, { "type": "integer", - "description": "Number of requests available immediately" + "description": "Number of tokens available immediately" }, { "type": "integer", - "description": "Retry after: seconds after which caller should retry. Always -1 if not limited" + "description": "Retry after: seconds after which the caller should retry. Always -1 if not limited" }, { "type": "integer", @@ -74,7 +74,7 @@ "type": "integer" }, { - "name": "requests-per-period", + "name": "tokens-per-period", "type": "integer" }, { @@ -84,7 +84,7 @@ { "name": "count", "type": "integer", - "token": "NUM_REQUESTS", + "token": "TOKENS", "optional": true } ] diff --git a/src/commands/pfmerge.json b/src/commands/pfmerge.json index c93070f116..e3e26fabd8 100644 --- a/src/commands/pfmerge.json +++ b/src/commands/pfmerge.json @@ -6,6 +6,7 @@ "since": "2.8.9", "arity": -2, "function": "pfmergeCommand", + "get_keys_function": "pfmergeGetKeys", "command_flags": [ "WRITE", "DENYOOM" diff --git a/src/db.c b/src/db.c index 36d6d96e48..1565b7bef0 100644 --- a/src/db.c +++ b/src/db.c @@ -593,7 +593,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link estoreRemove(db->subexpires, slot, old); if (old->type == OBJ_STREAM) - dictDelete(db->stream_idmp_keys, key); + streamKeyRemoved(db, key, old); long long oldExpire = getExpire(db, key->ptr, old); @@ -858,7 +858,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { /* If stream with IDMP tracking, remove it from stream_idmp_keys */ if (type == OBJ_STREAM) - dictDelete(db->stream_idmp_keys, key); + streamKeyRemoved(db, key, kv); /* RM_StringDMA may call dbUnshareStringValue which may free kv, so we * need to incr to retain kv */ @@ -2243,13 +2243,11 @@ void renameGenericCommand(client *c, int nx) { estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime); /* Re-register stream IDMP tracking under the new key name. */ - if (srctype == OBJ_STREAM && ((stream *)o->ptr)->idmp_producers != NULL) { - if (dictAdd(c->db->stream_idmp_keys, c->argv[2], NULL) == DICT_OK) - incrRefCount(c->argv[2]); - } + if (srctype == OBJ_STREAM) + streamKeyLoaded(c->db, c->argv[2], o); keyModified(c,c->db,c->argv[1],NULL,1); - keyModified(c,c->db,c->argv[2],NULL,1); /* LRM already updated by dbAddInternal */ + keyModified(c,c->db,c->argv[2],o,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to", @@ -2343,13 +2341,11 @@ void moveCommand(client *c) { estoreAdd(dst->subexpires, slot, kv, hashExpireTime); /* Register stream IDMP tracking in the destination DB. */ - if (kv->type == OBJ_STREAM && ((stream *)kv->ptr)->idmp_producers != NULL) { - if (dictAdd(dst->stream_idmp_keys, c->argv[1], NULL) == DICT_OK) - incrRefCount(c->argv[1]); - } + if (kv->type == OBJ_STREAM) + streamKeyLoaded(dst, c->argv[1], kv); keyModified(c,src,c->argv[1],NULL,1); - keyModified(c,dst,c->argv[1],NULL,1); /* LRM already updated by dbAddInternal */ + keyModified(c,dst,c->argv[1],kv,1); notifyKeyspaceEvent(NOTIFY_GENERIC, "move_from",c->argv[1],src->id); notifyKeyspaceEvent(NOTIFY_GENERIC, @@ -2469,16 +2465,11 @@ void copyCommand(client *c) { estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire); /* Register copied stream with IDMP producers for cron-based expiration. */ - if (kvCopy->type == OBJ_STREAM) { - stream *s = kvCopy->ptr; - if (s->idmp_producers != NULL) { - if (dictAdd(dst->stream_idmp_keys, newkey, NULL) == DICT_OK) - incrRefCount(newkey); - } - } + if (kvCopy->type == OBJ_STREAM) + streamKeyLoaded(dst, newkey, kvCopy); - /* OK! key copied. Signal modification (LRM already updated by dbAddInternal) */ - keyModified(c,dst,c->argv[2],NULL,1); + /* OK! key copied. Signal modification */ + keyModified(c,dst,c->argv[2],kvCopy,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id); /* `delete` implies the destination key was overwritten */ @@ -3683,6 +3674,29 @@ int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult * return result->numkeys; } +int pfmergeGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + int i, numkeys; + keyReference *keys; + UNUSED(cmd); + UNUSED(argv); + + numkeys = argc - 1; /* destkey + all sourcekeys */ + keys = getKeysPrepareResult(result, numkeys); + + /* destkey at argv[1] */ + keys[0].pos = 1; + keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_INSERT; + + /* sourcekeys at argv[2..argc-1], may be zero */ + for (i = 2; i < argc; i++) { + keys[i - 1].pos = i; + keys[i - 1].flags = CMD_KEY_RO | CMD_KEY_ACCESS; + } + + result->numkeys = numkeys; + return result->numkeys; +} + /* This command declares incomplete keys, so the flags are correctly set for this function */ int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { int i, j, num, first; diff --git a/src/debug.c b/src/debug.c index 370b572b36..14f8cfe73b 100644 --- a/src/debug.c +++ b/src/debug.c @@ -793,7 +793,7 @@ NULL memcpy(val->ptr, buf, valsize<=buflen? valsize: buflen); } dbAdd(c->db, key, &val); - keyModified(c,c->db,key,NULL,1); + keyModified(c,c->db,key,val,1); decrRefCount(key); } addReply(c,shared.ok); diff --git a/src/gcra.c b/src/gcra.c index f264b2754e..9a2c23df5e 100644 --- a/src/gcra.c +++ b/src/gcra.c @@ -65,21 +65,21 @@ * * (ASCII art adapted from https://brandur.org/rate-limiting). */ -/* GCRA key max_burst requests_per_period period [NUM_REQUESTS count] +/* GCRA key max_burst tokens_per_period period [TOKENS count] * * key: Key related to specific rate limiting case - * max_burst: Maximum requests allowed as burst (in addition to sustained rate) - * requests_per_period: Number of requests allowed per period + * max_burst: Maximum tokens allowed as burst (in addition to sustained rate) + * tokens_per_period: Number of tokens allowed per period * period: Period in seconds for calculating sustained rate - * num_requests: Optional, cost of this request (default: 1) + * tokens: Optional, cost of this request (default: 1) */ void gcraCommand(client *c) { robj *key = c->argv[1]; /* GCRA parameters */ long max_burst; - long requests_per_period; - long num_requests = 1; + long tokens_per_period; + long num_tokens = 1; double period; /* Variables used in the reply */ @@ -98,7 +98,7 @@ void gcraCommand(client *c) { } if (likely(max_burst < LONG_MAX)) max_burst += 1; - if (getRangeLongFromObjectOrReply(c, c->argv[3], 1, LONG_MAX, &requests_per_period, NULL) != C_OK) { + if (getRangeLongFromObjectOrReply(c, c->argv[3], 1, LONG_MAX, &tokens_per_period, NULL) != C_OK) { return; } @@ -111,15 +111,15 @@ void gcraCommand(client *c) { } if (c->argc >= 6) { - if (strcasecmp("NUM_REQUESTS", c->argv[5]->ptr)) { + if (strcasecmp("tokens", c->argv[5]->ptr)) { addReplyErrorObject(c, shared.syntaxerr); return; } if (c->argc == 6) { - addReplyError(c, "Missing NUM_REQUESTS value"); + addReplyError(c, "Missing TOKENS value"); return; } - if (getRangeLongFromObjectOrReply(c, c->argv[6], 1, LONG_MAX, &num_requests, NULL) != C_OK) { + if (getRangeLongFromObjectOrReply(c, c->argv[6], 1, LONG_MAX, &num_tokens, NULL) != C_OK) { return; } } @@ -158,18 +158,18 @@ void gcraCommand(client *c) { * Even if emission_interval_us becomes less than 1us, we assume it's min * 1ms. The API is already in seconds granularity so it is expected the user * won't need a submicrosecond accuracy. */ - long long emission_interval_us = (long long)(period_us / requests_per_period + 0.5); + long long emission_interval_us = (long long)(period_us / tokens_per_period + 0.5); if (unlikely(emission_interval_us == 0)) emission_interval_us = 1; /* overflow checks. In normal circumstances we shouldn't get these but the * user may have wrongfully specified very large values. * Note that all values are positive. */ - if (emission_interval_us > LLONG_MAX / num_requests) { - addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, requests_per_period and num_requests would cause an overflow"); + if (emission_interval_us > LLONG_MAX / num_tokens) { + addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, tokens_per_period and TOKENS would cause an overflow"); return; } if (emission_interval_us > LLONG_MAX / max_burst) { - addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, requests_per_period and max_burst would cause an overflow"); + addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, tokens_per_period and max_burst would cause an overflow"); return; } @@ -180,11 +180,11 @@ void gcraCommand(client *c) { /* If a request is allowed the next TaT is after an emission_interval_us time. * Hence for multiple requests we multiple by their number. */ - long long increment_us = emission_interval_us * num_requests; + long long increment_us = emission_interval_us * num_tokens; long long base_us = (now > tat_us) ? now : tat_us; if (LLONG_MAX - base_us < increment_us) { - addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, requests_per_period and num_requests would cause an overflow"); + addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, tokens_per_period and TOKENS would cause an overflow"); return; } new_tat_us = base_us + increment_us; diff --git a/src/keymeta.c b/src/keymeta.c index 530ea04930..ca68783071 100644 --- a/src/keymeta.c +++ b/src/keymeta.c @@ -593,7 +593,7 @@ int rdbSaveKeyMetadata(rio *rdb, robj *key, kvobj *kv, int dbid) { /* Call module's rdb_save callback */ RedisModuleIO io; moduleInitIOContext(&io, &pClass->entity, &payload_rio, key, dbid); - pClass->conf.rdb_save(&io, kv, pMeta); + pClass->conf.rdb_save(&io, NULL, pMeta); if (io.ctx) { moduleFreeContext(io.ctx); @@ -668,7 +668,7 @@ int keyMetaOnAof(rio *r, robj *key, kvobj *kv, int dbid) { { RedisModuleIO io; moduleInitIOContext(&io, &keyMetaClass[keyMetaId].entity, r, key, dbid); - keyMetaClass[keyMetaId].conf.aof_rewrite(&io, kv, meta); + keyMetaClass[keyMetaId].conf.aof_rewrite(&io, NULL, meta); if (io.ctx) { moduleFreeContext(io.ctx); zfree(io.ctx); diff --git a/src/keymeta.h b/src/keymeta.h index 43b4242313..722a4a0a6d 100644 --- a/src/keymeta.h +++ b/src/keymeta.h @@ -60,8 +60,8 @@ typedef int KeyMetaClassId; /* Index into redisServer.keyMetaClass[] */ /* RDB load callback: Return 1 to attach, 0 to skip, -1 on error */ typedef int (*KeyMetaLoadFunc)(RedisModuleIO *rdb, uint64_t *meta, int encver); -typedef void (*KeyMetaSaveFunc)(RedisModuleIO *rdb, void *value, uint64_t *meta); -typedef void (*KeyMetaAOFRewriteFunc)(RedisModuleIO *aof, void *value, uint64_t meta); +typedef void (*KeyMetaSaveFunc)(RedisModuleIO *rdb, void *reserved, uint64_t *meta); +typedef void (*KeyMetaAOFRewriteFunc)(RedisModuleIO *aof, void *reserved, uint64_t meta); typedef void (*KeyMetaFreeFunc)(const char *keyname, uint64_t meta); typedef int (*KeyMetaCopyFunc)(struct RedisModuleKeyOptCtx *ctx, uint64_t *meta); typedef int (*KeyMetaRenameFunc)(struct RedisModuleKeyOptCtx *ctx, uint64_t *meta); @@ -90,8 +90,8 @@ typedef struct KeyMetaClassConf { void (*unlink)(struct RedisModuleKeyOptCtx *ctx, uint64_t *meta); void (*free)(const char *keyname, uint64_t meta); int (*rdb_load)(struct RedisModuleIO *rdb, uint64_t *meta, int metaver); - void (*rdb_save)(struct RedisModuleIO *rdb, void *value, uint64_t *meta); - void (*aof_rewrite)(struct RedisModuleIO *aof, void *value, uint64_t meta); + void (*rdb_save)(struct RedisModuleIO *rdb, void *reserved, uint64_t *meta); + void (*aof_rewrite)(struct RedisModuleIO *aof, void *reserved, uint64_t meta); /****************************** TBD: ******************************/ int (*defrag) (struct RedisModuleDefragCtx *ctx, struct redisObject *key, uint64_t meta); diff --git a/src/listpack.c b/src/listpack.c index 4d0e72c431..5b37d2f7ba 100644 --- a/src/listpack.c +++ b/src/listpack.c @@ -1208,7 +1208,10 @@ unsigned char *lpBatchInsert(unsigned char *lp, unsigned char *p, int where, uint64_t old_listpack_bytes = lpGetTotalBytes(lp); uint64_t new_listpack_bytes = old_listpack_bytes + addedlen; - if (new_listpack_bytes > UINT32_MAX) return NULL; + if (new_listpack_bytes > UINT32_MAX) { + if (enc != tmp) lp_free(enc); + return NULL; + } /* Store the offset of the element 'p', so that we can obtain its * address again after a reallocation. */ @@ -1218,7 +1221,10 @@ unsigned char *lpBatchInsert(unsigned char *lp, unsigned char *p, int where, /* Realloc before: we need more room. */ if (new_listpack_bytes > old_listpack_bytes && new_listpack_bytes > lp_malloc_size(lp)) { - if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL; + if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) { + if (enc != tmp) lp_free(enc); + return NULL; + } dst = lp + poff; } diff --git a/src/module.c b/src/module.c index ff04842e42..e3971195b1 100644 --- a/src/module.c +++ b/src/module.c @@ -2528,7 +2528,7 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) { * RM_SetModuleOptions(). */ int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) { - kvobj *kv = lookupKeyReadWithFlags(ctx->client->db, keyname, LOOKUP_NOTOUCH); + kvobj *kv = lookupKeyReadWithFlags(ctx->client->db, keyname, LOOKUP_NOEFFECTS); keyModified(ctx->client,ctx->client->db,keyname,kv,1); return REDISMODULE_OK; } diff --git a/src/rdb.c b/src/rdb.c index bcae575025..e9429d28f9 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -4037,14 +4037,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin } /* Register streams with IDMP producers for cron-based expiration. */ - if (kv->type == OBJ_STREAM) { - stream *s = kv->ptr; - if (s->idmp_producers != NULL) { - robj *kobj = createStringObject(key, sdslen(key)); - if (dictAddRaw(db->stream_idmp_keys, kobj, NULL) == NULL) - decrRefCount(kobj); - } - } + if (kv->type == OBJ_STREAM) + streamKeyLoaded(db, &keyobj, kv); /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000); diff --git a/src/redismodule.h b/src/redismodule.h index 4e99682b2e..71579d3c39 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -1011,8 +1011,8 @@ typedef void (*RedisModuleOnUnblocked)(RedisModuleCtx *ctx, RedisModuleCallReply typedef int (*RedisModuleAuthCallback)(RedisModuleCtx *ctx, RedisModuleString *username, RedisModuleString *password, RedisModuleString **err); typedef int (*RedisModuleKeyMetaLoadFunc)(RedisModuleIO *rdb, uint64_t *meta, int encver); -typedef void (*RedisModuleKeyMetaSaveFunc)(RedisModuleIO *rdb, void *value, uint64_t *meta); -typedef void (*RedisModuleKeyMetaAOFRewriteFunc)(RedisModuleIO *aof, void *value, uint64_t meta); +typedef void (*RedisModuleKeyMetaSaveFunc)(RedisModuleIO *rdb, void *reserved, uint64_t *meta); +typedef void (*RedisModuleKeyMetaAOFRewriteFunc)(RedisModuleIO *aof, void *reserved, uint64_t meta); typedef void (*RedisModuleKeyMetaFreeFunc)(const char *keyname, uint64_t meta); typedef int (*RedisModuleKeyMetaCopyFunc)(RedisModuleKeyOptCtx *ctx, uint64_t *meta); typedef int (*RedisModuleKeyMetaRenameFunc)(RedisModuleKeyOptCtx *ctx, uint64_t *meta); diff --git a/src/sds.c b/src/sds.c index 06f16cf80f..0a940e13d8 100644 --- a/src/sds.c +++ b/src/sds.c @@ -1507,27 +1507,42 @@ int sdsTest(int argc, char **argv, int flags) { x = sdsResize(x, 200, 1); test_cond("sdsresize() expand len", sdslen(x) == 40); test_cond("sdsresize() expand strlen", strlen(x) == 40); - test_cond("sdsresize() expand alloc", sdsalloc(x) == 200); +#if defined(USE_JEMALLOC) + /* 224 - hdrlen(3) - 1(\0) */ + test_cond("sdsresize() expand alloc", sdsalloc(x) == 220); +#endif /* Test sdsresize - trim free space */ x = sdsResize(x, 80, 1); test_cond("sdsresize() shrink len", sdslen(x) == 40); test_cond("sdsresize() shrink strlen", strlen(x) == 40); - test_cond("sdsresize() shrink alloc", sdsalloc(x) == 80); +#if defined(USE_JEMALLOC) + /* 96 - hdrlen(3) - 1(\0) */ + test_cond("sdsresize() shrink alloc", sdsalloc(x) == 92); +#endif /* Test sdsresize - crop used space */ x = sdsResize(x, 30, 1); test_cond("sdsresize() crop len", sdslen(x) == 30); test_cond("sdsresize() crop strlen", strlen(x) == 30); - test_cond("sdsresize() crop alloc", sdsalloc(x) == 30); +#if defined(USE_JEMALLOC) + /* 40 - hdrlen(3) - 1(\0) */ + test_cond("sdsresize() crop alloc", sdsalloc(x) == 36); +#endif /* Test sdsresize - extend to different class */ x = sdsResize(x, 400, 1); test_cond("sdsresize() expand len", sdslen(x) == 30); test_cond("sdsresize() expand strlen", strlen(x) == 30); - test_cond("sdsresize() expand alloc", sdsalloc(x) == 400); +#if defined(USE_JEMALLOC) + /* 448 - hdrlen(5) - 1(\0) */ + test_cond("sdsresize() expand alloc", sdsalloc(x) == 442); +#endif /* Test sdsresize - shrink to different class */ x = sdsResize(x, 4, 1); test_cond("sdsresize() crop len", sdslen(x) == 4); test_cond("sdsresize() crop strlen", strlen(x) == 4); +#if defined(USE_JEMALLOC) + /* 8 - hdrlen(3) - 1(\0) */ test_cond("sdsresize() crop alloc", sdsalloc(x) == 4); +#endif sdsfree(x); { /* Test adjustTypeIfNeeded() */ diff --git a/src/server.c b/src/server.c index 5ba1ec9438..8eb358fb17 100644 --- a/src/server.c +++ b/src/server.c @@ -7783,7 +7783,8 @@ int zsetTest(int argc, char **argv, int flags); struct redisTest { char *name; redisTestProc *proc; - int failed; + int test_count; + int passed_count; } redisTests[] = { {"ziplist", ziplistTest}, {"quicklist", quicklistTest}, @@ -7837,32 +7838,40 @@ int main(int argc, char **argv) { if (!strcasecmp(argv[2], "all")) { int numtests = sizeof(redisTests)/sizeof(struct redisTest); - for (j = 0; j < numtests; j++) { - redisTests[j].failed = (redisTests[j].proc(argc,argv,flags) != 0); - } - - /* Report tests result */ int failed_num = 0; for (j = 0; j < numtests; j++) { - if (redisTests[j].failed) { + int before_total = __test_num; + int before_failed = __failed_tests; + redisTests[j].proc(argc,argv,flags); + redisTests[j].test_count = __test_num - before_total; + redisTests[j].passed_count = redisTests[j].test_count - (__failed_tests - before_failed); + if (redisTests[j].passed_count < redisTests[j].test_count) failed_num++; - printf("[failed] Test - %s\n", redisTests[j].name); - } else { - printf("[ok] Test - %s\n", redisTests[j].name); - } } - printf("%d tests, %d passed, %d failed\n", numtests, - numtests-failed_num, failed_num); + printf("\n========== Test Suite Summary ==========\n\n"); + for (j = 0; j < numtests; j++) { + int failed = redisTests[j].passed_count < redisTests[j].test_count; + printf(" %s %-15s (%d/%d passed)%s\n", + failed ? "\033[31m[failed]" : "\033[32m[ok] \033[0m", + redisTests[j].name, + redisTests[j].passed_count, redisTests[j].test_count, + failed ? "\033[0m" : ""); + } - return failed_num == 0 ? 0 : 1; + printf("\n Test Groups: %s%d passed\033[0m, %s%d failed\033[0m, %d total\n", + failed_num ? "" : "\033[32m", numtests-failed_num, + failed_num ? "\033[31m" : "", failed_num, numtests); + + test_report(); } else { redisTestProc *proc = getTestProcByName(argv[2]); if (!proc) return -1; /* test not found */ - return proc(argc,argv,flags); + proc(argc,argv,flags); + test_report(); } - return 0; + return __failed_tests ? 1 : 0; } #endif diff --git a/src/server.h b/src/server.h index 3fd26a51ce..b211339d00 100644 --- a/src/server.h +++ b/src/server.h @@ -4026,6 +4026,7 @@ int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int pfmergeGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); diff --git a/src/stream.h b/src/stream.h index 028e374353..da9d41a695 100644 --- a/src/stream.h +++ b/src/stream.h @@ -193,6 +193,8 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); int64_t streamTrimByLength(stream *s, long long maxlen, int approx); int64_t streamTrimByID(stream *s, streamID minid, int approx); int streamEntryExists(stream *s, streamID *id); +void streamKeyLoaded(redisDb *db, robj *key, robj *val); +void streamKeyRemoved(redisDb *db, robj *key, robj *val); listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key); diff --git a/src/t_stream.c b/src/t_stream.c index d263f26720..ba7ca2968b 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -5704,6 +5704,27 @@ static void trackStreamIdmpEntries(client *c, robj *key) { } } +/* To be used when a stream key was loaded into ram, re-register it in stream_idmp_keys if needed */ +void streamKeyLoaded(redisDb *db, robj *key, robj *val) { + stream *s = val->ptr; + if (s->idmp_producers != NULL) { + robj *tracked_key = key; + if (key->refcount == OBJ_STATIC_REFCOUNT) + tracked_key = createStringObject(key->ptr, sdslen(key->ptr)); + if (dictAddRaw(db->stream_idmp_keys, tracked_key, NULL)) { + incrRefCount(tracked_key); + } + if (tracked_key != key) + decrRefCount(tracked_key); + } +} + +/* To be used when a steam key was removed from ram, un-redigster from stream_idmp_keys if needed */ +void streamKeyRemoved(redisDb *db, robj *key, robj *val) { + UNUSED(val); + dictDelete(db->stream_idmp_keys, key); +} + /* Clean up expired idempotency entries from tracked streams. This function * is invoked regularly from serverCron() to remove expired entries * from the idmp_dict of streams that have idempotency tracking enabled, @@ -5750,6 +5771,7 @@ void handleExpiredIdmpEntries(void) { } /* Iterate through all producers and remove expired entries */ + int modified = 0; raxIterator ri; raxStart(&ri, s->idmp_producers); raxSeek(&ri, "^", NULL, 0); @@ -5769,6 +5791,7 @@ void handleExpiredIdmpEntries(void) { } /* Free the entry */ idmpEntryFree(entry, &s->alloc_size); + modified = 1; } else { break; } @@ -5779,10 +5802,14 @@ void handleExpiredIdmpEntries(void) { raxRemove(s->idmp_producers, ri.key, ri.key_len, NULL); idmpProducerFree(producer, &s->alloc_size); raxSeek(&ri, ">=", ri.key, ri.key_len); + modified = 1; } } raxStop(&ri); + if (modified) + keyModified(NULL, db, key, kv, 0); + /* If no producers remain, free the entire rax tree */ if (raxSize(s->idmp_producers) == 0) { raxFree(s->idmp_producers); diff --git a/src/t_zset.c b/src/t_zset.c index 3ce602e1dc..346bcd38c9 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -2793,7 +2793,10 @@ static void zdiffAlgorithm2(zsetopsrc *src, long setnum, zset *dstzset, size_t * /* Exit if result set is empty as any additional removal * of elements will have no effect. */ - if (cardinality == 0) break; + if (cardinality == 0) { + zuiDiscardDirtyValue(&zval); + break; + } } zuiClearIterator(&src[j]); diff --git a/src/testhelp.h b/src/testhelp.h index aeae372dd6..58fe2f8fa7 100644 --- a/src/testhelp.h +++ b/src/testhelp.h @@ -30,14 +30,17 @@ extern int __test_num; #define test_cond(descr,_c) do { \ __test_num++; printf("%d - %s: ", __test_num, descr); \ - if(_c) printf("PASSED\n"); else {printf("FAILED\n"); __failed_tests++;} \ + if(_c) printf("\033[32mPASSED\033[0m\n"); else {printf("\033[31mFAILED\033[0m\n"); __failed_tests++;} \ } while(0) #define test_report() do { \ - printf("%d tests, %d passed, %d failed\n", __test_num, \ - __test_num-__failed_tests, __failed_tests); \ if (__failed_tests) { \ - printf("=== WARNING === We have failed tests here...\n"); \ + printf(" Tests: %d passed, \033[31m%d failed\033[0m, %d total\n", \ + __test_num-__failed_tests, __failed_tests, __test_num); \ + printf("\033[31m=== WARNING === We have failed tests here...\033[0m\n"); \ exit(1); \ + } else { \ + printf(" Tests: \033[32m%d passed\033[0m, %d failed, %d total\n", \ + __test_num-__failed_tests, __failed_tests, __test_num); \ } \ } while(0) diff --git a/tests/cluster/tests/17-diskless-load-swapdb.tcl b/tests/cluster/tests/17-diskless-load-swapdb.tcl index cb81b9fdbd..b25db39309 100644 --- a/tests/cluster/tests/17-diskless-load-swapdb.tcl +++ b/tests/cluster/tests/17-diskless-load-swapdb.tcl @@ -54,9 +54,12 @@ test "Main db not affected when fail to diskless load" { set rd [redis_deferring_client redis $master_id] for {set j 0} {$j < $num} {incr j} { $rd set $j $value - } - for {set j 0} {$j < $num} {incr j} { - $rd read + + if {($j + 1) % 500 == 0} { + for {set i 0} {$i < 500} {incr i} { + $rd read + } + } } # Start the replica again diff --git a/tests/cluster/tests/26-pubsubshard.tcl b/tests/cluster/tests/26-pubsubshard.tcl index 34939acf7c..6c3ee14ae0 100644 --- a/tests/cluster/tests/26-pubsubshard.tcl +++ b/tests/cluster/tests/26-pubsubshard.tcl @@ -123,6 +123,7 @@ test "PUBSUB channels/shardchannels" { assert_equal {3} [llength [$publishclient pubsub shardchannels]] sunsubscribe $subscribeclient + $subscribeclient read set channel_list [$publishclient pubsub shardchannels] assert_equal {2} [llength $channel_list] assert {[lsearch -exact $channel_list "\{channel.0\}2"] >= 0} diff --git a/tests/helpers/gen_write_load.tcl b/tests/helpers/gen_write_load.tcl index 7b4975c61a..60d954e5db 100644 --- a/tests/helpers/gen_write_load.tcl +++ b/tests/helpers/gen_write_load.tcl @@ -22,13 +22,18 @@ proc gen_write_load {host port seconds tls {key ""} {size 0} {sleep 0}} { set start_time [clock seconds] set r [redis $host $port 1 $tls] $r client setname LOAD_HANDLER - catch {$r select 9} ;# select 9 will fail in cluster mode + $r read + catch { + $r select 9 + $r read + } ;# select 9 will fail in cluster mode # fixed size value if {$size != 0} { set value [string repeat "x" $size] } + set count 0 while 1 { if {$size == 0} { set value [expr rand()] @@ -39,13 +44,27 @@ proc gen_write_load {host port seconds tls {key ""} {size 0} {sleep 0}} { } else { $r set $key $value } + + incr count + if {$count % 500 == 0} { + for {set i 0} {$i < 500} {incr i} { + $r read + } + } + if {[clock seconds]-$start_time > $seconds} { - exit 0 + break } if {$sleep ne 0} { after $sleep } } + + # Read remaining replies + for {set i 0} {$i < $count} {incr i} { + $r read + } + exit 0 } gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3] [lindex $argv 4] [lindex $argv 5] [lindex $argv 6] diff --git a/tests/modules/test_keymeta.c b/tests/modules/test_keymeta.c index cc35549c20..8a80726fb2 100644 --- a/tests/modules/test_keymeta.c +++ b/tests/modules/test_keymeta.c @@ -173,11 +173,11 @@ static int KeyMetaMoveDiscardCallback(RedisModuleKeyOptCtx *ctx, uint64_t *meta) * * Parameters: * - rdb: RedisModuleIO context for writing to RDB - * - value: The kvobj (key-value object) - not used in this implementation + * - reserved: Reserved for future use * - meta: Pointer to the 8-byte metadata value (pointer to our string) */ -static void KeyMetaRDBSaveCallback(RedisModuleIO *rdb, void *value, uint64_t *meta) { - REDISMODULE_NOT_USED(value); +static void KeyMetaRDBSaveCallback(RedisModuleIO *rdb, void *reserved, uint64_t *meta) { + REDISMODULE_NOT_USED(reserved); /* If metadata is NULL (reset_value), don't save anything */ if (*meta == 0) return; @@ -252,12 +252,12 @@ static int KeyMetaRDBLoadCallback(RedisModuleIO *rdb, uint64_t *meta, int encver * * Parameters: * - aof: RedisModuleIO context for writing to AOF - * - value: The kvobj (key-value object) - not used in this implementation + * - reserved: Reserved for future use * - meta: The 8-byte metadata value (pointer to our string) * - class_id: The class ID for this metadata */ -static void KeyMetaAOFRewriteCallback_Class(RedisModuleIO *aof, void *value, uint64_t meta, RedisModuleKeyMetaClassId class_id) { - REDISMODULE_NOT_USED(value); +static void KeyMetaAOFRewriteCallback_Class(RedisModuleIO *aof, void *reserved, uint64_t meta, RedisModuleKeyMetaClassId class_id) { + REDISMODULE_NOT_USED(reserved); /* If metadata is NULL (reset_value), don't emit anything */ if (meta == 0) return; @@ -289,32 +289,32 @@ static void KeyMetaAOFRewriteCallback_Class(RedisModuleIO *aof, void *value, uin /* Individual AOF rewrite callbacks for each class (1-7) * Each callback wraps the common implementation with its specific class ID */ -static void KeyMetaAOFRewriteCb1(RedisModuleIO *aof, void *value, uint64_t meta) { - KeyMetaAOFRewriteCallback_Class(aof, value, meta, 1); +static void KeyMetaAOFRewriteCb1(RedisModuleIO *aof, void *reserved, uint64_t meta) { + KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 1); } -static void KeyMetaAOFRewriteCb2(RedisModuleIO *aof, void *value, uint64_t meta) { - KeyMetaAOFRewriteCallback_Class(aof, value, meta, 2); +static void KeyMetaAOFRewriteCb2(RedisModuleIO *aof, void *reserved, uint64_t meta) { + KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 2); } -static void KeyMetaAOFRewriteCb3(RedisModuleIO *aof, void *value, uint64_t meta) { - KeyMetaAOFRewriteCallback_Class(aof, value, meta, 3); +static void KeyMetaAOFRewriteCb3(RedisModuleIO *aof, void *reserved, uint64_t meta) { + KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 3); } -static void KeyMetaAOFRewriteCb4(RedisModuleIO *aof, void *value, uint64_t meta) { - KeyMetaAOFRewriteCallback_Class(aof, value, meta, 4); +static void KeyMetaAOFRewriteCb4(RedisModuleIO *aof, void *reserved, uint64_t meta) { + KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 4); } -static void KeyMetaAOFRewriteCb5(RedisModuleIO *aof, void *value, uint64_t meta) { - KeyMetaAOFRewriteCallback_Class(aof, value, meta, 5); +static void KeyMetaAOFRewriteCb5(RedisModuleIO *aof, void *reserved, uint64_t meta) { + KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 5); } -static void KeyMetaAOFRewriteCb6(RedisModuleIO *aof, void *value, uint64_t meta) { - KeyMetaAOFRewriteCallback_Class(aof, value, meta, 6); +static void KeyMetaAOFRewriteCb6(RedisModuleIO *aof, void *reserved, uint64_t meta) { + KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 6); } -static void KeyMetaAOFRewriteCb7(RedisModuleIO *aof, void *value, uint64_t meta) { - KeyMetaAOFRewriteCallback_Class(aof, value, meta, 7); +static void KeyMetaAOFRewriteCb7(RedisModuleIO *aof, void *reserved, uint64_t meta) { + KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 7); } /* KEYMETA.REGISTER <4-char-id> [KEEPONCOPY:KEEPONRENAME:UNLINKFREE:ALLOWIGNORE:NORDBLOAD:NORDBSAVE] */ diff --git a/tests/unit/gcra.tcl b/tests/unit/gcra.tcl index aa9ed726d9..e11101484f 100644 --- a/tests/unit/gcra.tcl +++ b/tests/unit/gcra.tcl @@ -33,13 +33,13 @@ start_server {tags {"gcra" "external:skip"}} { assert_match "*not a valid float*" $err # tokens (optional) must be >= 1 - catch {r gcra mykey 10 5 10 NUM_REQUESTS} err - assert_match "*Missing NUM_REQUESTS value*" $err - catch {r gcra mykey 10 5 10 NUM_REQUESTS 0} err + catch {r gcra mykey 10 5 10 TOKENS} err + assert_match "*Missing TOKENS value*" $err + catch {r gcra mykey 10 5 10 TOKENS 0} err assert_match "*out of range*" $err - catch {r gcra mykey 10 5 10 NUM_REQUESTS -1} err + catch {r gcra mykey 10 5 10 TOKENS -1} err assert_match "*out of range*" $err - catch {r gcra mykey 10 5 10 NUM_REQUESTS notanumber} err + catch {r gcra mykey 10 5 10 TOKENS notanumber} err assert_match "*not an integer*" $err # Valid arguments with default tokens @@ -53,7 +53,7 @@ start_server {tags {"gcra" "external:skip"}} { # Valid arguments with explicit tokens r del mykey - set result [r gcra mykey 10 5 60 NUM_REQUESTS 2] + set result [r gcra mykey 10 5 60 TOKENS 2] assert_equal 5 [llength $result] assert_equal 11 [lindex $result 1] @@ -130,7 +130,7 @@ start_server {tags {"gcra" "external:skip"}} { r del mykey # Consume 3 tokens from fresh state - set result2 [r gcra mykey 5 1 60 NUM_REQUESTS 3] + set result2 [r gcra mykey 5 1 60 TOKENS 3] set avail2 [lindex $result2 2] assert_equal 3 $avail2 } @@ -168,7 +168,7 @@ start_server {tags {"gcra" "external:skip"}} { r del mykey r gcra mykey 5 1 60 - set result [r gcra mykey 5 1 60 NUM_REQUESTS 4] + set result [r gcra mykey 5 1 60 TOKENS 4] set full_burst_after [lindex $result 4] assert {$full_burst_after >= 299} } @@ -216,7 +216,7 @@ start_server {tags {"gcra" "external:skip"}} { test {GCRA - overflow} { r del mykey - catch {r gcra mykey 1 1 86400 NUM_REQUESTS 200000000} err + catch {r gcra mykey 1 1 86400 TOKENS 200000000} err assert_match "*would cause an overflow*" $err r del mykey @@ -224,7 +224,7 @@ start_server {tags {"gcra" "external:skip"}} { assert_match "*would cause an overflow*" $err r del mykey - catch {r gcra mykey 1 1 2147483647 NUM_REQUESTS 2147483647} err + catch {r gcra mykey 1 1 2147483647 TOKENS 2147483647} err assert_match "*would cause an overflow*" $err } } @@ -255,7 +255,7 @@ start_server {tags {"gcra repl" "external:skip"}} { assert_equal [lsearch -glob $cmdinfo "cmdstat_set:*"] -1 $master del mykey - $master gcra mykey 2 1 1000 NUM_REQUESTS 2 + $master gcra mykey 2 1 1000 TOKENS 2 wait_for_ofs_sync $master $replica diff --git a/tests/unit/introspection-2.tcl b/tests/unit/introspection-2.tcl index 1e98fdb6a0..32932ef1a8 100644 --- a/tests/unit/introspection-2.tcl +++ b/tests/unit/introspection-2.tcl @@ -189,6 +189,17 @@ start_server {tags {"introspection"}} { assert_equal {key1 key2} [r command getkeys lcs key1 key2] } + test {COMMAND GETKEYS PFMERGE with and without source keys} { + # dest + sources: both key specs yield keys + assert_equal {dest src1 src2} [r command getkeys PFMERGE dest src1 src2] + + # dest only, no source keys: spec[1] yields empty range (last < first). + # Without pfmergeGetKeys this returned "Invalid arguments" because + # getKeysUsingKeySpecs treated the empty range as invalid_spec, + # discarding the dest key found by spec[0]. + assert_equal {dest} [r command getkeys PFMERGE dest] + } + test {COMMAND GETKEYS MORE THAN 256 KEYS} { set all_keys [list] set numkeys 260 diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 6be0ed69dc..0ab12c6c7e 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -24,9 +24,12 @@ proc test_memory_efficiency {range} { incr written [string length $key] incr written [string length $val] incr written 2 ;# A separator is the minimum to store key-value data. - } - for {set j 0} {$j < 10000} {incr j} { - $rd read ; # Discard replies + + if {($j + 1) % 500 == 0} { + for {set i 0} {$i < 500} {incr i} { + $rd read ; # Discard replies + } + } } set current_mem [s used_memory] @@ -352,7 +355,7 @@ run_solo {defrag} { # create big keys with 10k items # Use batching to avoid TCP deadlock set rd [redis_deferring_client] - set batch_size 1000 + set batch_size 100 for {set j 0} {$j < 10000} {incr j} { $rd hset bighash $j [concat "asdfasdfasdf" $j] $rd lpush biglist [concat "asdfasdfasdf" $j] @@ -937,7 +940,7 @@ run_solo {defrag} { $rd lpush biglist2 $val incr count - discard_replies_every $rd $count 10000 20000 + discard_replies_every $rd $count 1000 2000 } # create some fragmentation diff --git a/tests/unit/type/set.tcl b/tests/unit/type/set.tcl index ae315844d6..5c38056433 100644 --- a/tests/unit/type/set.tcl +++ b/tests/unit/type/set.tcl @@ -1027,10 +1027,18 @@ foreach type {single multiple single_multiple} { } } r deferred 1 + set count 0 foreach m $members { r srem $myset $m + incr count + if {$count == 500} { + for {set i 0} {$i < 500} {incr i} { + r read + } + set count 0 + } } - foreach m $members { + for {set i 0} {$i < $count} {incr i} { r read } r deferred 0 diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 9f3e1f3342..ad9483b2df 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -1114,6 +1114,18 @@ start_server {tags {"zset"}} { assert_equal {a 1 e 5} [r zrange zsete{t} 0 -1 withscores] } + test "ZDIFF algorithm 2 empty result early exit - $encoding" { + # Force algorithm 2 by inflating setnum with non-existing keys. + # algo_one_work = len(src[0]) * setnum / 2 = 2 * 10 / 2 = 10 + # algo_two_work = 2 + 2 + 0*8 = 4 + # algo_one (10) > algo_two (4) -> algorithm 2 is selected + r del zseta{t} zsetb{t} zsetc{t} + r zadd zseta{t} 1 a 2 b + r zadd zsetb{t} 1 a 2 b + assert_equal 0 [r zdiffstore zsetc{t} 10 zseta{t} zsetb{t} nx1{t} nx2{t} nx3{t} nx4{t} nx5{t} nx6{t} nx7{t} nx8{t}] + assert_equal {} [r zrange zsetc{t} 0 -1 withscores] + } + test "ZDIFF fuzzing - $encoding" { for {set j 0} {$j < 100} {incr j} { unset -nocomplain s