Skip to content

Commit

Permalink
Added heartbeat to Mysql Binlog Listener connection (#173)
Browse files Browse the repository at this point in the history
Added heartbeat to mysql binlog listener connection
---------
Co-authored-by: Ralf Kistner <[email protected]>
  • Loading branch information
Rentacookie authored Jan 8, 2025
1 parent 48320b5 commit e25263c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/grumpy-cameras-breathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-module-mysql': patch
---

Added a heartbeat mechanism to the MySQL binlog listener replication connection to detect connection timeouts.
29 changes: 27 additions & 2 deletions modules/module-mysql/src/replication/BinLogStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,36 @@ AND table_type = 'BASE TABLE';`,
return;
}

logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`);
// Set a heartbeat interval for the Zongji replication connection
// Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown
// The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket.
await new Promise((resolve, reject) => {
zongji.connection.query(
// In nanoseconds, 10^9 = 1s
'set @master_heartbeat_period=28*1000000000',
function (error: any, results: any, fields: any) {
if (error) {
reject(error);
} else {
resolve(results);
}
}
);
});
logger.info('Successfully set up replication connection heartbeat...');

// The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat.
// The timeout here must be greater than the master_heartbeat_period.
const socket = zongji.connection._socket!;
socket.setTimeout(60_000, () => {
socket.destroy(new Error('Replication connection timeout.'));
});

logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`);
// Only listen for changes to tables in the sync rules
const includedTables = [...this.tableCache.values()].map((table) => table.table);
zongji.start({
// We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive
includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'],
excludeEvents: [],
includeSchema: { [this.defaultSchema]: includedTables },
Expand All @@ -492,7 +517,7 @@ AND table_type = 'BASE TABLE';`,
// Forever young
await new Promise<void>((resolve, reject) => {
zongji.on('error', (error) => {
logger.error('Error on Binlog listener:', error);
logger.error('Binlog listener error:', error);
zongji.stop();
queue.kill();
reject(error);
Expand Down
10 changes: 10 additions & 0 deletions modules/module-mysql/src/replication/zongji/zongji.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
declare module '@powersync/mysql-zongji' {
import { Socket } from 'net';

export type ZongjiOptions = {
host: string;
user: string;
Expand Down Expand Up @@ -108,7 +110,15 @@ declare module '@powersync/mysql-zongji' {

export type BinLogEvent = BinLogRotationEvent | BinLogGTIDLogEvent | BinLogXidEvent | BinLogMutationEvent;

// @vlasky/mysql Connection
export interface MySQLConnection {
_socket?: Socket;
/** There are other forms of this method as well - this is the most basic one. */
query(sql: string, callback: (error: any, results: any, fields: any) => void): void;
}

export default class ZongJi {
connection: MySQLConnection;
constructor(options: ZongjiOptions);

start(options: StartOptions): void;
Expand Down

0 comments on commit e25263c

Please sign in to comment.