Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop inhouse conver to pgoutput encoding and move to native #47

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/scripts/e2e_test_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ make build

setup_docker

log "Running e2e ddl tests..."
if CI=false ./internal/scripts/e2e_ddl.sh; then
success "e2e ddl tests completed successfully"
log "Running copy and stream tests..."
if CI=false ./internal/scripts/e2e_copy_and_stream.sh; then
success "e2e copy and stream tests completed successfully"
else
error "Original e2e tests failed"
exit 1
Expand Down
66 changes: 56 additions & 10 deletions pkg/replicator/base_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,15 +316,25 @@ func (r *BaseReplicator) HandleInsertMessage(msg *pglogrepl.InsertMessage, lsn p
}

cdcMessage := utils.CDCMessage{
Type: "INSERT",
Type: utils.OperationInsert,
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
NewValues: make(map[string]*utils.PostgresValue),
EmittedAt: time.Now(),
NewTuple: msg.Tuple,
LSN: lsn.String(),
}

// Convert tuple data to PostgresValue
for i, col := range relation.Columns {
val := msg.Tuple.Columns[i]
pgValue, err := utils.NewPostgresValue(col.DataType, val.Data)
if err != nil {
return fmt.Errorf("failed to convert column %s: %w", col.Name, err)
}
cdcMessage.NewValues[col.Name] = pgValue
}

r.AddPrimaryKeyInfo(&cdcMessage, relation.RelationName)
return r.PublishToNATS(cdcMessage)
}
Expand All @@ -337,19 +347,38 @@ func (r *BaseReplicator) HandleUpdateMessage(msg *pglogrepl.UpdateMessage, lsn p
}

cdcMessage := utils.CDCMessage{
Type: "UPDATE",
Type: utils.OperationUpdate,
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
NewTuple: msg.NewTuple,
OldTuple: msg.OldTuple,
NewValues: make(map[string]*utils.PostgresValue),
OldValues: make(map[string]*utils.PostgresValue),
LSN: lsn.String(),
EmittedAt: time.Now(),
ToastedColumns: make(map[string]bool),
}

// Convert new tuple data
for i, col := range relation.Columns {
newVal := msg.NewTuple.Columns[i]
cdcMessage.ToastedColumns[col.Name] = newVal.DataType == 'u'

if !cdcMessage.ToastedColumns[col.Name] {
pgValue, err := utils.NewPostgresValue(col.DataType, newVal.Data)
if err != nil {
return fmt.Errorf("failed to convert new value for column %s: %w", col.Name, err)
}
cdcMessage.NewValues[col.Name] = pgValue
}

if msg.OldTuple != nil {
oldVal := msg.OldTuple.Columns[i]
pgValue, err := utils.NewPostgresValue(col.DataType, oldVal.Data)
if err != nil {
return fmt.Errorf("failed to convert old value for column %s: %w", col.Name, err)
}
cdcMessage.OldValues[col.Name] = pgValue
}
}

r.AddPrimaryKeyInfo(&cdcMessage, relation.RelationName)
Expand All @@ -364,15 +393,25 @@ func (r *BaseReplicator) HandleDeleteMessage(msg *pglogrepl.DeleteMessage, lsn p
}

cdcMessage := utils.CDCMessage{
Type: "DELETE",
Type: utils.OperationDelete,
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
OldTuple: msg.OldTuple,
OldValues: make(map[string]*utils.PostgresValue),
EmittedAt: time.Now(),
LSN: lsn.String(),
}

// Convert old tuple data
for i, col := range relation.Columns {
oldVal := msg.OldTuple.Columns[i]
pgValue, err := utils.NewPostgresValue(col.DataType, oldVal.Data)
if err != nil {
return fmt.Errorf("failed to convert column %s: %w", col.Name, err)
}
cdcMessage.OldValues[col.Name] = pgValue
}

r.AddPrimaryKeyInfo(&cdcMessage, relation.RelationName)
return r.PublishToNATS(cdcMessage)
}
Expand All @@ -391,18 +430,25 @@ func (r *BaseReplicator) HandleCommitMessage(msg *pglogrepl.CommitMessage) error

// PublishToNATS publishes a message to NATS
func (r *BaseReplicator) PublishToNATS(data utils.CDCMessage) error {
binaryData, err := data.MarshalBinary()
// Validate operation type
if _, err := utils.ValidateOperationType(data.Type); err != nil {
return fmt.Errorf("invalid operation type: %w", err)
}

bytes, err := data.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal data: %w", err)
return fmt.Errorf("failed to marshal message: %w", err)
}

subject := fmt.Sprintf("pgflo.%s", r.Config.Group)
err = r.NATSClient.PublishMessage(subject, binaryData)
err = r.NATSClient.PublishMessage(subject, bytes)
if err != nil {
r.Logger.Error().
Err(err).
Str("subject", subject).
Str("group", r.Config.Group).
Str("operation", string(data.Type)).
Str("table", data.Table).
Msg("Failed to publish message to NATS")
return err
}
Expand Down
27 changes: 10 additions & 17 deletions pkg/replicator/copy_and_stream_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,30 +354,23 @@ func (r *CopyAndStreamReplicator) executeCopyQuery(ctx context.Context, tx pgx.T
return 0, fmt.Errorf("error reading row: %v", err)
}

tupleData := &pglogrepl.TupleData{
Columns: make([]*pglogrepl.TupleDataColumn, len(values)),
}
for i, value := range values {
data, err := utils.ConvertToPgCompatibleOutput(value, fieldDescriptions[i].DataTypeOID)
if err != nil {
return 0, fmt.Errorf("error converting value: %v", err)
}

tupleData.Columns[i] = &pglogrepl.TupleDataColumn{
DataType: uint8(fieldDescriptions[i].DataTypeOID),
Data: data,
}
}

cdcMessage := utils.CDCMessage{
Type: "INSERT",
Type: utils.OperationInsert,
Schema: schema,
Table: tableName,
Columns: columns,
NewTuple: tupleData,
NewValues: make(map[string]*utils.PostgresValue),
EmittedAt: time.Now(),
}

for i, value := range values {
pgValue, err := utils.NewPostgresValue(fieldDescriptions[i].DataTypeOID, value)
if err != nil {
return 0, fmt.Errorf("error converting value: %v", err)
}
cdcMessage.NewValues[fieldDescriptions[i].Name] = pgValue
}

r.BaseReplicator.AddPrimaryKeyInfo(&cdcMessage, tableName)
if err := r.BaseReplicator.PublishToNATS(cdcMessage); err != nil {
return 0, fmt.Errorf("failed to publish insert event to NATS: %v", err)
Expand Down
49 changes: 31 additions & 18 deletions pkg/replicator/ddl_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ func (d *DDLReplicator) ProcessDDLEvents(ctx context.Context) error {
var processedIDs []int
seenCommands := make(map[string]bool)

// Define DDL columns structure
ddlColumns := []*pglogrepl.RelationMessageColumn{
{Name: "event_type", DataType: pgtype.TextOID},
{Name: "object_type", DataType: pgtype.TextOID},
{Name: "object_identity", DataType: pgtype.TextOID},
{Name: "ddl_command", DataType: pgtype.TextOID},
{Name: "created_at", DataType: pgtype.TextOID},
}

for rows.Next() {
var id int
var eventType, objectType, objectIdentity, ddlCommand string
Expand Down Expand Up @@ -195,30 +204,34 @@ func (d *DDLReplicator) ProcessDDLEvents(ctx context.Context) error {
}

cdcMessage := utils.CDCMessage{
Type: "DDL",
Type: utils.OperationDDL,
Schema: schema,
Table: table,
EmittedAt: time.Now(),
Columns: []*pglogrepl.RelationMessageColumn{
{Name: "event_type", DataType: pgtype.TextOID},
{Name: "object_type", DataType: pgtype.TextOID},
{Name: "object_identity", DataType: pgtype.TextOID},
{Name: "ddl_command", DataType: pgtype.TextOID},
{Name: "created_at", DataType: pgtype.TimestamptzOID},
},
NewTuple: &pglogrepl.TupleData{
Columns: []*pglogrepl.TupleDataColumn{
{Data: []byte(eventType)},
{Data: []byte(objectType)},
{Data: []byte(objectIdentity)},
{Data: []byte(ddlCommand)},
{Data: []byte(createdAt.Format(time.RFC3339))},
},
},
Columns: ddlColumns,
NewValues: make(map[string]*utils.PostgresValue),
}

values := map[string]interface{}{
"event_type": eventType,
"object_type": objectType,
"object_identity": objectIdentity,
"ddl_command": ddlCommand,
"created_at": createdAt.Format(time.RFC3339Nano),
}

for name, value := range values {
if err := cdcMessage.SetColumnValue(name, value); err != nil {
d.BaseRepl.Logger.Error().Err(err).
Str("field", name).
Msg("Failed to convert DDL value")
return err
}
}

if err := d.BaseRepl.PublishToNATS(cdcMessage); err != nil {
d.BaseRepl.Logger.Error().Err(err).Msg("Error during publishing DDL event to NATS")
d.BaseRepl.Logger.Error().Err(err).
Msg("Error during publishing DDL event to NATS")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/rules/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (re *RuleEngine) ApplyRules(message *utils.CDCMessage) (*utils.CDCMessage,

logger.Info().
Str("table", message.Table).
Str("operation", message.Type).
Str("operation", string(message.Type)).
Int("ruleCount", len(rules)).
Msg("Applying rules")

Expand Down
8 changes: 4 additions & 4 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewRegexTransformRule(table, column string, params map[string]interface{})
}

transform := func(m *utils.CDCMessage) (*utils.CDCMessage, error) {
value, err := m.GetColumnValue(column)
value, err := m.GetDecodedColumnValue(column)
if err != nil {
return m, nil
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func NewMaskTransformRule(table, column string, params map[string]interface{}) (
}

transform := func(m *utils.CDCMessage) (*utils.CDCMessage, error) {
value, err := m.GetColumnValue(column)
value, err := m.GetDecodedColumnValue(column)
if err != nil {
return m, nil
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func NewFilterRule(table, column string, params map[string]interface{}) (Rule, e
// NewComparisonCondition creates a new comparison condition function
func NewComparisonCondition(column, operator string, value interface{}) func(*utils.CDCMessage) bool {
return func(m *utils.CDCMessage) bool {
columnValue, err := m.GetColumnValue(column)
columnValue, err := m.GetDecodedColumnValue(column)
if err != nil {
return false
}
Expand Down Expand Up @@ -261,7 +261,7 @@ func NewComparisonCondition(column, operator string, value interface{}) func(*ut
// NewContainsCondition creates a new contains condition function
func NewContainsCondition(column string, value interface{}) func(*utils.CDCMessage) bool {
return func(m *utils.CDCMessage) bool {
columnValue, err := m.GetColumnValue(column)
columnValue, err := m.GetDecodedColumnValue(column)
if err != nil {
return false
}
Expand Down
Loading
Loading