chore: support pitr for mysql and apecloud-mysql (#1451)
Co-authored-by: wangyelei <[email protected]>
wangyelei and wangyelei authored Feb 7, 2025
1 parent 7022699 commit c32df6c
Showing 13 changed files with 1,020 additions and 2 deletions.
159 changes: 159 additions & 0 deletions addons/apecloud-mysql/dataprotection/apecloud-mysql-pitr-backup.sh
@@ -0,0 +1,159 @@
#!/bin/bash
# export wal-g environments
export WALG_MYSQL_DATASOURCE_NAME="${DP_DB_USER}:${DP_DB_PASSWORD}@tcp(${DP_DB_HOST}:${DP_DB_PORT})/mysql"
export WALG_COMPRESSION_METHOD=zstd
# use datasafed and default config
export WALG_DATASAFED_CONFIG=""
export PATH="$PATH:$DP_DATASAFED_BIN_PATH"
export WALG_MYSQL_CHECK_GTIDS=true
export MYSQL_PWD=${DP_DB_PASSWORD}
# work directory to save necessary file for backup
export KB_BACKUP_WORKDIR=${VOLUME_DATA_DIR}/kb-backup
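# Note: variables such as DP_TARGET_POD_NAME, TARGET_POD_ROLE, DP_ARCHIVE_INTERVAL,
# FLUSH_BINLOG_AFTER_SIZE, BINLOG_ARCHIVE_INTERVAL and DP_TTL_SECONDS are consumed below
# but not set here; they are assumed to be provided by the backup environment
# (e.g. the dataprotection sidecar).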

# get binlog basename
MYSQL_CMD="mysql -u ${DP_DB_USER} -h ${DP_DB_HOST} -N"
log_bin_basename=$(${MYSQL_CMD} -e "SHOW VARIABLES LIKE 'log_bin_basename';" | awk -F'\t' '{print $2}')
if [ -z "${log_bin_basename}" ]; then
echo "ERROR: failed to connect to pod/${DP_TARGET_POD_NAME}."
exit 1
fi
LOG_DIR=$(dirname $log_bin_basename)
LOG_PREFIX=$(basename $log_bin_basename)

global_latest_bin_log=""
global_last_flush_logs_time=$(date +%s)
global_last_purge_time=$(date +%s)
global_old_size=0
global_flush_bin_logs_interval=600

if [[ ${DP_ARCHIVE_INTERVAL} =~ ^[0-9]+s$ ]];then
global_flush_bin_logs_interval=${DP_ARCHIVE_INTERVAL%s}
fi
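# Illustrative example: DP_ARCHIVE_INTERVAL="300s" matches the pattern above and sets
# global_flush_bin_logs_interval=300, while a value such as "5m" does not match and the
# 600s default is kept.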

# checks if the mysql process is ok
function check_mysql_process() {
is_ok=false
for ((i=1;i<4;i++));do
role=$(${MYSQL_CMD} -e "select role from information_schema.wesql_cluster_local;" | head -n 1)
if [[ $? -eq 0 && (-z ${TARGET_POD_ROLE} || "${TARGET_POD_ROLE,,}" == "${role,,}") ]]; then
is_ok=true
break
fi
DP_error_log "target backup pod/${DP_TARGET_POD_NAME} is not OK, target role: ${TARGET_POD_ROLE}, current role: ${role}, retry detection!"
sleep 1
done
if [[ ${is_ok} == "false" ]];then
DP_error_log "target backup pod/${DP_TARGET_POD_NAME} is not OK, target role: ${TARGET_POD_ROLE}, current role: ${role}!"
exit 1
fi
}

# clean up expired logfiles, interval is 60s
function purge_expired_files() {
export DATASAFED_BACKEND_BASE_PATH="$DP_BACKUP_BASE_PATH"
local currentUnix=$(date +%s)
info=$(DP_purge_expired_files ${currentUnix} ${global_last_purge_time})
if [ ! -z "${info}" ]; then
global_last_purge_time=${currentUnix}
DP_log "cleanup expired binlog files: ${info}"
local TOTAL_SIZE=$(datasafed stat / | grep TotalSize | awk '{print $2}')
DP_save_backup_status_info "${TOTAL_SIZE}"
fi
}

# flush bin logs, interval is 600s by default
function flush_binlogs() {
local binlog=$(ls -Ft ${LOG_DIR}/|grep -e "^${LOG_PREFIX}.*[[:digit:]]$" |head -n 1)
if [ -z ${binlog} ]; then
return
fi
local curr_time=$(date +%s)
# flush binary logs immediately once the current binlog grows beyond FLUSH_BINLOG_AFTER_SIZE.
if [ $(stat -c%s ${LOG_DIR}/${binlog}) -gt ${FLUSH_BINLOG_AFTER_SIZE} ]; then
DP_log "flush binary logs"
${MYSQL_CMD} -e "flush binary logs";
global_last_flush_logs_time=${curr_time}
return
fi
local diff_time=$((${curr_time}-${global_last_flush_logs_time}))
if [[ ${diff_time} -lt ${global_flush_bin_logs_interval} ]]; then
return
fi
local LATEST_TRANS=$(mysqlbinlog ${LOG_DIR}/${binlog} |grep 'Xid =' |head -n 1)
# only flush bin logs when Xid exists
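# (an Xid event is written when a transaction commits, so the log is only rotated
# when it actually contains committed work)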
if [[ -n "${LATEST_TRANS}" ]]; then
DP_log "flush binary logs"
${MYSQL_CMD} -e "flush binary logs";
fi
global_last_flush_logs_time=${curr_time}
}

# upload bin logs by walg
function upload_bin_logs() {
export DATASAFED_BACKEND_BASE_PATH="$DP_BACKUP_BASE_PATH/${DP_TARGET_POD_NAME}"
global_latest_bin_log=$(ls -Ftr ${LOG_DIR}/|grep -e "^${LOG_PREFIX}.*[[:digit:]]$"|tail -n 1)
if [ ! -z ${global_latest_bin_log} ];then
global_latest_bin_log="${LOG_DIR}/${global_latest_bin_log}"
fi
wal-g binlog-push;
}

# get binlog start time
function get_binlog_start_time() {
local binlog="${1:?missing binlog name}"
local time=$(mysqlbinlog ${binlog} | grep -m 1 "end_log_pos" | awk '{print $1, $2}' | tr -d '#')
time=$(date -d "$time" -u '+%Y-%m-%dT%H:%M:%SZ')
echo $time
}
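# e.g. get_binlog_start_time /data/binlog/mysql-bin.000003 echoes a UTC timestamp such as
# "2025-02-07T02:00:00Z" (path and value are illustrative)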

# pull binlog and decompress
function pull_binlog() {
file="${1:?missing file name}"
fileName=$(basename ${file})
datasafed pull ${file} ${fileName}
zstd -d --rm ${fileName}
}
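# e.g. pull_binlog "/pod-0/mysql-bin.000003.zst" (illustrative path) downloads the object into
# the current directory and decompresses it to mysql-bin.000003, removing the .zst copy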

# get the start time for backup.status.timeRange
function get_start_time_for_range() {
local oldest_bin_log=$(datasafed list -f --recursive / -o json | jq -s -r '.[] | sort_by(.mtime) | .[] | .path' | grep .zst | head -n 1)
if [ ! -z ${oldest_bin_log} ]; then
START_TIME=$(DP_analyze_start_time_from_datasafed "${oldest_bin_log}" get_binlog_start_time pull_binlog)
echo ${START_TIME}
fi
}

# save backup status info to sync file
function save_backup_status() {
export DATASAFED_BACKEND_BASE_PATH="$DP_BACKUP_BASE_PATH"
local TOTAL_SIZE=$(datasafed stat / | grep TotalSize | awk '{print $2}')
# if no size changes, return
if [[ ${TOTAL_SIZE} == ${global_old_size} ]];then
return
fi
global_old_size=${TOTAL_SIZE}
local START_TIME=$(get_start_time_for_range)
local STOP_TIME=$(get_binlog_start_time ${global_latest_bin_log})
DP_save_backup_status_info "${TOTAL_SIZE}" "${START_TIME}" "${STOP_TIME}"
}

# trap term signal
trap "echo 'Terminating...' && sync && exit 0" TERM
DP_log "start to archive binlog"
while true; do
# check if mysql process is ok
check_mysql_process

# flush bin logs
flush_binlogs

# upload bin log
upload_bin_logs

# save backup status which will be updated to `backup` CR by the sidecar
save_backup_status

# purge the expired bin logs
purge_expired_files
sleep ${BINLOG_ARCHIVE_INTERVAL}
done
@@ -0,0 +1,53 @@
#!/bin/bash
set -e;

# use datasafed and default config
export PATH="$PATH:$DP_DATASAFED_BIN_PATH"
export DATASAFED_BACKEND_BASE_PATH="$DP_BACKUP_BASE_PATH"

baseBackupStartTimestamp=${DP_BASE_BACKUP_START_TIMESTAMP}
if [ -f $DATA_DIR/xtrabackup_info ]; then
DP_BASE_BACKUP_START_TIME=$(cat $DATA_DIR/xtrabackup_info | grep start_time | awk -F ' = ' '{print $2}');
baseBackupStartTimestamp=$(date -d"${DP_BASE_BACKUP_START_TIME}" +%s)
fi
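# xtrabackup_info typically carries a line like "start_time = 2025-02-07 02:00:00"
# (illustrative value); when the file exists it overrides DP_BASE_BACKUP_START_TIMESTAMP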
log_index_name="archive_log.index"

function fetch_pitr_binlogs() {
echo "INFO: fetch binlogs from ${DP_BASE_BACKUP_START_TIME}"
for file in $(datasafed list -f --recursive --newer-than ${baseBackupStartTimestamp} / -o json | jq -s -r '.[] | sort_by(.mtime) | .[] | .path' | grep .zst);do
file_without_zst=${file%.*}
dir_path=`dirname ${file_without_zst}`
# mkdir the log directory
mkdir -p ${PITR_DIR}/${dir_path}
datasafed pull ${file} - | zstd -d -o ${PITR_DIR}/${file_without_zst}
echo "${PITR_RELATIVE_PATH}/${file_without_zst}" >> ${PITR_DIR}/${log_index_name}
# check if the binlog file contains the data for recovery time
log_start_time=$(mysqlbinlog ${PITR_DIR}/${file_without_zst} | grep -m 1 "end_log_pos" | awk '{print $1, $2}'|tr -d '#')
log_start_timestamp=$(date -d "${log_start_time}" +%s)
if [[ ${log_start_timestamp} -gt ${DP_RESTORE_TIMESTAMP} ]];then
DP_log "${file} out of range ${DP_RESTORE_TIME}"
break
fi
done
}
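# after the loop, the decompressed binlogs sit under ${PITR_DIR} mirroring their backend paths,
# and ${log_index_name} lists them oldest-first as paths relative to ${PITR_RELATIVE_PATH}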

function save_to_restore_file() {
if [ -f ${DATA_DIR}/.xtrabackup_restore_new_cluster ];then
restore_signal_file=${DATA_DIR}/.xtrabackup_restore_new_cluster
else
restore_signal_file=${DATA_DIR}/.restore_new_cluster
fi
echo "archive_log_index=${PITR_RELATIVE_PATH}/${log_index_name}" > ${restore_signal_file}
kb_recover_time=$(date -d "${DP_RESTORE_TIME}" -u '+%Y-%m-%d %H:%M:%S')
echo "recovery_target_datetime=${kb_recover_time}" >> ${restore_signal_file}
sync
}
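# the signal file then holds two lines, e.g. (illustrative values):
#   archive_log_index=<PITR_RELATIVE_PATH>/archive_log.index
#   recovery_target_datetime=2025-02-07 02:00:00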

fetch_pitr_binlogs

if [ -f ${PITR_DIR}/${log_index_name} ];then
save_to_restore_file
DP_log "fetch binlog finished."
else
DP_log "didn't get any binlogs."
fi
120 changes: 120 additions & 0 deletions addons/apecloud-mysql/dataprotection/common-scripts.sh
@@ -0,0 +1,120 @@
#!/bin/bash
# log info file
function DP_log() {
msg=$1
local curr_date=$(date -u '+%Y-%m-%d %H:%M:%S')
echo "${curr_date} INFO: $msg"
}

# log error info
function DP_error_log() {
msg=$1
local curr_date=$(date -u '+%Y-%m-%d %H:%M:%S')
echo "${curr_date} ERROR: $msg"
}

# Get the file name, without its extension, from the given file path
function DP_get_file_name_without_ext() {
local fileName=$1
local file_without_ext=${fileName%.*}
echo $(basename ${file_without_ext})
}

# Save backup status info file for syncing progress.
# timeFormat: %Y-%m-%dT%H:%M:%SZ
function DP_save_backup_status_info() {
local totalSize=$1
local startTime=$2
local stopTime=$3
local timeZone=$4
local extras=$5
local timeZoneStr=""
if [ ! -z ${timeZone} ]; then
timeZoneStr=",\"timeZone\":\"${timeZone}\""
fi
if [ -z "${stopTime}" ];then
echo "{\"totalSize\":\"${totalSize}\"}" > ${DP_BACKUP_INFO_FILE}
elif [ -z "${startTime}" ];then
echo "{\"totalSize\":\"${totalSize}\",\"extras\":[${extras}],\"timeRange\":{\"end\":\"${stopTime}\"${timeZoneStr}}}" > ${DP_BACKUP_INFO_FILE}
else
echo "{\"totalSize\":\"${totalSize}\",\"extras\":[${extras}],\"timeRange\":{\"start\":\"${startTime}\",\"end\":\"${stopTime}\"${timeZoneStr}}}" > ${DP_BACKUP_INFO_FILE}
fi
}
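# For example, with totalSize=4096, both times set and no extras/timeZone, the file contains
# (illustrative values):
# {"totalSize":"4096","extras":[],"timeRange":{"start":"2025-02-07T02:00:00Z","end":"2025-02-07T03:00:00Z"}}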


# Clean up expired logfiles.
# Default interval is 60s
# Default rootPath is /
function DP_purge_expired_files() {
local currentUnix="${1:?missing current unix}"
local last_purge_time="${2:?missing last_purge_time}"
local root_path=${3:-"/"}
local interval_seconds=${4:-60}
local diff_time=$((${currentUnix}-${last_purge_time}))
if [[ -z ${DP_TTL_SECONDS} || ${diff_time} -lt ${interval_seconds} ]]; then
return
fi
expiredUnix=$((${currentUnix}-${DP_TTL_SECONDS}))
files=$(datasafed list -f --recursive --older-than ${expiredUnix} ${root_path} )
for file in ${files}
do
datasafed rm ${file}
echo ${file}
done
}
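# each removed path is echoed to stdout so the caller (e.g. purge_expired_files in the
# backup script) can log what was deleted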

# Analyze the start time of the oldest file in the datasafed backend and
# record its name in dp_oldest_file.info. If the oldest file has not changed
# since the last run, reuse the cached copy instead of pulling it again,
# which saves traffic.
function DP_analyze_start_time_from_datasafed() {
local oldest_file="${1:?missing oldest file}"
local get_start_time_from_file="${2:?missing get_start_time_from_file function}"
local datasafed_pull="${3:?missing datasafed_pull function}"
local info_file="${KB_BACKUP_WORKDIR}/dp_oldest_file.info"
mkdir -p ${KB_BACKUP_WORKDIR} && cd ${KB_BACKUP_WORKDIR}
if [ -f ${info_file} ]; then
last_oldest_file=$(cat ${info_file})
last_oldest_file_name=$(DP_get_file_name_without_ext ${last_oldest_file})
if [ "$last_oldest_file" == "${oldest_file}" ]; then
# oldest file unchanged; reuse the cached copy.
${get_start_time_from_file} $last_oldest_file_name
return
fi
# remove last oldest file
if [ -f ${last_oldest_file_name} ];then
rm -rf ${last_oldest_file_name}
fi
fi
# pull file
${datasafed_pull} ${oldest_file}
# record last oldest file
echo ${oldest_file} > ${info_file}
oldest_file_name=$(DP_get_file_name_without_ext ${oldest_file})
${get_start_time_from_file} ${oldest_file_name}
}

# get the timeZone offset for location, such as Asia/Shanghai
function getTimeZoneOffset() {
local timeZone=${1:?missing time zone}
if [[ $timeZone == "+"* ]] || [[ $timeZone == "-"* ]] ; then
echo ${timeZone}
return
fi
local currTime=$(TZ=UTC date)
local utcHour=$(TZ=UTC date -d "${currTime}" +"%H")
local zoneHour=$(TZ=${timeZone} date -d "${currTime}" +"%H")
# use base 10 explicitly so hours such as "08" and "09" are not parsed as invalid octal
local offset=$((10#${zoneHour}-10#${utcHour}))
if [ $offset -eq 0 ]; then
return
fi
symbol="+"
if [ $offset -lt 0 ]; then
symbol="-" && offset=${offset:1}
fi
if [ $offset -lt 10 ];then
offset="0${offset}"
fi
echo "${symbol}${offset}:00"
}
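# e.g. getTimeZoneOffset "+08:00" echoes the offset unchanged; for a named location such as
# "Asia/Shanghai" the hour difference from UTC is computed, typically yielding "+08:00"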
