Skip to content

Commit

Permalink
chore+fix: BED-5514 - CySQL Performance Work (#1180)
Browse files Browse the repository at this point in the history
  • Loading branch information
zinic authored Feb 28, 2025
1 parent 826edf1 commit d670670
Show file tree
Hide file tree
Showing 36 changed files with 1,310 additions and 2,832 deletions.
136 changes: 120 additions & 16 deletions cmd/api/src/api/tools/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,64 @@ func convertNeo4jProperties(properties *graph.Properties) error {
return nil
}

func migrateNodes(ctx context.Context, neoDB, pgDB graph.Database) (map[graph.ID]graph.ID, error) {
func migrateNodesToNeo4j(ctx context.Context, neoDB, pgDB graph.Database) (map[graph.ID]graph.ID, error) {
defer measure.ContextLogAndMeasure(ctx, slog.LevelInfo, "Migrating nodes from PostgreSQL to Neo4j")()

var (
nodeBuffer []*graph.Node
nodeIDMappings = map[graph.ID]graph.ID{}
)

if err := pgDB.ReadTransaction(ctx, func(tx graph.Transaction) error {
return tx.Nodes().Fetch(func(cursor graph.Cursor[*graph.Node]) error {
for next := range cursor.Chan() {
nodeBuffer = append(nodeBuffer, next)

if len(nodeBuffer) > 2000 {
if err := neoDB.WriteTransaction(ctx, func(tx graph.Transaction) error {
for _, nextNode := range nodeBuffer {
if newNode, err := tx.CreateNode(nextNode.Properties, nextNode.Kinds...); err != nil {
return err
} else {
nodeIDMappings[nextNode.ID] = newNode.ID
}
}

return nil
}); err != nil {
return err
}

nodeBuffer = nodeBuffer[:0]
}
}

if cursor.Error() == nil && len(nodeBuffer) > 0 {
if err := neoDB.WriteTransaction(ctx, func(tx graph.Transaction) error {
for _, nextNode := range nodeBuffer {
if newNode, err := tx.CreateNode(nextNode.Properties, nextNode.Kinds...); err != nil {
return err
} else {
nodeIDMappings[nextNode.ID] = newNode.ID
}
}

return nil
}); err != nil {
return err
}
}

return cursor.Error()
})
}); err != nil {
return nil, err
}

return nodeIDMappings, nil
}

func migrateNodesToPG(ctx context.Context, neoDB, pgDB graph.Database) (map[graph.ID]graph.ID, error) {
defer measure.ContextLogAndMeasure(ctx, slog.LevelInfo, "Migrating nodes from Neo4j to PostgreSQL")()

var (
Expand Down Expand Up @@ -149,27 +206,27 @@ func migrateNodes(ctx context.Context, neoDB, pgDB graph.Database) (map[graph.ID
return nodeIDMappings, pgDB.Run(ctx, fmt.Sprintf(`alter sequence node_id_seq restart with %d`, nextNodeID), nil)
}

func migrateEdges(ctx context.Context, neoDB, pgDB graph.Database, nodeIDMappings map[graph.ID]graph.ID) error {
func migrateEdges(ctx context.Context, sourceDB, destinationDB graph.Database, nodeIDMappings map[graph.ID]graph.ID) error {
defer measure.ContextLogAndMeasure(ctx, slog.LevelInfo, "Migrating edges from Neo4j to PostgreSQL")()

return neoDB.ReadTransaction(ctx, func(tx graph.Transaction) error {
return sourceDB.ReadTransaction(ctx, func(tx graph.Transaction) error {
return tx.Relationships().Fetch(func(cursor graph.Cursor[*graph.Relationship]) error {
if err := pgDB.BatchOperation(ctx, func(tx graph.Batch) error {
for next := range cursor.Chan() {
if err := destinationDB.BatchOperation(ctx, func(tx graph.Batch) error {
for nextSourceEdge := range cursor.Chan() {
var (
pgStartID = nodeIDMappings[next.StartID]
pgEndID = nodeIDMappings[next.EndID]
dstStartID = nodeIDMappings[nextSourceEdge.StartID]
dstEndID = nodeIDMappings[nextSourceEdge.EndID]
)

if err := convertNeo4jProperties(next.Properties); err != nil {
if err := convertNeo4jProperties(nextSourceEdge.Properties); err != nil {
return err
}

if err := tx.CreateRelationship(&graph.Relationship{
StartID: pgStartID,
EndID: pgEndID,
Kind: next.Kind,
Properties: next.Properties,
StartID: dstStartID,
EndID: dstEndID,
Kind: nextSourceEdge.Kind,
Properties: nextSourceEdge.Properties,
}); err != nil {
return err
}
Expand Down Expand Up @@ -263,7 +320,44 @@ func (s *PGMigrator) SwitchNeo4j(response http.ResponseWriter, request *http.Req
}
}

func (s *PGMigrator) StartMigration() error {
func (s *PGMigrator) StartMigrationToNeo() error {
if err := s.advanceState(StateMigrating, StateIdle); err != nil {
return fmt.Errorf("database migration state error: %w", err)
} else if neo4jDB, err := s.OpenNeo4jGraphConnection(); err != nil {
return fmt.Errorf("failed connecting to Neo4j: %w", err)
} else if pgDB, err := s.OpenPostgresGraphConnection(); err != nil {
return fmt.Errorf("failed connecting to PostgreSQL: %w", err)
} else {
slog.Info("Dispatching live migration from Neo4j to PostgreSQL")

migrationCtx, migrationCancelFunc := context.WithCancel(s.ServerCtx)
s.migrationCancelFunc = migrationCancelFunc

go func(ctx context.Context) {
defer migrationCancelFunc()

slog.InfoContext(ctx, "Starting live migration from Neo4j to PostgreSQL")

if err := neo4jDB.AssertSchema(ctx, s.graphSchema); err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("Unable to assert graph schema in PostgreSQL: %v", err))
} else if nodeIDMappings, err := migrateNodesToNeo4j(ctx, neo4jDB, pgDB); err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("Failed importing nodes into PostgreSQL: %v", err))
} else if err := migrateEdges(ctx, pgDB, neo4jDB, nodeIDMappings); err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("Failed importing edges into PostgreSQL: %v", err))
} else {
slog.InfoContext(ctx, "Migration to PostgreSQL completed successfully")
}

if err := s.advanceState(StateIdle, StateMigrating, StateCanceling); err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("Database migration state management error: %v", err))
}
}(migrationCtx)
}

return nil
}

func (s *PGMigrator) StartMigrationToPG() error {
if err := s.advanceState(StateMigrating, StateIdle); err != nil {
return fmt.Errorf("database migration state error: %w", err)
} else if neo4jDB, err := s.OpenNeo4jGraphConnection(); err != nil {
Expand All @@ -285,7 +379,7 @@ func (s *PGMigrator) StartMigration() error {
slog.ErrorContext(ctx, fmt.Sprintf("Unable to assert graph schema in PostgreSQL: %v", err))
} else if err := migrateTypes(ctx, neo4jDB, pgDB); err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("Unable to migrate Neo4j kinds to PostgreSQL: %v", err))
} else if nodeIDMappings, err := migrateNodes(ctx, neo4jDB, pgDB); err != nil {
} else if nodeIDMappings, err := migrateNodesToPG(ctx, neo4jDB, pgDB); err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("Failed importing nodes into PostgreSQL: %v", err))
} else if err := migrateEdges(ctx, neo4jDB, pgDB, nodeIDMappings); err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("Failed importing edges into PostgreSQL: %v", err))
Expand All @@ -302,8 +396,18 @@ func (s *PGMigrator) StartMigration() error {
return nil
}

func (s *PGMigrator) MigrationStart(response http.ResponseWriter, request *http.Request) {
if err := s.StartMigration(); err != nil {
func (s *PGMigrator) MigrationStartPGToNeo(response http.ResponseWriter, request *http.Request) {
if err := s.StartMigrationToNeo(); err != nil {
api.WriteJSONResponse(request.Context(), map[string]any{
"error": err.Error(),
}, http.StatusInternalServerError, response)
} else {
response.WriteHeader(http.StatusAccepted)
}
}

func (s *PGMigrator) MigrationStartNeoToPG(response http.ResponseWriter, request *http.Request) {
if err := s.StartMigrationToPG(); err != nil {
api.WriteJSONResponse(request.Context(), map[string]any{
"error": err.Error(),
}, http.StatusInternalServerError, response)
Expand Down
2 changes: 1 addition & 1 deletion cmd/api/src/api/tools/pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestPGMigrator(t *testing.T) {
})
require.Nil(t, err)

err = migrator.StartMigration()
err = migrator.StartMigrationToPG()
require.Nil(t, err)

// wait until migration status returns to "idle"
Expand Down
3 changes: 2 additions & 1 deletion cmd/api/src/daemons/api/toolapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func NewDaemon[DBType database.Database](ctx context.Context, connections bootst

router.Put("/graph-db/switch/pg", pgMigrator.SwitchPostgreSQL)
router.Put("/graph-db/switch/neo4j", pgMigrator.SwitchNeo4j)
router.Put("/pg-migration/start", pgMigrator.MigrationStart)
router.Put("/pg-migration/pg-to-neo", pgMigrator.MigrationStartPGToNeo)
router.Put("/pg-migration/neo-to-pg", pgMigrator.MigrationStartNeoToPG)
router.Get("/pg-migration/status", pgMigrator.MigrationStatus)
router.Put("/pg-migration/cancel", pgMigrator.MigrationCancel)

Expand Down
8 changes: 8 additions & 0 deletions packages/go/cypher/models/optional.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ type Optional[T any] struct {
Set bool
}

func (s Optional[T]) GetOr(defaultValue T) T {
if s.Set {
return s.Value
}

return defaultValue
}

func ValueOptional[T any](value T) Optional[T] {
return Optional[T]{
Value: value,
Expand Down
8 changes: 5 additions & 3 deletions packages/go/cypher/models/pgsql/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,10 +623,12 @@ func formatFromClauses(builder *OutputBuilder, fromClauses []pgsql.FromClause) e
return err
}

builder.Write(" on ")
if join.JoinOperator.Constraint != nil {
builder.Write(" on ")

if err := formatNode(builder, join.JoinOperator.Constraint); err != nil {
return err
if err := formatNode(builder, join.JoinOperator.Constraint); err != nil {
return err
}
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions packages/go/cypher/models/pgsql/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ func (s Query) NodeType() string {
return "query"
}

func BinaryExpressionJoin(optional Expression, operator Operator, conjoined Expression) Expression {
func OptionalBinaryExpressionJoin(optional Expression, operator Operator, conjoined Expression) Expression {
if optional == nil {
return conjoined
}
Expand All @@ -1192,6 +1192,12 @@ func BinaryExpressionJoin(optional Expression, operator Operator, conjoined Expr
)
}

func OptionalAnd(optional Expression, conjoined Expression) Expression {
return BinaryExpressionJoin(optional, OperatorAnd, conjoined)
func OptionalAnd(leftOperand Expression, rightOperand Expression) Expression {
if leftOperand == nil {
return rightOperand
} else if rightOperand == nil {
return leftOperand
}

return NewBinaryExpression(leftOperand, OperatorAnd, rightOperand)
}
1 change: 0 additions & 1 deletion packages/go/cypher/models/pgsql/pgtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ const (
ColumnKindID Identifier = "kind_id"
ColumnGraphID Identifier = "graph_id"
ColumnStartID Identifier = "start_id"
ColumnNextID Identifier = "next_id"
ColumnEndID Identifier = "end_id"
)

Expand Down
Loading

0 comments on commit d670670

Please sign in to comment.