Skip to content

Commit

Permalink
Improved physrep matching / keepalive
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Hannum <[email protected]>
  • Loading branch information
markhannum committed Nov 16, 2024
1 parent 32a39dd commit a546027
Show file tree
Hide file tree
Showing 15 changed files with 486 additions and 54 deletions.
242 changes: 217 additions & 25 deletions bdb/phys_rep_lsn.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ extern int gbl_physrep_debug;
int gbl_physrep_exit_on_invalid_logstream = 0;
int gbl_physrep_ignore_queues = 1;

LOG_INFO get_last_lsn(bdb_state_type *bdb_state)
static LOG_INFO get_lsn_internal(bdb_state_type *bdb_state, int flags)
{
int rc;

/* get db internals */
DB_LOGC *logc;
DBT logrec;
DB_LSN last_log_lsn;
DB_LSN log_lsn;
LOG_INFO log_info = {0};

rc = bdb_state->dbenv->log_cursor(bdb_state->dbenv, &logc, 0);
Expand All @@ -41,7 +41,7 @@ LOG_INFO get_last_lsn(bdb_state_type *bdb_state)
}
bzero(&logrec, sizeof(DBT));
logrec.flags = DB_DBT_MALLOC;
rc = logc->get(logc, &last_log_lsn, &logrec, DB_LAST);
rc = logc->get(logc, &log_lsn, &logrec, flags);
if (rc) {
physrep_logmsg(LOGMSG_ERROR, "%s: Can't get last log record rc %d\n", __func__,
rc);
Expand All @@ -50,12 +50,12 @@ LOG_INFO get_last_lsn(bdb_state_type *bdb_state)
}
#if 0
if (gbl_physrep_debug)
physrep_logmsg(LOGMSG_USER, "%s: LSN %u:%u\n", __func__, last_log_lsn.file,
last_log_lsn.offset);
physrep_logmsg(LOGMSG_USER, "%s: LSN %u:%u\n", __func__, log_lsn.file,
log_lsn.offset);
#endif

log_info.file = last_log_lsn.file;
log_info.offset = last_log_lsn.offset;
log_info.file = log_lsn.file;
log_info.offset = log_lsn.offset;
log_info.size = logrec.size;

if (logrec.data)
Expand All @@ -66,6 +66,16 @@ LOG_INFO get_last_lsn(bdb_state_type *bdb_state)
return log_info;
}

LOG_INFO get_last_lsn(bdb_state_type *bdb_state)
{
return get_lsn_internal(bdb_state, DB_LAST);
}

LOG_INFO get_first_lsn(bdb_state_type *bdb_state)
{
return get_lsn_internal(bdb_state, DB_FIRST);
}

int compare_log(DBT *logrec, void *blob, unsigned int blob_len)
{
int rc;
Expand Down Expand Up @@ -142,8 +152,18 @@ int find_log_timestamp(bdb_state_type *bdb_state, time_t time,
return 0;
}

static int in_parent_range(DB_LSN *matchable_lsn, DB_LSN *parent_highest, DB_LSN *parent_lowest)
{
if (!parent_highest || !parent_lowest) {
return 1;
}

return log_compare(matchable_lsn, parent_highest) <= 0 &&
log_compare(matchable_lsn, parent_lowest) >= 0;
}

static int get_next_matchable(DB_LOGC *logc, LOG_INFO *info, int check_current,
DBT *logrec)
DBT *logrec, DB_LSN *parent_highest, DB_LSN *parent_lowest)
{
int rc;
u_int32_t rectype;
Expand All @@ -162,9 +182,20 @@ static int get_next_matchable(DB_LOGC *logc, LOG_INFO *info, int check_current,
__func__, info->file, info->offset, rc);
return 1;
}

/* Punt if there's no possibility of matching */
if (parent_lowest && log_compare(&match_lsn, parent_lowest) < 0) {
if (gbl_physrep_debug) {
physrep_logmsg(LOGMSG_USER, "%s: initial lsn {%u:%u} is below parent_lowest {%u:%u}\n",
__func__, match_lsn.file, match_lsn.offset, parent_lowest->file, parent_lowest->offset);
}
return 1;
}

LOGCOPY_32(&rectype, logrec->data);
normalize_rectype(&rectype);
if (matchable_log_type(rectype)) {

if (matchable_log_type(rectype) && in_parent_range(&match_lsn, parent_highest, parent_lowest)) {
if (gbl_physrep_debug) {
physrep_logmsg(LOGMSG_USER, "%s: Initial rec {%u:%u} is matchable\n",
__func__, info->file, info->offset);
Expand All @@ -176,6 +207,7 @@ static int get_next_matchable(DB_LOGC *logc, LOG_INFO *info, int check_current,
}
}

int matchable = 0;
do {
rc = logc->get(logc, &match_lsn, logrec, DB_PREV);
if (rc) {
Expand All @@ -185,9 +217,21 @@ static int get_next_matchable(DB_LOGC *logc, LOG_INFO *info, int check_current,
logrec->data = NULL;
return 1;
}

/* Punt if there's no possibility of matching */
if (parent_lowest && log_compare(&match_lsn, parent_lowest) < 0) {
if (gbl_physrep_debug) {
physrep_logmsg(LOGMSG_USER, "%s: lsn {%u:%u} is below parent_lowest {%u:%u}\n",
__func__, match_lsn.file, match_lsn.offset, parent_lowest->file, parent_lowest->offset);
}
return 1;
}


LOGCOPY_32(&rectype, logrec->data);
normalize_rectype(&rectype);
} while (!matchable_log_type(rectype));
matchable = (matchable_log_type(rectype) && in_parent_range(&match_lsn, parent_highest, parent_lowest));
} while (!matchable);

info->file = match_lsn.file;
info->offset = match_lsn.offset;
Expand All @@ -208,21 +252,18 @@ uint32_t get_next_offset(DB_ENV *dbenv, LOG_INFO log_info)
return log_info.offset + log_info.size + dbenv->get_log_header_size(dbenv);
}

int apply_log(DB_ENV *dbenv, unsigned int file, unsigned int offset,
int64_t rectype, void *blob, int blob_len)
int apply_log(DB_ENV *dbenv, unsigned int file, unsigned int offset, int64_t rectype, void *blob, int blob_len)
{
return dbenv->apply_log(dbenv, file, offset, rectype, blob, blob_len);
}

int truncate_log_lock(bdb_state_type *bdb_state, unsigned int file,
unsigned int offset, uint32_t flags)
int truncate_log_lock(bdb_state_type *bdb_state, unsigned int file, unsigned int offset, uint32_t flags)
{
extern int gbl_online_recovery;
char *msg = "truncate log";
int online = gbl_online_recovery;

if (flags &&
bdb_state->repinfo->master_host != bdb_state->repinfo->myhost) {
if (flags && bdb_state->repinfo->master_host != bdb_state->repinfo->myhost) {
return send_truncate_to_master(bdb_state, file, offset);
}

Expand All @@ -241,8 +282,123 @@ int truncate_log_lock(bdb_state_type *bdb_state, unsigned int file,
return 0;
}

LOG_INFO find_match_lsn(void *in_bdb_state, cdb2_hndl_tp *repl_db,
LOG_INFO start_info)
/* Limit how much a physical replicant will unwind */
int gbl_physrep_max_rollback = 0;

static inline int have_overlap(DB_LSN *parent_low, DB_LSN *parent_high, DB_LSN *my_low, DB_LSN *my_high)
{
if (parent_low->file > my_high->file || parent_high->file < my_low->file) {
return 0;
}

/* Technically 'correct', but parent_low shouldn't be mid-logfile */
if (parent_low->file == my_high->file && parent_low->offset > my_high->offset) {
return 0;
}

if (parent_high->file == my_low->file && parent_high->offset < my_low->offset) {
return 0;
}

int max_rollback = gbl_physrep_max_rollback;
if (max_rollback > 0 && my_high->file - parent_high->file > max_rollback) {
return 0;
}

return 1;
}

/* Exposed to process-message for testcase */
void have_overlap_check(const char *inplow, const char *inphigh, const char *inmylow, const char *inmyhigh)
{
DB_LSN plow = {0}, phigh = {0}, mylow = {0}, myhigh = {0};
int rc;

if ((rc = char_to_lsn(inplow, &plow.file, &plow.offset)) != 0) {
physrep_logmsg(LOGMSG_ERROR, "%s error parsing parent-low lsn: %s\n", __func__, inplow);
return;
}

if ((rc = char_to_lsn(inphigh, &phigh.file, &phigh.offset)) != 0) {
physrep_logmsg(LOGMSG_ERROR, "%s error parsing parent-high lsn: %s\n", __func__, inphigh);
return;
}

if ((rc = char_to_lsn(inmylow, &mylow.file, &mylow.offset)) != 0) {
physrep_logmsg(LOGMSG_ERROR, "%s error parsing my-low lsn: %s\n", __func__, inmylow);
return;
}

if ((rc = char_to_lsn(inmyhigh, &myhigh.file, &myhigh.offset)) != 0) {
physrep_logmsg(LOGMSG_ERROR, "%s error parsing my-high lsn: %s\n", __func__, inmyhigh);
return;
}

rc = have_overlap(&plow, &phigh, &mylow, &myhigh);
logmsg(LOGMSG_USER, "%s overlap source=%s/%s mine=%s/%s result=%d\n", __func__, inplow, inphigh, inmylow, inmyhigh,
rc);
}

static inline int find_my_range(DB_LOGC *logc, DB_LSN *lowest, DB_LSN *highest)
{
int rc;
DB_LSN rec_lsn;
DBT logrec;

bzero(&logrec, sizeof(DBT));
logrec.flags = DB_DBT_REALLOC;

rc = logc->get(logc, &rec_lsn, &logrec, DB_FIRST);
if (rc) {
physrep_logmsg(LOGMSG_ERROR, "%s: Can't get first log record rc %d\n", __func__, rc);
return 0;
}

lowest->file = rec_lsn.file;
lowest->offset = rec_lsn.offset;

rc = logc->get(logc, &rec_lsn, &logrec, DB_LAST);
if (rc) {
physrep_logmsg(LOGMSG_ERROR, "%s: Can't get last log record rc %d\n", __func__, rc);
free(logrec.data);
return 0;
}

highest->file = rec_lsn.file;
highest->offset = rec_lsn.offset;

free(logrec.data);
return 1;
}

static inline int find_parent_range(cdb2_hndl_tp *repl_db, DB_LSN *lowest, DB_LSN *highest)
{
char *sql_cmd = "select * from "
"(select lsn from comdb2_transaction_logs('{0:0}','{0:0}', 0x0) limit 1),"
"(select lsn from comdb2_transaction_logs('{0:0}','{0:0}', 0x4) limit 1)";

int rc;

if ((rc = cdb2_run_statement(repl_db, sql_cmd)) == 0) {
if ((rc = cdb2_next_record(repl_db)) == CDB2_OK) {
char *lowest_lsn = (char *)cdb2_column_value(repl_db, 0);
char *highest_lsn = (char *)cdb2_column_value(repl_db, 1);

if ((rc = char_to_lsn(lowest_lsn, &lowest->file, &lowest->offset)) != 0) {
physrep_logmsg(LOGMSG_ERROR, "%s error parsing lsn: %s\n", __func__, lowest_lsn);
return 0;
}
if ((rc = char_to_lsn(highest_lsn, &highest->file, &highest->offset)) != 0) {
physrep_logmsg(LOGMSG_ERROR, "%s error parsing lsn: %s\n", __func__, highest_lsn);
return 0;
}
return 1;
}
}
return 0;
}

LOG_INFO find_match_lsn(void *in_bdb_state, cdb2_hndl_tp *repl_db, LOG_INFO start_info)
{
int rc;
char sql_cmd[128];
Expand All @@ -255,6 +411,8 @@ LOG_INFO find_match_lsn(void *in_bdb_state, cdb2_hndl_tp *repl_db,
LOG_INFO info = {0};
DB_LOGC *logc;
DBT logrec = {0};
DB_LSN parent_lowest = {0}, parent_highest = {0};
int have_parent_range = find_parent_range(repl_db, &parent_lowest, &parent_highest);

rc = bdb_state->dbenv->log_cursor(bdb_state->dbenv, &logc, 0);
if (rc) {
Expand All @@ -264,14 +422,48 @@ LOG_INFO find_match_lsn(void *in_bdb_state, cdb2_hndl_tp *repl_db,

logrec.flags = DB_DBT_REALLOC;

while (
!(rc = get_next_matchable(logc, &start_info, match_current, &logrec))) {
DB_LSN my_lowest = {0}, my_highest = {0};
int have_my_range = find_my_range(logc, &my_lowest, &my_highest);

if (gbl_physrep_debug) {
logmsg(LOGMSG_INFO, "%s: parent-range: %d:%d - %d:%d, my-range: %d:%d - %d:%d\n", __func__, parent_lowest.file,
parent_lowest.offset, parent_highest.file, parent_highest.offset, my_lowest.file, my_lowest.offset,
my_highest.file, my_highest.offset);
}

if (have_my_range && have_parent_range && !have_overlap(&parent_lowest, &parent_highest, &my_lowest, &my_highest)) {
physrep_logmsg(LOGMSG_WARN,
"%s: No overlap: parent_lowest {%d:%d}, "
"parent_highest {%d:%d}, my_lowest {%d:%d}, my_highest {%d:%d}\n",
__func__, parent_lowest.file, parent_lowest.offset, parent_highest.file, parent_highest.offset,
my_lowest.file, my_lowest.offset, my_highest.file, my_highest.offset);

logc->close(logc, 0);
return info;
}

while (!(rc = get_next_matchable(logc, &start_info, match_current, &logrec,
have_parent_range ? &parent_highest : NULL, &parent_lowest))) {
if (have_parent_range && (start_info.file < parent_lowest.file)) {
if (gbl_physrep_debug) {
physrep_logmsg(LOGMSG_USER, "%s: {%d:%d} is below parent_lowest {%d:%d}\n", __func__, start_info.file,
start_info.offset, parent_lowest.file, parent_lowest.offset);
}

physrep_logmsg(LOGMSG_WARN, "Parent logs of range: %d:%d - %d:%d, matchable = %d:%d\n", parent_lowest.file,
parent_lowest.offset, parent_highest.file, parent_highest.offset, start_info.file,
start_info.offset);
logc->close(logc, 0);
if (logrec.data) {
free(logrec.data);
logrec.data = NULL;
}
return info;
}

match_current = 0;
snprintf(
sql_cmd, sizeof(sql_cmd),
"select * from comdb2_transaction_logs('{%d:%d}','{%d:%d}', 0)",
start_info.file, start_info.offset, start_info.file,
start_info.offset);
snprintf(sql_cmd, sizeof(sql_cmd), "select * from comdb2_transaction_logs('{%d:%d}','{%d:%d}', 0)",
start_info.file, start_info.offset, start_info.file, start_info.offset);

if ((rc = cdb2_run_statement(repl_db, sql_cmd)) == 0) {
if ((rc = cdb2_next_record(repl_db)) == CDB2_OK) {
Expand Down
1 change: 1 addition & 0 deletions bdb/phys_rep_lsn.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ int physrep_ignore_table_count(void);
int physrep_list_ignored_tables(void);

LOG_INFO get_last_lsn(struct bdb_state_tag *);
LOG_INFO get_first_lsn(struct bdb_state_tag *);
uint32_t get_next_offset(struct __db_env *, LOG_INFO log_info);
int apply_log(struct __db_env *, unsigned int file, unsigned int offset,
int64_t rectype, void *blob, int blob_len);
Expand Down
3 changes: 3 additions & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -473,13 +473,16 @@ extern int gbl_physrep_hung_replicant_threshold;
extern int gbl_physrep_revconn_check_interval;
extern int gbl_physrep_update_registry_interval;
extern int gbl_physrep_i_am_metadb;
extern int gbl_physrep_keepalive_v2;
extern int gbl_physrep_keepalive_freq_sec;
extern int gbl_physrep_max_candidates;
extern int gbl_physrep_test_fallback;
extern int gbl_physrep_max_pending_replicants;
extern int gbl_physrep_reconnect_penalty;
extern int gbl_physrep_register_interval;
extern int gbl_physrep_shuffle_host_list;
extern int gbl_physrep_ignore_queues;
extern int gbl_physrep_max_rollback;

/* source-name / host is from lrl */
extern char *gbl_physrep_source_dbname;
Expand Down
Loading

0 comments on commit a546027

Please sign in to comment.