From d7acfa9ad97683c0019d700774107ea40ca8a037 Mon Sep 17 00:00:00 2001 From: lyazii22 Date: Fri, 11 Nov 2022 15:22:22 +0530 Subject: [PATCH] dynamo data conversion update --- sources/dynamodb/schema.go | 7 ++++--- sources/dynamodb/streaming.go | 22 ++++++++++++++++------ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/sources/dynamodb/schema.go b/sources/dynamodb/schema.go index 62fc8c7aa..4d218d0db 100644 --- a/sources/dynamodb/schema.go +++ b/sources/dynamodb/schema.go @@ -200,10 +200,11 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte fmt.Println("Starting DynamoDB Streams initialization...") latestStreamArn := make(map[string]interface{}) - orderTableNames := ddl.OrderTables(conv.SpSchema) + orderTableIds := ddl.OrderTables(conv.SpSchema) - for _, spannerTable := range orderTableNames { - srcTable, _ := internal.GetSourceTable(conv, spannerTable) + for _, tableId := range orderTableIds { + // srcTable, _ := internal.GetSourceTable(conv, spannerTable) + srcTable := conv.SrcSchema[tableId].Name streamArn, err := NewDynamoDBStream(isi.DynamoClient, srcTable) if err != nil { conv.Unexpected(fmt.Sprintf("Couldn't initialize DynamoDB Stream for table %s: %s", srcTable, err)) diff --git a/sources/dynamodb/streaming.go b/sources/dynamodb/streaming.go index b0653d3ae..37d161c50 100644 --- a/sources/dynamodb/streaming.go +++ b/sources/dynamodb/streaming.go @@ -38,7 +38,6 @@ import ( "github.com/cloudspannerecosystem/harbourbridge/common/metrics" "github.com/cloudspannerecosystem/harbourbridge/internal" "github.com/cloudspannerecosystem/harbourbridge/schema" - "github.com/cloudspannerecosystem/harbourbridge/sources/common" ) const ( @@ -345,12 +344,23 @@ func ProcessRecord(conv *internal.Conv, streamInfo *StreamingInfo, record *dynam eventName := *record.EventName streamInfo.StatsAddRecord(srcTable, eventName) - srcSchema, spTable, spCols, spSchema, err := common.GetColsAndSchemas(conv, srcTable) - if err != nil { - streamInfo.Unexpected(fmt.Sprintf("Can't get cols and schemas for table %s: %v", srcTable, err)) - return + tableId, _ := internal.GetTableIdFromName(conv, srcTable) + srcSchema := conv.SrcSchema[tableId] + spSchema := conv.SpSchema[tableId] + spTable := spSchema.Name + spCols := []string{} + srcCols := []string{} + for _, colId := range spSchema.ColIds { + spCols = append(spCols, spSchema.ColDefs[colId].Name) + srcCols = append(srcCols, srcSchema.ColDefs[colId].Name) } + // srcSchema, spTable, spCols, spSchema, err := common.GetColsAndSchemas(conv, srcTable) + // if err1 != nil || { + // streamInfo.Unexpected(fmt.Sprintf("Can't get cols and schemas for table %s: %v", srcTable, err)) + // return + // } + var srcImage map[string]*dynamodb.AttributeValue if eventName == "REMOVE" { srcImage = record.Dynamodb.Keys @@ -363,7 +373,7 @@ func ProcessRecord(conv *internal.Conv, streamInfo *StreamingInfo, record *dynam writeRecord(streamInfo, srcTable, spTable, eventName, spCols, spVals, srcSchema) } else { streamInfo.StatsAddBadRecord(srcTable, eventName) - streamInfo.CollectBadRecord(eventName, srcTable, srcSchema.ColIds, srcStrVals) + streamInfo.CollectBadRecord(eventName, srcTable, srcCols, srcStrVals) } streamInfo.StatsAddRecordProcessed() }