diff --git a/src/replication.c b/src/replication.c index a18e74f113..ed43767488 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2440,7 +2440,12 @@ void readSyncBulkPayload(connection *conn) { replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; + server.repl_up_since = server.unixtime; + if (server.repl_disconnect_start_time != 0) { + server.repl_total_disconnect_time += server.unixtime - server.repl_disconnect_start_time; + server.repl_disconnect_start_time = 0; + } /* Fire the master link modules event. */ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, REDISMODULE_SUBEVENT_MASTER_LINK_UP, @@ -3161,6 +3166,8 @@ write_error: /* Handle sendCommand() errors. */ } int connectWithMaster(void) { + server.repl_current_sync_attempts++; + server.repl_total_sync_attempts++; server.repl_transfer_s = connCreate(server.el, connTypeOfReplication()); if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport, server.bind_source_addr, syncWithMaster) == C_ERR) { @@ -3193,6 +3200,8 @@ void undoConnectWithMaster(void) { void replicationAbortSyncTransfer(void) { serverAssert(server.repl_state == REPL_STATE_TRANSFER); undoConnectWithMaster(); + if (server.repl_disconnect_start_time == 0) + server.repl_disconnect_start_time = server.unixtime; if (server.repl_transfer_fd!=-1) { close(server.repl_transfer_fd); bg_unlink(server.repl_transfer_tmpfile); @@ -3284,6 +3293,8 @@ void replicationSetMaster(char *ip, int port) { NULL); server.repl_state = REPL_STATE_CONNECT; + server.repl_current_sync_attempts = 0; + server.repl_total_sync_attempts = 0; serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", server.masterhost, server.masterport); connectWithMaster(); @@ -3317,7 +3328,9 @@ void replicationUnsetMaster(void) { * a very fast reconnection. */ disconnectSlaves(); server.repl_state = REPL_STATE_NONE; - + /* Reset the attempts number. 
*/ + server.repl_current_sync_attempts = 0; + server.repl_total_sync_attempts = 0; /* We need to make sure the new master will start the replication stream * with a SELECT statement. This is forced after a full resync, but * with PSYNC version 2, there is no need for full resync after a @@ -3333,9 +3346,9 @@ void replicationUnsetMaster(void) { * failover if slaves do not connect immediately. */ server.repl_no_slaves_since = server.unixtime; - /* Reset down time so it'll be ready for when we turn into replica again. */ + /* Reset up and down time so it'll be ready for when we turn into replica again. */ server.repl_down_since = 0; - + server.repl_up_since = 0; /* Fire the role change modules event. */ moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER, @@ -3359,8 +3372,11 @@ void replicationHandleMasterDisconnection(void) { NULL); server.master = NULL; + if (server.repl_state == REPL_STATE_CONNECTED) + server.repl_current_sync_attempts = 0; server.repl_state = REPL_STATE_CONNECT; server.repl_down_since = server.unixtime; + server.repl_up_since = 0; server.repl_num_master_disconnection++; /* If we are in the loop of streaming accumulated buffers, discard the @@ -3370,6 +3386,8 @@ void replicationHandleMasterDisconnection(void) { if (server.repl_main_ch_state & REPL_MAIN_CH_STREAMING_BUF) rdbChannelCleanup(); + if (server.repl_disconnect_start_time == 0) + server.repl_disconnect_start_time = server.unixtime; /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. 
*/ @@ -4253,7 +4271,11 @@ void replicationResurrectCachedMaster(connection *conn) { server.master->lastinteraction = server.unixtime; server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; - + server.repl_up_since = server.unixtime; + if (server.repl_disconnect_start_time != 0) { + server.repl_total_disconnect_time += server.unixtime - server.repl_disconnect_start_time; + server.repl_disconnect_start_time = 0; + } /* Fire the master link modules event. */ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, REDISMODULE_SUBEVENT_MASTER_LINK_UP, diff --git a/src/server.c b/src/server.c index dadb6c838a..b7d883ec1e 100644 --- a/src/server.c +++ b/src/server.c @@ -2289,9 +2289,11 @@ void initServerConfig(void) { server.repl_transfer_s = NULL; server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ + server.repl_up_since = 0; server.master_repl_offset = 0; server.fsynced_reploff_pending = 0; server.repl_stream_lastio = server.unixtime; + server.repl_total_sync_attempts = 0; /* Replication partial resync backlog */ server.repl_backlog = NULL; @@ -2916,6 +2918,7 @@ void initServer(void) { server.cron_malloc_stats.allocator_allocated = 0; server.cron_malloc_stats.allocator_active = 0; server.cron_malloc_stats.allocator_resident = 0; + server.repl_current_sync_attempts = 0; server.lastbgsave_status = C_OK; server.aof_last_write_status = C_OK; server.aof_last_write_errno = 0; @@ -6253,6 +6256,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { if (server.masterhost) { long long slave_repl_offset = 1; long long slave_read_repl_offset = 1; + time_t current_disconnect_time = server.repl_down_since ? 
+ server.unixtime - server.repl_down_since : 0 ; if (server.master) { slave_repl_offset = server.master->reploff; @@ -6271,8 +6276,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "slave_read_repl_offset:%lld\r\n", slave_read_repl_offset, "slave_repl_offset:%lld\r\n", slave_repl_offset, "replica_full_sync_buffer_size:%zu\r\n", server.repl_full_sync_buffer.size, - "replica_full_sync_buffer_peak:%zu\r\n", server.repl_full_sync_buffer.peak)); - + "replica_full_sync_buffer_peak:%zu\r\n", server.repl_full_sync_buffer.peak, + "master_current_sync_attempts:%lld\r\n", server.repl_current_sync_attempts, + "master_total_sync_attempts:%lld\r\n", server.repl_total_sync_attempts)); if (server.repl_state == REPL_STATE_TRANSFER) { double perc = 0; if (server.repl_transfer_size) { @@ -6291,7 +6297,14 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "master_link_down_since_seconds:%jd\r\n", server.repl_down_since ? (intmax_t)(server.unixtime-server.repl_down_since) : -1); + } else { + info = sdscatprintf(info, + "master_link_up_since_seconds:%jd\r\n", + server.repl_up_since ? /* defensive code, should never be 0 when connected */ + (intmax_t)(server.unixtime-server.repl_up_since) : -1); } + info = sdscatprintf(info, "total_disconnect_time_sec:%jd\r\n", (intmax_t)server.repl_total_disconnect_time+(current_disconnect_time)); + info = sdscatprintf(info, FMTARGS( "slave_priority:%d\r\n", server.slave_priority, "slave_read_only:%d\r\n", server.repl_slave_ro, diff --git a/src/server.h b/src/server.h index 8942bdc7c0..9173c5abfb 100644 --- a/src/server.h +++ b/src/server.h @@ -2132,6 +2132,7 @@ struct redisServer { int repl_slave_ro; /* Slave is read only? */ int repl_slave_ignore_maxmemory; /* If true slaves do not evict. 
*/ time_t repl_down_since; /* Unix time at which link with master went down */ + time_t repl_up_since; /* Unix time that master link is fully up and healthy */ int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ int slave_priority; /* Reported in INFO and used by Sentinel. */ int replica_announced; /* If true, replica is announced by Sentinel */ @@ -2150,6 +2151,10 @@ struct redisServer { /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */ + long long repl_current_sync_attempts; /* Number of times in current configuration, the replica attempted to sync since the last success. */ + long long repl_total_sync_attempts; /* Number of times in current configuration, the replica attempted to sync to a master */ + time_t repl_disconnect_start_time; /* Unix time that master disconnection start */ + time_t repl_total_disconnect_time; /* The total cumulative time we've been disconnected as a replica, visible when the link is up too. 
*/
     /* Limits */
     unsigned int maxclients; /* Max number of simultaneous clients */
     unsigned long long maxmemory; /* Max number of memory bytes to use */
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index aa88f84fb7..50c178a600 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -1659,3 +1659,171 @@ start_server {tags {"repl external:skip"}} {
         }
     }
 }
+
+start_server {tags {"repl external:skip"}} {
+    set master [srv 0 client]
+    set master_host [srv 0 host]
+    set master_port [srv 0 port]
+
+    start_server {} {
+        set slave [srv 0 client]
+        $slave slaveof $master_host $master_port
+
+        test "Accumulate repl_total_disconnect_time with delayed reconnection" {
+            wait_for_condition 50 100 {
+                [string match {*master_link_status:up*} [$slave info replication]]
+            } else {
+                fail "Initial replica setup failed"
+            }
+
+            # Simulate a disconnect by pointing the replica at an invalid master (port 0)
+            $slave slaveof $master_host 0
+            after 1000
+
+            $slave slaveof $master_host $master_port
+
+            wait_for_condition 50 100 {
+                [string match {*master_link_status:up*} [$slave info replication]]
+            } else {
+                fail "Replica did not reconnect to master"
+            }
+            assert {[status $slave total_disconnect_time_sec] >= 1}
+        }
+
+        test "Test the total_disconnect_time_sec incr after slaveof no one" {
+            # The total is cumulative: >= 1s from the previous test plus >= 1s spent detached here
+            $slave slaveof no one
+            after 1000
+            $slave slaveof $master_host $master_port
+            wait_for_condition 50 100 {
+                [lindex [$slave role] 0] eq {slave} &&
+                [string match {*master_link_status:up*} [$slave info replication]]
+            } else {
+                fail "Can't turn the instance into a replica"
+            }
+            assert {[status $slave total_disconnect_time_sec] >= 2}
+        }
+
+        test "Test correct replication disconnection time counters behavior" {
+            # Simulate disconnection
+            $slave slaveof $master_host 0
+            after 1000
+
+            set total_disconnect_time [status $slave total_disconnect_time_sec]
+            set link_down_since [status $slave master_link_down_since_seconds]
+
+            # Restore real master
+            $slave slaveof $master_host $master_port
+            wait_for_condition 50 100 {
+                [string match {*master_link_status:up*} [$slave info replication]]
+            } else {
+                fail "Replication did not reconnect"
+            }
+            # Both counters increased while the link was down; the total also includes the two tests above
+            assert {$total_disconnect_time >= 3}
+            assert {$link_down_since > 0}
+            assert {$total_disconnect_time > $link_down_since}
+
+            # The total was sampled while the link was still down, so it may have grown by the seconds
+            # elapsed until the reconnect: at most the 5s wait window above, plus 1s of rounding.
+            set total_disconnect_time_reconnect [status $slave total_disconnect_time_sec]
+            assert {$total_disconnect_time_reconnect >= $total_disconnect_time && $total_disconnect_time_reconnect <= $total_disconnect_time + 6}
+        }
+    }
+}
+
+start_server {tags {"repl external:skip"}} {
+    set master [srv 0 client]
+    set master_host [srv 0 host]
+    set master_port [srv 0 port]
+
+    start_server {} {
+        set slave [srv 0 client]
+        $slave slaveof $master_host $master_port
+
+        # Test: Normal establishment of the master link
+        test "Test normal establishment process of the master link" {
+            wait_for_condition 50 100 {
+                [lindex [$slave role] 0] eq {slave} &&
+                [string match {*master_link_status:up*} [$slave info replication]]
+            } else {
+                fail "Can't turn the instance into a replica"
+            }
+
+            assert_equal 1 [status $slave master_current_sync_attempts]
+            assert_equal 1 [status $slave master_total_sync_attempts]
+        }
+
+        # Test: Sync attempts reset after 'slaveof no one'
+        test "Test sync attempts reset after slaveof no one" {
+            $slave slaveof no one
+            $slave slaveof $master_host $master_port
+
+            wait_for_condition 50 100 {
+                [lindex [$slave role] 0] eq {slave} &&
+                [string match {*master_link_status:up*} [$slave info replication]]
+            } else {
+                fail "Can't turn the instance into a replica"
+            }
+
+            assert_equal 1 [status $slave master_current_sync_attempts]
+            assert_equal 1 [status $slave master_total_sync_attempts]
+        }
+
+        # Test: Sync attempts reset on master reconnect
+        test "Test sync attempts reset on master reconnect" {
+            $slave client kill type master
+
+            wait_for_condition 50 100 {
+                [lindex [$slave role] 0] eq {slave} &&
+                [string match {*master_link_status:up*} [$slave info replication]]
+            } else {
+                fail "Can't turn the instance into a replica"
+            }
+
+            assert_equal 1 [status $slave master_current_sync_attempts]
+            assert_equal 2 [status $slave master_total_sync_attempts]
+        }
+
+        # Test: Sync attempts reset on master switch
+        test "Test sync attempts reset on master switch" {
+            start_server {} {
+                set new_master_host [srv 0 host]
+                set new_master_port [srv 0 port]
+                $slave slaveof $new_master_host $new_master_port
+
+                wait_for_condition 50 100 {
+                    [lindex [$slave role] 0] eq {slave} &&
+                    [string match {*master_link_status:up*} [$slave info replication]]
+                } else {
+                    fail "Can't turn the instance into a replica"
+                }
+
+                assert_equal 1 [status $slave master_current_sync_attempts]
+                assert_equal 1 [status $slave master_total_sync_attempts]
+            }
+        }
+
+        # Test: Replication current attempts counter behavior
+        test "Replication current attempts counter behavior" {
+            $slave slaveof $master_host $master_port
+
+            # Wait until replica state becomes "connected"
+            wait_for_condition 1000 50 {
+                [lindex [$slave role] 0] eq {slave} &&
+                [string match {*master_link_status:up*} [$slave info replication]]
+            } else {
+                fail "slave did not connect to master."
+            }
+
+            assert_equal 1 [status $slave master_current_sync_attempts]
+
+            # Connect to an invalid master
+            $slave slaveof $master_host 0
+            after 1000
+
+            # Expect current sync attempts to increase
+            assert {[status $slave master_current_sync_attempts] >= 2}
+        }
+    }
+}