Skip to content

Commit

Permalink
Discard non existent attempts on DDL (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
shayonj authored Nov 12, 2024
1 parent 27ea37c commit c297a02
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions pkg/sinks/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ func (s *PostgresSink) handleDelete(tx pgx.Tx, message *utils.CDCMessage) error

// handleDDL processes a DDL operation
func (s *PostgresSink) handleDDL(tx pgx.Tx, message *utils.CDCMessage) (pgx.Tx, error) {
if s.conn == nil || s.conn.IsClosed() {
if err := s.connect(context.Background()); err != nil {
return nil, fmt.Errorf("failed to reconnect to database: %v", err)
}
newTx, err := s.conn.Begin(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to begin new transaction: %v", err)
}
tx = newTx
}

ddlCommand, err := message.GetColumnValue("ddl_command")
if err != nil {
return tx, fmt.Errorf("failed to get DDL command: %v", err)
Expand All @@ -229,18 +240,22 @@ func (s *PostgresSink) handleDDL(tx pgx.Tx, message *utils.CDCMessage) (pgx.Tx,
}

if _, err := s.conn.Exec(context.Background(), ddlString); err != nil {
if strings.Contains(err.Error(), "does not exist") {
log.Warn().Msgf("Ignoring DDL for non-existent object: %s", ddlString)
return s.conn.Begin(context.Background())
}
return nil, fmt.Errorf("failed to execute concurrent DDL: %v", err)
}

newTx, err := s.conn.Begin(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to begin new transaction after concurrent DDL: %v", err)
}
return newTx, nil
return s.conn.Begin(context.Background())
}

_, err = tx.Exec(context.Background(), ddlString)
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
log.Warn().Msgf("Ignoring DDL for non-existent object: %s", ddlString)
return tx, nil
}
return tx, fmt.Errorf("failed to execute DDL: %v", err)
}

Expand Down

0 comments on commit c297a02

Please sign in to comment.