From 0dcc4eb2fa18c511a05394593dfa5a8508af062b Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Tue, 15 Oct 2024 21:51:28 +0000 Subject: [PATCH] handle conn closed in heartbeats --- pkg/replicator/pgreplicator/pg.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/replicator/pgreplicator/pg.go b/pkg/replicator/pgreplicator/pg.go index d8d13a7..9df29b0 100644 --- a/pkg/replicator/pgreplicator/pg.go +++ b/pkg/replicator/pgreplicator/pg.go @@ -253,7 +253,7 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error { t := time.NewTicker(p.heartbeatTime) for range t.C { - if ctx.Err() != nil { + if ctx.Err() != nil || p.queryConn.IsClosed() { return } @@ -262,6 +262,10 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error { _, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);") p.queryLock.Unlock() + if isConnClosedErr(err) && p.queryConn.IsClosed() { + return + } + if err != nil { p.log.Warn("unable to emit heartbeat", "error", err, "host", p.opts.Config.Host) } @@ -269,7 +273,7 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error { }() for { - if ctx.Err() != nil || atomic.LoadInt32(&p.stopped) == 1 { + if ctx.Err() != nil || atomic.LoadInt32(&p.stopped) == 1 || p.conn.IsClosed() { // Always call Close automatically. p.Close(ctx) return nil @@ -501,3 +505,7 @@ func standardizeErr(err error) (bool, error) { } return false, err } + +func isConnClosedErr(err error) bool { + return err != nil && strings.Contains(err.Error(), "conn closed") +}