Skip to content

Commit

Permalink
Adding primary timestamp and replication lag
Browse files Browse the repository at this point in the history
Adding a metric to show the lag between a primary and the replica.
Also add the undelying mechanism for this metric
This includes:
*  master_timestamp
*  replication_lag
*  primary_timestamp_update_failure_count
*  lag_behind_primary_timestamp_ms
  • Loading branch information
AshMosh committed Mar 5, 2024
1 parent 33ea432 commit 680e234
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3111,6 +3111,7 @@ standardConfig static_configs[] = {
createBoolConfig("latency-tracking", NULL, MODIFIABLE_CONFIG, server.latency_tracking_enabled, 1, NULL, NULL),
createBoolConfig("aof-disable-auto-gc", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.aof_disable_auto_gc, 0, NULL, updateAofAutoGCEnabled),
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("repl-timestamp-enabled", NULL, MODIFIABLE_CONFIG, server.repl_time.timestamp_enabled, 1, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
114 changes: 114 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ void replicationSendAck(void);
int replicaPutOnline(client *slave);
void replicaStartCommandStream(client *slave);
int cancelReplicationHandshake(int reconnect);
int replicationTimestampHandleMsg(robj *timestamp_obj);
/*
* Update the latest_repl_master_timestamp and send it to replicas via REPLCONF TIMESTAMP messages.
*/
void replicationTimestampCron();

/* We take a global flag to remember if this instance generated an RDB
* because of replication, so that we can remove the RDB file in case
Expand Down Expand Up @@ -1275,6 +1280,17 @@ void replconfCommand(client *c) {
}
}
sdsfreesplitres(filters, filter_count);
} else if (!strcasecmp(c->argv[j]->ptr, "timestamp")) {
if (!(c->flags & CLIENT_MASTER)) {
addReplyError(c, "REPLCONF TIMESTAMP was called from a non master client. Ignoring it.");
return;
}
if (replicationTimestampHandleMsg(c->argv[j + 1]) != C_OK) {
addReplyErrorFormat(c,"Unrecognized REPLCONF TIMESTAMP option: %s",
(char*)c->argv[j+1]->ptr);
return;
}
return;
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
Expand Down Expand Up @@ -3793,6 +3809,8 @@ void replicationCron(void) {
}
}

replicationTimestampCron();

/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
*
Expand Down Expand Up @@ -4245,3 +4263,99 @@ void updateFailoverStatus(void) {
server.target_replica_port);
}
}

// ------------ Replication lag info ------------
#define MAX_REPLICATION_LAG 60 * 60 * 24 * 30

static ReplTimeContext repl_time_context = {0};

// ------------ Private utility methods ---------------

static void updateLatestReplmasterTimestampOnmaster() {
serverAssert(server.masterhost == NULL);

mstime_t curtime = mstime();
if (curtime <= server.repl_time.latest_repl_master_timestamp) {
repl_time_context.latest_repl_master_timestamp_update_failure_count++;
repl_time_context.lag_behind_latest_repl_master_timestamp_ms = server.repl_time.latest_repl_master_timestamp - curtime;
} else {
/* We never update the latest_repl_master_timestamp to an earlier time,
* which ensures we will not record a smaller than current master timestamps to the replication stream */
server.repl_time.latest_repl_master_timestamp = curtime;
repl_time_context.lag_behind_latest_repl_master_timestamp_ms = 0;
}
}

/*
* Sends REPLCONF TIMESTAMP msg carrying the latest_repl_master_timestamp to replicas.
*/
static void replicationTimstampSendMsg() {
serverAssert(server.masterhost == NULL);

robj *argv[3];

const char *args[] = {"REPLCONF", "TIMESTAMP"};

argv[0] = createStringObject(args[0], strlen(args[0]));
argv[1] = createStringObject(args[1], strlen(args[1]));
argv[2] = createStringObjectFromLongLong(server.repl_time.latest_repl_master_timestamp);
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);

decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[2]);
}

// ---------- Public API methods -----------

void replicationTimestampCron() {
if (!server.repl_time.timestamp_enabled || !iAmMaster()) return;
updateLatestReplmasterTimestampOnmaster();
// Don't send timestamps when there is no connected replica or when a failover is in progress
if (!isManualFailoverOrPauseInProgress() && listLength(server.slaves)) {
replicationTimstampSendMsg();
}
}

int isManualFailoverOrPauseInProgress(void){
return ((server.cluster_enabled &&
clusterManualFailoverTimeLimit()) ||
server.failover_end_time) &&
isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA);
}

int replicationTimestampHandleMsg(robj *timestamp_obj) {
serverAssert(server.masterhost != NULL);

mstime_t timestamp;
if (getLongLongFromObject(timestamp_obj, &timestamp) != C_OK) {
serverLog(LL_WARNING, "Unable to parse master replication timestamp");
return C_ERR;
}
if (timestamp <= server.repl_time.latest_repl_master_timestamp)
server.repl_time.latest_repl_master_timestamp = timestamp;

mstime_t time_diff = mstime() - timestamp;
if (time_diff < 0 || time_diff > (long long) MAX_REPLICATION_LAG) {
serverLog(LL_WARNING, "Unreasonable replication timestamp received from master: %lli ms, resulting in"
" replication delay of %lli ms. Defaulting replication delay to 0 ms.", timestamp, time_diff);
time_diff = 0;
}
repl_time_context.replication_delay_ms = time_diff;
return C_OK;
}

sds ReplTime_info(sds info_str) {
info_str = sdscatprintf(info_str,
"master_timestamp:%lld\r\n"
"replication_lag:%lld\r\n"
"master_timestamp_replication_delay_ms:%lld\r\n"
"master_timestamp_update_failure_count:%lld\r\n"
"lag_behind_master_timestamp_ms:%lld\r\n",
server.repl_time.latest_repl_master_timestamp,
server.repl_time.latest_repl_master_timestamp == 0 ? 0 : (mstime() - server.repl_time.latest_repl_master_timestamp),
repl_time_context.replication_delay_ms,
repl_time_context.latest_repl_master_timestamp_update_failure_count,
repl_time_context.lag_behind_latest_repl_master_timestamp_ms);
return info_str;
}
6 changes: 6 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2729,6 +2729,7 @@ void initServer(void) {
server.aof_last_write_status = C_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
server.repl_time.latest_repl_master_timestamp = 0;
server.last_sig_received = 0;

/* Initiate acl info struct */
Expand Down Expand Up @@ -5498,6 +5499,7 @@ void totalNumberOfStatefulKeys(unsigned long *blocking_keys, unsigned long *bloc
*watched_keys = wkeys;
}

extern ReplTimeContext repl_time_context;
/* Create the string returned by the INFO command. This is decoupled
* by the INFO command itself as we need to report the same information
* on memory corruption problems. */
Expand Down Expand Up @@ -5931,6 +5933,10 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
server.repl_good_slaves_count);
}

if (server.masterhost) {
info = ReplTime_info(info);
}

if (listLength(server.slaves)) {
int slaveid = 0;
listNode *ln;
Expand Down
27 changes: 27 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,15 @@ typedef enum childInfoType {
CHILD_INFO_TYPE_MODULE_COW_SIZE
} childInfoType;

typedef struct ReplTime_t {
int timestamp_enabled; /* True if the master is injecting timestamps into the replication stream */
/*
* Latest replicated timestamp as of the current replication offset of Redis, for both master and replica.
* It is used on replica to calculate replication lag.
*/
mstime_t latest_repl_master_timestamp;
} ReplTime_t;

struct redisServer {
/* General */
pid_t pid; /* Main process pid. */
Expand Down Expand Up @@ -1919,6 +1928,7 @@ struct redisServer {
* when it receives an error on the replication stream */
int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to
* persist writes to AOF. */
ReplTime_t repl_time; /* Replication time info */
/* The following two fields is where we store master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->master client structure. */
Expand Down Expand Up @@ -3786,4 +3796,21 @@ int iAmMaster(void);
#define STRINGIFY_(x) #x
#define STRINGIFY(x) STRINGIFY_(x)

/**
* Append the replication time information to the Redis info string.
* Given info_str the Redis info string, returns The Redis info string with repl time info.
*/
sds ReplTime_info(sds info_str);

int isManualFailoverOrPauseInProgress();

typedef struct ReplTimeContext {
mstime_t replication_delay_ms; /* Time taken to replicate timestamp from master to replica */
/* Metrics */
long long latest_repl_master_timestamp_update_failure_count; /* Number of times latest_repl_master_timestamp couldn't be updated b/c the
* master's local clock time was before latest_repl_master_timestamp */
mstime_t lag_behind_latest_repl_master_timestamp_ms; /* The difference between the latest_repl_master_timestamp and local clock time (LT) when
* LT < latest_repl_master_timestamp */
} ReplTimeContext;

#endif
34 changes: 34 additions & 0 deletions tests/integration/replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1454,3 +1454,37 @@ start_server {tags {"repl external:skip"}} {
}
}
}

start_server {tags {"repl lag info"}} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$master config set repl-timestamp-enabled yes
start_server {} {
set slave [srv 0 client]
$slave config set repl-timestamp-enabled yes
$slave slaveof $master_host $master_port

test "Test replication info" {
# wait for replication to be in sync
wait_for_condition 50 100 {
[lindex [$slave role] 0] eq {slave} &&
[string match {*master_link_status:up*} [$slave info replication]]
} else {
fail "Can't turn the instance into a replica"
}
set info [$slave info replication]
set rep_delay [getInfoProperty $info master_timestamp_replication_delay_ms]
assert {$rep_delay >= 0}

$master sadd s foo
$master pexpire s 1
after 10
$master sadd s foo
assert_equal 1 [$master wait 1 0]

assert_equal "set" [$master type s]
assert_equal "set" [$slave type s]
}
}
}
3 changes: 3 additions & 0 deletions tests/support/server.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ proc start_server {options {code undefined}} {
dict set config $directive $arguments
}

# Turn off replication timestamp as it interferes with legacy tests
dict set config "repl-timestamp-enabled" "no"

# remove directives that are marked to be omitted
foreach directive $omit {
dict unset config $directive
Expand Down

0 comments on commit 680e234

Please sign in to comment.