diff --git a/src/t_hash.c b/src/t_hash.c index 208e6775b..b42e3c259 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -540,8 +540,10 @@ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, ex->minExpireFields = prevExpire; } - /* if expiration time is in the past */ + /* If expired, then delete the field and propagate the deletion. + * If replica, continue like the field is valid */ if (unlikely(checkAlreadyExpired(expireAt))) { + propagateHashFieldDeletion(ex->db, ex->key->ptr, field, sdslen(field)); hashTypeDelete(ex->hashObj, field, 1); ex->fieldDeleted++; return HSETEX_DELETED; @@ -1034,8 +1036,11 @@ SetExRes hashTypeSetExpiryHT(HashTypeSetEx *exInfo, sds field, uint64_t expireAt dictSetKey(ht, existingEntry, hfNew); - /* if expiration time is in the past */ + /* If expired, then delete the field and propagate the deletion. + * If replica, continue like the field is valid */ if (unlikely(checkAlreadyExpired(expireAt))) { + /* replicas should not initiate deletion of fields */ + propagateHashFieldDeletion(exInfo->db, exInfo->key->ptr, field, sdslen(field)); hashTypeDelete(exInfo->hashObj, field, 1); exInfo->fieldDeleted++; return HSETEX_DELETED; @@ -1101,12 +1106,7 @@ void initDictExpireMetadata(sds key, robj *o) { m->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */ } -/* - * Init HashTypeSetEx struct before calling hashTypeSetEx() - * - * Don't have to provide client and "cmd". If provided, then notification once - * done by function hashTypeSetExDone(). - */ +/* Init HashTypeSetEx struct before calling hashTypeSetEx() */ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cmd, ExpireSetCond expireSetCond, HashTypeSetEx *ex) { @@ -1123,20 +1123,20 @@ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cm ex->minExpireFields = EB_EXPIRE_TIME_INVALID; /* Take care that HASH support expiration */ - if (ex->hashObj->encoding == OBJ_ENCODING_LISTPACK) { - hashTypeConvert(ex->hashObj, OBJ_ENCODING_LISTPACK_EX, &c->db->hexpires); + if (o->encoding == OBJ_ENCODING_LISTPACK) { + hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, &c->db->hexpires); - listpackEx *lpt = ex->hashObj->ptr; + listpackEx *lpt = o->ptr; dictEntry *de = dbFind(c->db, key->ptr); serverAssert(de != NULL); lpt->key = dictGetKey(de); - } else if (ex->hashObj->encoding == OBJ_ENCODING_HT) { + } else if (o->encoding == OBJ_ENCODING_HT) { /* Take care dict has HFE metadata */ if (!isDictWithMetaHFE(ht)) { /* Realloc (only header of dict) with metadata for hash-field expiration */ dictTypeAddMeta(&ht, &mstrHashDictTypeWithHFE); dictExpireMetadata *m = (dictExpireMetadata *) dictMetadata(ht); - ex->hashObj->ptr = ht; + o->ptr = ht; /* Find the key in the keyspace. Need to keep reference to the key for * notifications or even removal of the hash */ @@ -1151,7 +1151,7 @@ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cm } /* Read minExpire from attached ExpireMeta to the hash */ - ex->minExpire = hashTypeGetMinExpire(ex->hashObj, 0); + ex->minExpire = hashTypeGetMinExpire(o, 0); return C_OK; } @@ -3040,11 +3040,34 @@ static void httlGenericCommand(client *c, const char *cmd, long long basetime, i * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for * the argv[2] parameter. The basetime is always specified in milliseconds. * - * Additional flags are supported and parsed via parseExtendedExpireArguments */ + * PROPAGATE TO REPLICA: + * The command will be translated into HPEXPIREAT and the expiration time will be + * converted to absolute time in milliseconds. + * + * As we need to propagate H(P)EXPIRE(AT) command to the replica, each field that + * is mentioned in the command should be categorized into one of the four options: + * 1. Field’s expiration time updated successfully - Propagate it to replica as + * part of the HPEXPIREAT command. + * 2. The field got deleted since the time is in the past - propagate also HDEL + * command to delete the field. Also remove the field from the propagated + * HPEXPIREAT command. + * 3. Condition not met for the field - Remove the field from the propagated + * HPEXPIREAT command. + * 4. Field doesn't exists - Remove the field from propagated HPEXPIREAT command. + * + * If none of the provided fields match option #1, that is provided time of the + * command is in the past, then avoid propagating the HPEXPIREAT command to the + * replica. + * + * This approach is aligned with existing EXPIRE command. If a given key has already + * expired, then DEL will be propagated instead of EXPIRE command. If condition + * not met, then command will be rejected. Otherwise, EXPIRE command will be + * propagated for given key. + */ static void hexpireGenericCommand(client *c, const char *cmd, long long basetime, int unit) { long numFields = 0, numFieldsAt = 4; long long expire; /* unix time in msec */ - int expireSetCond = 0; + int fieldAt, fieldsNotSet = 0, expireSetCond = 0; robj *hashObj, *keyArg = c->argv[1], *expireArg = c->argv[2]; /* Read the hash object */ @@ -3117,14 +3140,38 @@ static void hexpireGenericCommand(client *c, const char *cmd, long long basetime hashTypeSetExInit(keyArg, hashObj, c, c->db, cmd, expireSetCond, &exCtx); addReplyArrayLen(c, numFields); - for (int i = 0 ; i < numFields ; i++) { - sds field = c->argv[numFieldsAt+i+1]->ptr; + fieldAt = numFieldsAt + 1; + while (fieldAt < c->argc) { + sds field = c->argv[fieldAt]->ptr; SetExRes res = hashTypeSetEx(hashObj, field, expire, &exCtx); + + if (unlikely(res != HSETEX_OK)) { + /* If the field was not set, prevent field propagation */ + rewriteClientCommandArgument(c, fieldAt, NULL); + fieldsNotSet = 1; + } else { + ++fieldAt; + } + addReplyLongLong(c,res); } + hashTypeSetExDone(&exCtx); - /* rewrite command for the replica sake */ + /* Avoid propagating command if not even one field was updated (Either because + * the time is in the past, and corresponding HDELs were sent, or conditions + * not met) then it is useless and invalid to propagate command with no fields */ + if (exCtx.fieldUpdated == 0) { + preventCommandPropagation(c); + return; + } + + /* If some fields were dropped, rewrite the number of fields */ + if (fieldsNotSet) { + robj *numFieldsObj = createStringObjectFromLongLong(exCtx.fieldUpdated); + rewriteClientCommandArgument(c, numFieldsAt, numFieldsObj); + decrRefCount(numFieldsObj); + } /* Propagate as HPEXPIREAT millisecond-timestamp. Rewrite only if not already */ if (c->cmd->proc != hpexpireatCommand) { diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index cca616b61..8c71ebed2 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -914,14 +914,29 @@ start_server {tags {"external:skip needs:debug"}} { r config set hash-max-listpack-entries 512 } - test "Command rewrite and expired hash fields are propagated to replica ($type)" { + test "Test Command propagated to replica as expected ($type)" { start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { set aof [get_last_incr_aof_path r] + + # Time is in the past so it should propagate HDELs to replica + # and delete the fields + r hset h0 x1 y1 x2 y2 + r hexpireat h0 1 fields 3 x1 x2 non_exists_field + r hset h1 f1 v1 f2 v2 + # Next command won't be propagated to replica + # because XX condition not met or field not exists + r hexpire h1 10 XX FIELDS 1 f1 f2 non_exists_field + r hpexpire h1 20 FIELDS 1 f1 - r hpexpire h1 30 FIELDS 1 f2 + + # Next command will be propagate with only field 'f2' + # because NX condition not met for field 'f1' + r hpexpire h1 30 NX FIELDS 1 f1 f2 + + # Non exists field should be ignored r hpexpire h1 30 FIELDS 1 non_exists_field r hset h2 f1 v1 f2 v2 f3 v3 f4 v4 r hpexpire h2 40 FIELDS 2 f1 non_exists_field @@ -938,11 +953,16 @@ start_server {tags {"external:skip needs:debug"}} { # Assert that each TTL-related command are persisted with absolute timestamps in AOF assert_aof_content $aof { {select *} + {hset h0 x1 y1 x2 y2} + {multi} + {hdel h0 x1} + {hdel h0 x2} + {exec} {hset h1 f1 v1 f2 v2} {hpexpireat h1 * FIELDS 1 f1} {hpexpireat h1 * FIELDS 1 f2} {hset h2 f1 v1 f2 v2 f3 v3 f4 v4} - {hpexpireat h2 * FIELDS 2 f1 non_exists_field} + {hpexpireat h2 * FIELDS 1 f1} {hpexpireat h2 * FIELDS 1 f2} {hpexpireat h2 * FIELDS 1 f3} {hpexpireat h2 * FIELDS 1 f4} @@ -1072,7 +1092,7 @@ start_server {tags {"external:skip needs:debug"}} { {hset h2 f2 v2} {hpexpireat h2 * NX FIELDS 1 f2} {hset h3 f3 v3 f4 v4 f5 v5} - {hpexpireat h3 * FIELDS 3 f3 f4 non_exists_field} + {hpexpireat h3 * FIELDS 2 f3 f4} {hpersist h3 FIELDS 1 f3} } close_replication_stream $repl