Merge branch 'unstable' into udi/RED-188967-OSS-backport-active-clients-stats-renamed-counters

This commit is contained in:
debing.sun 2026-04-02 16:57:58 +08:00
commit f0646087be
34 changed files with 329 additions and 144 deletions

View file

@ -32,7 +32,7 @@ clean_environment: uninstall-rust
# Keep all of the Rust stuff in one place
install-rust:
ifeq ($(INSTALL_RUST_TOOLCHAIN),yes)
@RUST_VERSION=1.93.1; \
@RUST_VERSION=1.94.0; \
ARCH="$$(uname -m)"; \
if ldd --version 2>&1 | grep -q musl; then LIBC_TYPE="musl"; else LIBC_TYPE="gnu"; fi; \
echo "Detected architecture: $${ARCH} and libc: $${LIBC_TYPE}"; \
@ -40,18 +40,18 @@ ifeq ($(INSTALL_RUST_TOOLCHAIN),yes)
'x86_64') \
if [ "$${LIBC_TYPE}" = "musl" ]; then \
RUST_INSTALLER="rust-$${RUST_VERSION}-x86_64-unknown-linux-musl"; \
RUST_SHA256="6a57ddfffa77bfa97ab325586b08fc1e96c3acb82ecc9f554cdb2ef748466ef2"; \
RUST_SHA256="9a358120ce1491a4d5b7f71a41e4e97b380b5db5d4ec31f7110f5b3090bd3d55"; \
else \
RUST_INSTALLER="rust-$${RUST_VERSION}-x86_64-unknown-linux-gnu"; \
RUST_SHA256="518872275a3648f735471d7add854d39171c5a6b17f423c4d7f931f52120c2af"; \
RUST_SHA256="e8fa4185f3ef6ae32725ff638b1ecdbff28f5d651dc0b3111e2539350d03b15a"; \
fi ;; \
'aarch64') \
if [ "$${LIBC_TYPE}" = "musl" ]; then \
RUST_INSTALLER="rust-$${RUST_VERSION}-aarch64-unknown-linux-musl"; \
RUST_SHA256="34f0ffb0cd3e334aeae344daae09984f951eeb842386406d4bdf11cd0b4c2b36"; \
RUST_SHA256="008b3f0fc4175c956ecbfa4e0c48865ec3f953741b2926e75e8ded7e3adfdb19"; \
else \
RUST_INSTALLER="rust-$${RUST_VERSION}-aarch64-unknown-linux-gnu"; \
RUST_SHA256="701d55b62286bed013ceb2393ff7687d0953205605afaa15c62e2cd18024c32c"; \
RUST_SHA256="c6fd6d1c925ed986df3b2c0b89bbc90ce15afb62e4d522a054e7d50c856b3c1a"; \
fi ;; \
*) echo >&2 "Unsupported architecture: '$${ARCH}'"; exit 1 ;; \
esac; \

View file

@ -0,0 +1,19 @@
from test import TestCase
class VSIMDuplicateFilterLeak(TestCase):
def getname(self):
return "[regression] VSIM duplicate FILTER should not leak memory"
def test(self):
self.redis.execute_command('VADD', self.test_key, 'VALUES', 3, 0.5774, 0.5774, 0.5774, 'elem1')
self.redis.execute_command('VADD', self.test_key, 'VALUES', 3, 0.7071, 0.7071, 0.0, 'elem2')
self.redis.execute_command('VSETATTR', self.test_key, 'elem1', '{"a": 1, "b": 2}')
self.redis.execute_command('VSETATTR', self.test_key, 'elem2', '{"a": 2, "b": 3}')
# Duplicate FILTER: before the fix the first exprstate was
# overwritten without exprFree(), leaking ~760 bytes per call.
# Under ASAN/valgrind this shows up as a leak at server exit.
for _ in range(100):
self.redis.execute_command(
'VSIM', self.test_key, 'VALUES', 3, 0.5774, 0.5774, 0.5774,
'FILTER', '.a == 1', 'FILTER', '.b >= 1')

View file

@ -0,0 +1,32 @@
from test import TestCase
import redis as redis_module
class VSIMFilterLeakOnOptionError(TestCase):
def getname(self):
return "[regression] VSIM FILTER expr freed on option parse error"
def test(self):
self.redis.execute_command('VADD', self.test_key, 'VALUES', 3, 1, 0, 0, 'elem1')
# Valid FILTER followed by invalid option values. Before the fix,
# error paths freed vec but not filter_expr, leaking the compiled
# exprstate. Under ASAN/valgrind this shows up at server exit.
error_cmds = [
# invalid COUNT (0)
['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'COUNT', 0],
# invalid EF (0)
['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'EF', 0],
# invalid EPSILON (0)
['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'EPSILON', 0],
# invalid FILTER-EF (0)
['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'FILTER-EF', 0],
# unknown option
['VSIM', self.test_key, 'VALUES', 3, 0, 0, 0, 'FILTER', '.a > 0', 'BADOPT', 1],
]
for cmd in error_cmds:
for _ in range(20):
try:
self.redis.execute_command(*cmd)
except redis_module.exceptions.ResponseError:
pass

View file

@ -1064,6 +1064,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
!= REDISMODULE_OK || count <= 0)
{
RedisModule_Free(vec);
if (filter_expr) exprFree(filter_expr);
return RedisModule_ReplyWithError(ctx, "ERR invalid COUNT");
}
j += 2;
@ -1072,6 +1073,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_OK || epsilon <= 0)
{
RedisModule_Free(vec);
if (filter_expr) exprFree(filter_expr);
return RedisModule_ReplyWithError(ctx, "ERR invalid EPSILON");
}
j += 2;
@ -1080,6 +1082,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_OK || ef <= 0 || ef > 1000000)
{
RedisModule_Free(vec);
if (filter_expr) exprFree(filter_expr);
return RedisModule_ReplyWithError(ctx, "ERR invalid EF");
}
j += 2;
@ -1088,6 +1091,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_OK || filter_ef <= 0)
{
RedisModule_Free(vec);
if (filter_expr) exprFree(filter_expr);
return RedisModule_ReplyWithError(ctx, "ERR invalid FILTER-EF");
}
j += 2;
@ -1096,6 +1100,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
size_t exprlen;
char *exprstr = (char*)RedisModule_StringPtrLen(exprarg,&exprlen);
int errpos;
if (filter_expr) exprFree(filter_expr);
filter_expr = exprCompile(exprstr,&errpos);
if (filter_expr == NULL) {
if ((size_t)errpos >= exprlen) errpos = 0;
@ -1107,6 +1112,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
j += 2;
} else {
RedisModule_Free(vec);
if (filter_expr) exprFree(filter_expr);
return RedisModule_ReplyWithError(ctx,
"ERR syntax error in VSIM command");
}

View file

@ -60,4 +60,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/internalsecret \
--single unit/moduleapi/configaccess \
--single unit/moduleapi/keymeta \
--single unit/moduleapi/ksn_notify_side_effect \
"${@}"

View file

@ -306,13 +306,8 @@ void restoreCommand(client *c) {
estoreAdd(c->db->subexpires, getKeySlot(key->ptr), kv, minExpiredField);
}
if (kvtype == OBJ_STREAM) {
stream *s = kv->ptr;
if (s->idmp_producers != NULL) {
if (dictAdd(c->db->stream_idmp_keys, key, NULL) == DICT_OK)
incrRefCount(key);
}
}
if (kvtype == OBJ_STREAM)
streamKeyLoaded(c->db, key, kv);
if (ttl) {
if (!absttl) {

View file

@ -11062,9 +11062,9 @@ keySpec GCRA_Keyspecs[1] = {
struct COMMAND_ARG GCRA_Args[] = {
{MAKE_ARG("key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("max-burst",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("requests-per-period",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("tokens-per-period",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("period",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("count",ARG_TYPE_INTEGER,-1,"NUM_REQUESTS",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("count",ARG_TYPE_INTEGER,-1,"TOKENS",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
};
/********** GET ********************/
@ -11839,7 +11839,7 @@ struct COMMAND_STRUCT redisCommandTable[] = {
{MAKE_CMD("pfadd","Adds elements to a HyperLogLog key. Creates the key if it doesn't exist.","O(1) to add every element.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFADD_History,0,PFADD_Tips,0,pfaddCommand,-2,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_HYPERLOGLOG,PFADD_Keyspecs,1,NULL,2),.args=PFADD_Args},
{MAKE_CMD("pfcount","Returns the approximated cardinality of the set(s) observed by the HyperLogLog key(s).","O(1) with a very small average constant time when called with a single key. O(N) with N being the number of keys, and much bigger constant times, when called with multiple keys.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFCOUNT_History,0,PFCOUNT_Tips,0,pfcountCommand,-2,CMD_READONLY|CMD_MAY_REPLICATE,ACL_CATEGORY_HYPERLOGLOG,PFCOUNT_Keyspecs,1,NULL,1),.args=PFCOUNT_Args},
{MAKE_CMD("pfdebug","Internal commands for debugging HyperLogLog values.","N/A","2.8.9",CMD_DOC_SYSCMD,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFDEBUG_History,0,PFDEBUG_Tips,0,pfdebugCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_ADMIN,ACL_CATEGORY_HYPERLOGLOG,PFDEBUG_Keyspecs,1,NULL,2),.args=PFDEBUG_Args},
{MAKE_CMD("pfmerge","Merges one or more HyperLogLog values into a single key.","O(N) to merge N HyperLogLogs, but with high constant times.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFMERGE_History,0,PFMERGE_Tips,0,pfmergeCommand,-2,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_HYPERLOGLOG,PFMERGE_Keyspecs,2,NULL,2),.args=PFMERGE_Args},
{MAKE_CMD("pfmerge","Merges one or more HyperLogLog values into a single key.","O(N) to merge N HyperLogLogs, but with high constant times.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFMERGE_History,0,PFMERGE_Tips,0,pfmergeCommand,-2,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_HYPERLOGLOG,PFMERGE_Keyspecs,2,pfmergeGetKeys,2),.args=PFMERGE_Args},
{MAKE_CMD("pfselftest","An internal command for testing HyperLogLog values.","N/A","2.8.9",CMD_DOC_SYSCMD,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFSELFTEST_History,0,PFSELFTEST_Tips,0,pfselftestCommand,1,CMD_ADMIN,ACL_CATEGORY_HYPERLOGLOG,PFSELFTEST_Keyspecs,0,NULL,0)},
/* list */
{MAKE_CMD("blmove","Pops an element from a list, pushes it to another list and returns it. Blocks until an element is available otherwise. Deletes the list if the last element was moved.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"list",COMMAND_GROUP_LIST,BLMOVE_History,0,BLMOVE_Tips,0,blmoveCommand,6,CMD_WRITE|CMD_DENYOOM|CMD_BLOCKING,ACL_CATEGORY_LIST,BLMOVE_Keyspecs,2,NULL,5),.args=BLMOVE_Args},

View file

@ -47,15 +47,15 @@
},
{
"type": "integer",
"description": "Max request number: always equal to max_burst+1"
"description": "Max request tokens: always equal to max_burst+1"
},
{
"type": "integer",
"description": "Number of requests available immediately"
"description": "Number of tokens available immediately"
},
{
"type": "integer",
"description": "Retry after: seconds after which caller should retry. Always -1 if not limited"
"description": "Retry after: seconds after which the caller should retry. Always -1 if not limited"
},
{
"type": "integer",
@ -74,7 +74,7 @@
"type": "integer"
},
{
"name": "requests-per-period",
"name": "tokens-per-period",
"type": "integer"
},
{
@ -84,7 +84,7 @@
{
"name": "count",
"type": "integer",
"token": "NUM_REQUESTS",
"token": "TOKENS",
"optional": true
}
]

View file

@ -6,6 +6,7 @@
"since": "2.8.9",
"arity": -2,
"function": "pfmergeCommand",
"get_keys_function": "pfmergeGetKeys",
"command_flags": [
"WRITE",
"DENYOOM"

View file

@ -593,7 +593,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link
estoreRemove(db->subexpires, slot, old);
if (old->type == OBJ_STREAM)
dictDelete(db->stream_idmp_keys, key);
streamKeyRemoved(db, key, old);
long long oldExpire = getExpire(db, key->ptr, old);
@ -858,7 +858,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
/* If stream with IDMP tracking, remove it from stream_idmp_keys */
if (type == OBJ_STREAM)
dictDelete(db->stream_idmp_keys, key);
streamKeyRemoved(db, key, kv);
/* RM_StringDMA may call dbUnshareStringValue which may free kv, so we
* need to incr to retain kv */
@ -2243,13 +2243,11 @@ void renameGenericCommand(client *c, int nx) {
estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime);
/* Re-register stream IDMP tracking under the new key name. */
if (srctype == OBJ_STREAM && ((stream *)o->ptr)->idmp_producers != NULL) {
if (dictAdd(c->db->stream_idmp_keys, c->argv[2], NULL) == DICT_OK)
incrRefCount(c->argv[2]);
}
if (srctype == OBJ_STREAM)
streamKeyLoaded(c->db, c->argv[2], o);
keyModified(c,c->db,c->argv[1],NULL,1);
keyModified(c,c->db,c->argv[2],NULL,1); /* LRM already updated by dbAddInternal */
keyModified(c,c->db,c->argv[2],o,1);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
c->argv[1],c->db->id);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
@ -2343,13 +2341,11 @@ void moveCommand(client *c) {
estoreAdd(dst->subexpires, slot, kv, hashExpireTime);
/* Register stream IDMP tracking in the destination DB. */
if (kv->type == OBJ_STREAM && ((stream *)kv->ptr)->idmp_producers != NULL) {
if (dictAdd(dst->stream_idmp_keys, c->argv[1], NULL) == DICT_OK)
incrRefCount(c->argv[1]);
}
if (kv->type == OBJ_STREAM)
streamKeyLoaded(dst, c->argv[1], kv);
keyModified(c,src,c->argv[1],NULL,1);
keyModified(c,dst,c->argv[1],NULL,1); /* LRM already updated by dbAddInternal */
keyModified(c,dst,c->argv[1],kv,1);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"move_from",c->argv[1],src->id);
notifyKeyspaceEvent(NOTIFY_GENERIC,
@ -2469,16 +2465,11 @@ void copyCommand(client *c) {
estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire);
/* Register copied stream with IDMP producers for cron-based expiration. */
if (kvCopy->type == OBJ_STREAM) {
stream *s = kvCopy->ptr;
if (s->idmp_producers != NULL) {
if (dictAdd(dst->stream_idmp_keys, newkey, NULL) == DICT_OK)
incrRefCount(newkey);
}
}
if (kvCopy->type == OBJ_STREAM)
streamKeyLoaded(dst, newkey, kvCopy);
/* OK! key copied. Signal modification (LRM already updated by dbAddInternal) */
keyModified(c,dst,c->argv[2],NULL,1);
/* OK! key copied. Signal modification */
keyModified(c,dst,c->argv[2],kvCopy,1);
notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id);
/* `delete` implies the destination key was overwritten */
@ -3683,6 +3674,29 @@ int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *
return result->numkeys;
}
int pfmergeGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, numkeys;
keyReference *keys;
UNUSED(cmd);
UNUSED(argv);
numkeys = argc - 1; /* destkey + all sourcekeys */
keys = getKeysPrepareResult(result, numkeys);
/* destkey at argv[1] */
keys[0].pos = 1;
keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_INSERT;
/* sourcekeys at argv[2..argc-1], may be zero */
for (i = 2; i < argc; i++) {
keys[i - 1].pos = i;
keys[i - 1].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
}
result->numkeys = numkeys;
return result->numkeys;
}
/* This command declares incomplete keys, so the flags are correctly set for this function */
int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, j, num, first;

View file

@ -793,7 +793,7 @@ NULL
memcpy(val->ptr, buf, valsize<=buflen? valsize: buflen);
}
dbAdd(c->db, key, &val);
keyModified(c,c->db,key,NULL,1);
keyModified(c,c->db,key,val,1);
decrRefCount(key);
}
addReply(c,shared.ok);

View file

@ -65,21 +65,21 @@
*
* (ASCII art adapted from https://brandur.org/rate-limiting). */
/* GCRA key max_burst requests_per_period period [NUM_REQUESTS count]
/* GCRA key max_burst tokens_per_period period [TOKENS count]
*
* key: Key related to specific rate limiting case
* max_burst: Maximum requests allowed as burst (in addition to sustained rate)
* requests_per_period: Number of requests allowed per period
* max_burst: Maximum tokens allowed as burst (in addition to sustained rate)
* tokens_per_period: Number of tokens allowed per period
* period: Period in seconds for calculating sustained rate
* num_requests: Optional, cost of this request (default: 1)
* tokens: Optional, cost of this request (default: 1)
*/
void gcraCommand(client *c) {
robj *key = c->argv[1];
/* GCRA parameters */
long max_burst;
long requests_per_period;
long num_requests = 1;
long tokens_per_period;
long num_tokens = 1;
double period;
/* Variables used in the reply */
@ -98,7 +98,7 @@ void gcraCommand(client *c) {
}
if (likely(max_burst < LONG_MAX)) max_burst += 1;
if (getRangeLongFromObjectOrReply(c, c->argv[3], 1, LONG_MAX, &requests_per_period, NULL) != C_OK) {
if (getRangeLongFromObjectOrReply(c, c->argv[3], 1, LONG_MAX, &tokens_per_period, NULL) != C_OK) {
return;
}
@ -111,15 +111,15 @@ void gcraCommand(client *c) {
}
if (c->argc >= 6) {
if (strcasecmp("NUM_REQUESTS", c->argv[5]->ptr)) {
if (strcasecmp("tokens", c->argv[5]->ptr)) {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
if (c->argc == 6) {
addReplyError(c, "Missing NUM_REQUESTS value");
addReplyError(c, "Missing TOKENS value");
return;
}
if (getRangeLongFromObjectOrReply(c, c->argv[6], 1, LONG_MAX, &num_requests, NULL) != C_OK) {
if (getRangeLongFromObjectOrReply(c, c->argv[6], 1, LONG_MAX, &num_tokens, NULL) != C_OK) {
return;
}
}
@ -158,18 +158,18 @@ void gcraCommand(client *c) {
* Even if emission_interval_us becomes less than 1us, we assume it's min
* 1ms. The API is already in seconds granularity so it is expected the user
* won't need a submicrosecond accuracy. */
long long emission_interval_us = (long long)(period_us / requests_per_period + 0.5);
long long emission_interval_us = (long long)(period_us / tokens_per_period + 0.5);
if (unlikely(emission_interval_us == 0)) emission_interval_us = 1;
/* overflow checks. In normal circumstances we shouldn't get these but the
* user may have wrongfully specified very large values.
* Note that all values are positive. */
if (emission_interval_us > LLONG_MAX / num_requests) {
addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, requests_per_period and num_requests would cause an overflow");
if (emission_interval_us > LLONG_MAX / num_tokens) {
addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, tokens_per_period and TOKENS would cause an overflow");
return;
}
if (emission_interval_us > LLONG_MAX / max_burst) {
addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, requests_per_period and max_burst would cause an overflow");
addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, tokens_per_period and max_burst would cause an overflow");
return;
}
@ -180,11 +180,11 @@ void gcraCommand(client *c) {
/* If a request is allowed the next TaT is after an emission_interval_us time.
* Hence for multiple requests we multiple by their number. */
long long increment_us = emission_interval_us * num_requests;
long long increment_us = emission_interval_us * num_tokens;
long long base_us = (now > tat_us) ? now : tat_us;
if (LLONG_MAX - base_us < increment_us) {
addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, requests_per_period and num_requests would cause an overflow");
addReplyError(c, "GCRA limiting uses microsecond accuracy. Combination of period, tokens_per_period and TOKENS would cause an overflow");
return;
}
new_tat_us = base_us + increment_us;

View file

@ -593,7 +593,7 @@ int rdbSaveKeyMetadata(rio *rdb, robj *key, kvobj *kv, int dbid) {
/* Call module's rdb_save callback */
RedisModuleIO io;
moduleInitIOContext(&io, &pClass->entity, &payload_rio, key, dbid);
pClass->conf.rdb_save(&io, kv, pMeta);
pClass->conf.rdb_save(&io, NULL, pMeta);
if (io.ctx) {
moduleFreeContext(io.ctx);
@ -668,7 +668,7 @@ int keyMetaOnAof(rio *r, robj *key, kvobj *kv, int dbid) {
{
RedisModuleIO io;
moduleInitIOContext(&io, &keyMetaClass[keyMetaId].entity, r, key, dbid);
keyMetaClass[keyMetaId].conf.aof_rewrite(&io, kv, meta);
keyMetaClass[keyMetaId].conf.aof_rewrite(&io, NULL, meta);
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);

View file

@ -60,8 +60,8 @@ typedef int KeyMetaClassId; /* Index into redisServer.keyMetaClass[] */
/* RDB load callback: Return 1 to attach, 0 to skip, -1 on error */
typedef int (*KeyMetaLoadFunc)(RedisModuleIO *rdb, uint64_t *meta, int encver);
typedef void (*KeyMetaSaveFunc)(RedisModuleIO *rdb, void *value, uint64_t *meta);
typedef void (*KeyMetaAOFRewriteFunc)(RedisModuleIO *aof, void *value, uint64_t meta);
typedef void (*KeyMetaSaveFunc)(RedisModuleIO *rdb, void *reserved, uint64_t *meta);
typedef void (*KeyMetaAOFRewriteFunc)(RedisModuleIO *aof, void *reserved, uint64_t meta);
typedef void (*KeyMetaFreeFunc)(const char *keyname, uint64_t meta);
typedef int (*KeyMetaCopyFunc)(struct RedisModuleKeyOptCtx *ctx, uint64_t *meta);
typedef int (*KeyMetaRenameFunc)(struct RedisModuleKeyOptCtx *ctx, uint64_t *meta);
@ -90,8 +90,8 @@ typedef struct KeyMetaClassConf {
void (*unlink)(struct RedisModuleKeyOptCtx *ctx, uint64_t *meta);
void (*free)(const char *keyname, uint64_t meta);
int (*rdb_load)(struct RedisModuleIO *rdb, uint64_t *meta, int metaver);
void (*rdb_save)(struct RedisModuleIO *rdb, void *value, uint64_t *meta);
void (*aof_rewrite)(struct RedisModuleIO *aof, void *value, uint64_t meta);
void (*rdb_save)(struct RedisModuleIO *rdb, void *reserved, uint64_t *meta);
void (*aof_rewrite)(struct RedisModuleIO *aof, void *reserved, uint64_t meta);
/****************************** TBD: ******************************/
int (*defrag) (struct RedisModuleDefragCtx *ctx, struct redisObject *key, uint64_t meta);

View file

@ -1208,7 +1208,10 @@ unsigned char *lpBatchInsert(unsigned char *lp, unsigned char *p, int where,
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
uint64_t new_listpack_bytes = old_listpack_bytes + addedlen;
if (new_listpack_bytes > UINT32_MAX) return NULL;
if (new_listpack_bytes > UINT32_MAX) {
if (enc != tmp) lp_free(enc);
return NULL;
}
/* Store the offset of the element 'p', so that we can obtain its
* address again after a reallocation. */
@ -1218,7 +1221,10 @@ unsigned char *lpBatchInsert(unsigned char *lp, unsigned char *p, int where,
/* Realloc before: we need more room. */
if (new_listpack_bytes > old_listpack_bytes &&
new_listpack_bytes > lp_malloc_size(lp)) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) {
if (enc != tmp) lp_free(enc);
return NULL;
}
dst = lp + poff;
}

View file

@ -2528,7 +2528,7 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
* RM_SetModuleOptions().
*/
int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
kvobj *kv = lookupKeyReadWithFlags(ctx->client->db, keyname, LOOKUP_NOTOUCH);
kvobj *kv = lookupKeyReadWithFlags(ctx->client->db, keyname, LOOKUP_NOEFFECTS);
keyModified(ctx->client,ctx->client->db,keyname,kv,1);
return REDISMODULE_OK;
}

View file

@ -4037,14 +4037,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
}
/* Register streams with IDMP producers for cron-based expiration. */
if (kv->type == OBJ_STREAM) {
stream *s = kv->ptr;
if (s->idmp_producers != NULL) {
robj *kobj = createStringObject(key, sdslen(key));
if (dictAddRaw(db->stream_idmp_keys, kobj, NULL) == NULL)
decrRefCount(kobj);
}
}
if (kv->type == OBJ_STREAM)
streamKeyLoaded(db, &keyobj, kv);
/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);

View file

@ -1011,8 +1011,8 @@ typedef void (*RedisModuleOnUnblocked)(RedisModuleCtx *ctx, RedisModuleCallReply
typedef int (*RedisModuleAuthCallback)(RedisModuleCtx *ctx, RedisModuleString *username, RedisModuleString *password, RedisModuleString **err);
typedef int (*RedisModuleKeyMetaLoadFunc)(RedisModuleIO *rdb, uint64_t *meta, int encver);
typedef void (*RedisModuleKeyMetaSaveFunc)(RedisModuleIO *rdb, void *value, uint64_t *meta);
typedef void (*RedisModuleKeyMetaAOFRewriteFunc)(RedisModuleIO *aof, void *value, uint64_t meta);
typedef void (*RedisModuleKeyMetaSaveFunc)(RedisModuleIO *rdb, void *reserved, uint64_t *meta);
typedef void (*RedisModuleKeyMetaAOFRewriteFunc)(RedisModuleIO *aof, void *reserved, uint64_t meta);
typedef void (*RedisModuleKeyMetaFreeFunc)(const char *keyname, uint64_t meta);
typedef int (*RedisModuleKeyMetaCopyFunc)(RedisModuleKeyOptCtx *ctx, uint64_t *meta);
typedef int (*RedisModuleKeyMetaRenameFunc)(RedisModuleKeyOptCtx *ctx, uint64_t *meta);

View file

@ -1507,27 +1507,42 @@ int sdsTest(int argc, char **argv, int flags) {
x = sdsResize(x, 200, 1);
test_cond("sdsresize() expand len", sdslen(x) == 40);
test_cond("sdsresize() expand strlen", strlen(x) == 40);
test_cond("sdsresize() expand alloc", sdsalloc(x) == 200);
#if defined(USE_JEMALLOC)
/* 224 - hdrlen(3) - 1(\0) */
test_cond("sdsresize() expand alloc", sdsalloc(x) == 220);
#endif
/* Test sdsresize - trim free space */
x = sdsResize(x, 80, 1);
test_cond("sdsresize() shrink len", sdslen(x) == 40);
test_cond("sdsresize() shrink strlen", strlen(x) == 40);
test_cond("sdsresize() shrink alloc", sdsalloc(x) == 80);
#if defined(USE_JEMALLOC)
/* 96 - hdrlen(3) - 1(\0) */
test_cond("sdsresize() shrink alloc", sdsalloc(x) == 92);
#endif
/* Test sdsresize - crop used space */
x = sdsResize(x, 30, 1);
test_cond("sdsresize() crop len", sdslen(x) == 30);
test_cond("sdsresize() crop strlen", strlen(x) == 30);
test_cond("sdsresize() crop alloc", sdsalloc(x) == 30);
#if defined(USE_JEMALLOC)
/* 40 - hdrlen(3) - 1(\0) */
test_cond("sdsresize() crop alloc", sdsalloc(x) == 36);
#endif
/* Test sdsresize - extend to different class */
x = sdsResize(x, 400, 1);
test_cond("sdsresize() expand len", sdslen(x) == 30);
test_cond("sdsresize() expand strlen", strlen(x) == 30);
test_cond("sdsresize() expand alloc", sdsalloc(x) == 400);
#if defined(USE_JEMALLOC)
/* 448 - hdrlen(5) - 1(\0) */
test_cond("sdsresize() expand alloc", sdsalloc(x) == 442);
#endif
/* Test sdsresize - shrink to different class */
x = sdsResize(x, 4, 1);
test_cond("sdsresize() crop len", sdslen(x) == 4);
test_cond("sdsresize() crop strlen", strlen(x) == 4);
#if defined(USE_JEMALLOC)
/* 8 - hdrlen(3) - 1(\0) */
test_cond("sdsresize() crop alloc", sdsalloc(x) == 4);
#endif
sdsfree(x);
{ /* Test adjustTypeIfNeeded() */

View file

@ -7783,7 +7783,8 @@ int zsetTest(int argc, char **argv, int flags);
struct redisTest {
char *name;
redisTestProc *proc;
int failed;
int test_count;
int passed_count;
} redisTests[] = {
{"ziplist", ziplistTest},
{"quicklist", quicklistTest},
@ -7837,32 +7838,40 @@ int main(int argc, char **argv) {
if (!strcasecmp(argv[2], "all")) {
int numtests = sizeof(redisTests)/sizeof(struct redisTest);
for (j = 0; j < numtests; j++) {
redisTests[j].failed = (redisTests[j].proc(argc,argv,flags) != 0);
}
/* Report tests result */
int failed_num = 0;
for (j = 0; j < numtests; j++) {
if (redisTests[j].failed) {
int before_total = __test_num;
int before_failed = __failed_tests;
redisTests[j].proc(argc,argv,flags);
redisTests[j].test_count = __test_num - before_total;
redisTests[j].passed_count = redisTests[j].test_count - (__failed_tests - before_failed);
if (redisTests[j].passed_count < redisTests[j].test_count)
failed_num++;
printf("[failed] Test - %s\n", redisTests[j].name);
} else {
printf("[ok] Test - %s\n", redisTests[j].name);
}
}
printf("%d tests, %d passed, %d failed\n", numtests,
numtests-failed_num, failed_num);
printf("\n========== Test Suite Summary ==========\n\n");
for (j = 0; j < numtests; j++) {
int failed = redisTests[j].passed_count < redisTests[j].test_count;
printf(" %s %-15s (%d/%d passed)%s\n",
failed ? "\033[31m[failed]" : "\033[32m[ok] \033[0m",
redisTests[j].name,
redisTests[j].passed_count, redisTests[j].test_count,
failed ? "\033[0m" : "");
}
return failed_num == 0 ? 0 : 1;
printf("\n Test Groups: %s%d passed\033[0m, %s%d failed\033[0m, %d total\n",
failed_num ? "" : "\033[32m", numtests-failed_num,
failed_num ? "\033[31m" : "", failed_num, numtests);
test_report();
} else {
redisTestProc *proc = getTestProcByName(argv[2]);
if (!proc) return -1; /* test not found */
return proc(argc,argv,flags);
proc(argc,argv,flags);
test_report();
}
return 0;
return __failed_tests ? 1 : 0;
}
#endif

View file

@ -4026,6 +4026,7 @@ int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc,
int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int pfmergeGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);

View file

@ -193,6 +193,8 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
int streamEntryExists(stream *s, streamID *id);
void streamKeyLoaded(redisDb *db, robj *key, robj *val);
void streamKeyRemoved(redisDb *db, robj *key, robj *val);
listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key);

View file

@ -5704,6 +5704,27 @@ static void trackStreamIdmpEntries(client *c, robj *key) {
}
}
/* To be used when a stream key was loaded into ram, re-register it in stream_idmp_keys if needed */
void streamKeyLoaded(redisDb *db, robj *key, robj *val) {
stream *s = val->ptr;
if (s->idmp_producers != NULL) {
robj *tracked_key = key;
if (key->refcount == OBJ_STATIC_REFCOUNT)
tracked_key = createStringObject(key->ptr, sdslen(key->ptr));
if (dictAddRaw(db->stream_idmp_keys, tracked_key, NULL)) {
incrRefCount(tracked_key);
}
if (tracked_key != key)
decrRefCount(tracked_key);
}
}
/* To be used when a steam key was removed from ram, un-redigster from stream_idmp_keys if needed */
void streamKeyRemoved(redisDb *db, robj *key, robj *val) {
UNUSED(val);
dictDelete(db->stream_idmp_keys, key);
}
/* Clean up expired idempotency entries from tracked streams. This function
* is invoked regularly from serverCron() to remove expired entries
* from the idmp_dict of streams that have idempotency tracking enabled,
@ -5750,6 +5771,7 @@ void handleExpiredIdmpEntries(void) {
}
/* Iterate through all producers and remove expired entries */
int modified = 0;
raxIterator ri;
raxStart(&ri, s->idmp_producers);
raxSeek(&ri, "^", NULL, 0);
@ -5769,6 +5791,7 @@ void handleExpiredIdmpEntries(void) {
}
/* Free the entry */
idmpEntryFree(entry, &s->alloc_size);
modified = 1;
} else {
break;
}
@ -5779,10 +5802,14 @@ void handleExpiredIdmpEntries(void) {
raxRemove(s->idmp_producers, ri.key, ri.key_len, NULL);
idmpProducerFree(producer, &s->alloc_size);
raxSeek(&ri, ">=", ri.key, ri.key_len);
modified = 1;
}
}
raxStop(&ri);
if (modified)
keyModified(NULL, db, key, kv, 0);
/* If no producers remain, free the entire rax tree */
if (raxSize(s->idmp_producers) == 0) {
raxFree(s->idmp_producers);

View file

@ -2793,7 +2793,10 @@ static void zdiffAlgorithm2(zsetopsrc *src, long setnum, zset *dstzset, size_t *
/* Exit if result set is empty as any additional removal
* of elements will have no effect. */
if (cardinality == 0) break;
if (cardinality == 0) {
zuiDiscardDirtyValue(&zval);
break;
}
}
zuiClearIterator(&src[j]);

View file

@ -30,14 +30,17 @@ extern int __test_num;
#define test_cond(descr,_c) do { \
__test_num++; printf("%d - %s: ", __test_num, descr); \
if(_c) printf("PASSED\n"); else {printf("FAILED\n"); __failed_tests++;} \
if(_c) printf("\033[32mPASSED\033[0m\n"); else {printf("\033[31mFAILED\033[0m\n"); __failed_tests++;} \
} while(0)
#define test_report() do { \
printf("%d tests, %d passed, %d failed\n", __test_num, \
__test_num-__failed_tests, __failed_tests); \
if (__failed_tests) { \
printf("=== WARNING === We have failed tests here...\n"); \
printf(" Tests: %d passed, \033[31m%d failed\033[0m, %d total\n", \
__test_num-__failed_tests, __failed_tests, __test_num); \
printf("\033[31m=== WARNING === We have failed tests here...\033[0m\n"); \
exit(1); \
} else { \
printf(" Tests: \033[32m%d passed\033[0m, %d failed, %d total\n", \
__test_num-__failed_tests, __failed_tests, __test_num); \
} \
} while(0)

View file

@ -54,9 +54,12 @@ test "Main db not affected when fail to diskless load" {
set rd [redis_deferring_client redis $master_id]
for {set j 0} {$j < $num} {incr j} {
$rd set $j $value
}
for {set j 0} {$j < $num} {incr j} {
$rd read
if {($j + 1) % 500 == 0} {
for {set i 0} {$i < 500} {incr i} {
$rd read
}
}
}
# Start the replica again

View file

@ -123,6 +123,7 @@ test "PUBSUB channels/shardchannels" {
assert_equal {3} [llength [$publishclient pubsub shardchannels]]
sunsubscribe $subscribeclient
$subscribeclient read
set channel_list [$publishclient pubsub shardchannels]
assert_equal {2} [llength $channel_list]
assert {[lsearch -exact $channel_list "\{channel.0\}2"] >= 0}

View file

@ -22,13 +22,18 @@ proc gen_write_load {host port seconds tls {key ""} {size 0} {sleep 0}} {
set start_time [clock seconds]
set r [redis $host $port 1 $tls]
$r client setname LOAD_HANDLER
catch {$r select 9} ;# select 9 will fail in cluster mode
$r read
catch {
$r select 9
$r read
} ;# select 9 will fail in cluster mode
# fixed size value
if {$size != 0} {
set value [string repeat "x" $size]
}
set count 0
while 1 {
if {$size == 0} {
set value [expr rand()]
@ -39,13 +44,27 @@ proc gen_write_load {host port seconds tls {key ""} {size 0} {sleep 0}} {
} else {
$r set $key $value
}
incr count
if {$count % 500 == 0} {
for {set i 0} {$i < 500} {incr i} {
$r read
}
}
if {[clock seconds]-$start_time > $seconds} {
exit 0
break
}
if {$sleep ne 0} {
after $sleep
}
}
# Read remaining replies
for {set i 0} {$i < $count} {incr i} {
$r read
}
exit 0
}
gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3] [lindex $argv 4] [lindex $argv 5] [lindex $argv 6]

View file

@ -173,11 +173,11 @@ static int KeyMetaMoveDiscardCallback(RedisModuleKeyOptCtx *ctx, uint64_t *meta)
*
* Parameters:
* - rdb: RedisModuleIO context for writing to RDB
* - value: The kvobj (key-value object) - not used in this implementation
* - reserved: Reserved for future use
* - meta: Pointer to the 8-byte metadata value (pointer to our string)
*/
static void KeyMetaRDBSaveCallback(RedisModuleIO *rdb, void *value, uint64_t *meta) {
REDISMODULE_NOT_USED(value);
static void KeyMetaRDBSaveCallback(RedisModuleIO *rdb, void *reserved, uint64_t *meta) {
REDISMODULE_NOT_USED(reserved);
/* If metadata is NULL (reset_value), don't save anything */
if (*meta == 0) return;
@ -252,12 +252,12 @@ static int KeyMetaRDBLoadCallback(RedisModuleIO *rdb, uint64_t *meta, int encver
*
* Parameters:
* - aof: RedisModuleIO context for writing to AOF
* - value: The kvobj (key-value object) - not used in this implementation
* - reserved: Reserved for future use
* - meta: The 8-byte metadata value (pointer to our string)
* - class_id: The class ID for this metadata
*/
static void KeyMetaAOFRewriteCallback_Class(RedisModuleIO *aof, void *value, uint64_t meta, RedisModuleKeyMetaClassId class_id) {
REDISMODULE_NOT_USED(value);
static void KeyMetaAOFRewriteCallback_Class(RedisModuleIO *aof, void *reserved, uint64_t meta, RedisModuleKeyMetaClassId class_id) {
REDISMODULE_NOT_USED(reserved);
/* If metadata is NULL (reset_value), don't emit anything */
if (meta == 0) return;
@ -289,32 +289,32 @@ static void KeyMetaAOFRewriteCallback_Class(RedisModuleIO *aof, void *value, uin
/* Individual AOF rewrite callbacks for each class (1-7)
* Each callback wraps the common implementation with its specific class ID */
static void KeyMetaAOFRewriteCb1(RedisModuleIO *aof, void *value, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, value, meta, 1);
static void KeyMetaAOFRewriteCb1(RedisModuleIO *aof, void *reserved, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 1);
}
static void KeyMetaAOFRewriteCb2(RedisModuleIO *aof, void *value, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, value, meta, 2);
static void KeyMetaAOFRewriteCb2(RedisModuleIO *aof, void *reserved, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 2);
}
static void KeyMetaAOFRewriteCb3(RedisModuleIO *aof, void *value, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, value, meta, 3);
static void KeyMetaAOFRewriteCb3(RedisModuleIO *aof, void *reserved, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 3);
}
static void KeyMetaAOFRewriteCb4(RedisModuleIO *aof, void *value, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, value, meta, 4);
static void KeyMetaAOFRewriteCb4(RedisModuleIO *aof, void *reserved, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 4);
}
static void KeyMetaAOFRewriteCb5(RedisModuleIO *aof, void *value, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, value, meta, 5);
static void KeyMetaAOFRewriteCb5(RedisModuleIO *aof, void *reserved, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 5);
}
static void KeyMetaAOFRewriteCb6(RedisModuleIO *aof, void *value, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, value, meta, 6);
static void KeyMetaAOFRewriteCb6(RedisModuleIO *aof, void *reserved, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 6);
}
static void KeyMetaAOFRewriteCb7(RedisModuleIO *aof, void *value, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, value, meta, 7);
static void KeyMetaAOFRewriteCb7(RedisModuleIO *aof, void *reserved, uint64_t meta) {
KeyMetaAOFRewriteCallback_Class(aof, reserved, meta, 7);
}
/* KEYMETA.REGISTER <4-char-id> <version> [KEEPONCOPY:KEEPONRENAME:UNLINKFREE:ALLOWIGNORE:NORDBLOAD:NORDBSAVE] */

View file

@ -33,13 +33,13 @@ start_server {tags {"gcra" "external:skip"}} {
assert_match "*not a valid float*" $err
# tokens (optional) must be >= 1
catch {r gcra mykey 10 5 10 NUM_REQUESTS} err
assert_match "*Missing NUM_REQUESTS value*" $err
catch {r gcra mykey 10 5 10 NUM_REQUESTS 0} err
catch {r gcra mykey 10 5 10 TOKENS} err
assert_match "*Missing TOKENS value*" $err
catch {r gcra mykey 10 5 10 TOKENS 0} err
assert_match "*out of range*" $err
catch {r gcra mykey 10 5 10 NUM_REQUESTS -1} err
catch {r gcra mykey 10 5 10 TOKENS -1} err
assert_match "*out of range*" $err
catch {r gcra mykey 10 5 10 NUM_REQUESTS notanumber} err
catch {r gcra mykey 10 5 10 TOKENS notanumber} err
assert_match "*not an integer*" $err
# Valid arguments with default tokens
@ -53,7 +53,7 @@ start_server {tags {"gcra" "external:skip"}} {
# Valid arguments with explicit tokens
r del mykey
set result [r gcra mykey 10 5 60 NUM_REQUESTS 2]
set result [r gcra mykey 10 5 60 TOKENS 2]
assert_equal 5 [llength $result]
assert_equal 11 [lindex $result 1]
@ -130,7 +130,7 @@ start_server {tags {"gcra" "external:skip"}} {
r del mykey
# Consume 3 tokens from fresh state
set result2 [r gcra mykey 5 1 60 NUM_REQUESTS 3]
set result2 [r gcra mykey 5 1 60 TOKENS 3]
set avail2 [lindex $result2 2]
assert_equal 3 $avail2
}
@ -168,7 +168,7 @@ start_server {tags {"gcra" "external:skip"}} {
r del mykey
r gcra mykey 5 1 60
set result [r gcra mykey 5 1 60 NUM_REQUESTS 4]
set result [r gcra mykey 5 1 60 TOKENS 4]
set full_burst_after [lindex $result 4]
assert {$full_burst_after >= 299}
}
@ -216,7 +216,7 @@ start_server {tags {"gcra" "external:skip"}} {
test {GCRA - overflow} {
r del mykey
catch {r gcra mykey 1 1 86400 NUM_REQUESTS 200000000} err
catch {r gcra mykey 1 1 86400 TOKENS 200000000} err
assert_match "*would cause an overflow*" $err
r del mykey
@ -224,7 +224,7 @@ start_server {tags {"gcra" "external:skip"}} {
assert_match "*would cause an overflow*" $err
r del mykey
catch {r gcra mykey 1 1 2147483647 NUM_REQUESTS 2147483647} err
catch {r gcra mykey 1 1 2147483647 TOKENS 2147483647} err
assert_match "*would cause an overflow*" $err
}
}
@ -255,7 +255,7 @@ start_server {tags {"gcra repl" "external:skip"}} {
assert_equal [lsearch -glob $cmdinfo "cmdstat_set:*"] -1
$master del mykey
$master gcra mykey 2 1 1000 NUM_REQUESTS 2
$master gcra mykey 2 1 1000 TOKENS 2
wait_for_ofs_sync $master $replica

View file

@ -189,6 +189,17 @@ start_server {tags {"introspection"}} {
assert_equal {key1 key2} [r command getkeys lcs key1 key2]
}
test {COMMAND GETKEYS PFMERGE with and without source keys} {
# dest + sources: both key specs yield keys
assert_equal {dest src1 src2} [r command getkeys PFMERGE dest src1 src2]
# dest only, no source keys: spec[1] yields empty range (last < first).
# Without pfmergeGetKeys this returned "Invalid arguments" because
# getKeysUsingKeySpecs treated the empty range as invalid_spec,
# discarding the dest key found by spec[0].
assert_equal {dest} [r command getkeys PFMERGE dest]
}
test {COMMAND GETKEYS MORE THAN 256 KEYS} {
set all_keys [list]
set numkeys 260

View file

@ -24,9 +24,12 @@ proc test_memory_efficiency {range} {
incr written [string length $key]
incr written [string length $val]
incr written 2 ;# A separator is the minimum to store key-value data.
}
for {set j 0} {$j < 10000} {incr j} {
$rd read ; # Discard replies
if {($j + 1) % 500 == 0} {
for {set i 0} {$i < 500} {incr i} {
$rd read ; # Discard replies
}
}
}
set current_mem [s used_memory]
@ -352,7 +355,7 @@ run_solo {defrag} {
# create big keys with 10k items
# Use batching to avoid TCP deadlock
set rd [redis_deferring_client]
set batch_size 1000
set batch_size 100
for {set j 0} {$j < 10000} {incr j} {
$rd hset bighash $j [concat "asdfasdfasdf" $j]
$rd lpush biglist [concat "asdfasdfasdf" $j]
@ -937,7 +940,7 @@ run_solo {defrag} {
$rd lpush biglist2 $val
incr count
discard_replies_every $rd $count 10000 20000
discard_replies_every $rd $count 1000 2000
}
# create some fragmentation

View file

@ -1027,10 +1027,18 @@ foreach type {single multiple single_multiple} {
}
}
r deferred 1
set count 0
foreach m $members {
r srem $myset $m
incr count
if {$count == 500} {
for {set i 0} {$i < 500} {incr i} {
r read
}
set count 0
}
}
foreach m $members {
for {set i 0} {$i < $count} {incr i} {
r read
}
r deferred 0

View file

@ -1114,6 +1114,18 @@ start_server {tags {"zset"}} {
assert_equal {a 1 e 5} [r zrange zsete{t} 0 -1 withscores]
}
test "ZDIFF algorithm 2 empty result early exit - $encoding" {
# Force algorithm 2 by inflating setnum with non-existing keys.
# algo_one_work = len(src[0]) * setnum / 2 = 2 * 10 / 2 = 10
# algo_two_work = 2 + 2 + 0*8 = 4
# algo_one (10) > algo_two (4) -> algorithm 2 is selected
r del zseta{t} zsetb{t} zsetc{t}
r zadd zseta{t} 1 a 2 b
r zadd zsetb{t} 1 a 2 b
assert_equal 0 [r zdiffstore zsetc{t} 10 zseta{t} zsetb{t} nx1{t} nx2{t} nx3{t} nx4{t} nx5{t} nx6{t} nx7{t} nx8{t}]
assert_equal {} [r zrange zsetc{t} 0 -1 withscores]
}
test "ZDIFF fuzzing - $encoding" {
for {set j 0} {$j < 100} {incr j} {
unset -nocomplain s