-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Detect cascading actions in RDS source #5168
Conversation
Signed-off-by: Hai Yan <[email protected]>
|
||
private final String database; | ||
private final String table; | ||
private final long timestamp; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Multiple update on different rows can happen at same timestamp. How is this been handled ? Should we also have some representation of row (primary key) here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timestamp is the timestamp on the binlog events from parent table. So the handling is similar to stream events.
...urce/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ForeignKeyAction.java
Outdated
Show resolved
Hide resolved
private final String parentTableName; | ||
|
||
@JsonProperty("referenced_key_name") | ||
private final String referencedKeyName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you clarify the difference between this and line 30 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Foreign key is from the child table. Referenced key is from the parent table, it's the key that foreign key references to.
...ds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ParentTable.java
Outdated
Show resolved
Hide resolved
return columnsWithCascadingDelete; | ||
} | ||
|
||
final Map<String, List<ForeignKeyRelation>> columnsWithCascadingDelete = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above. Change this to this.columnsWithCascadingDelete = new HashMap<>()
// Find out for this row, which columns are changing | ||
LOG.debug("Checking for updated columns"); | ||
final Map<String, Object> updatedColumnsAndValues = IntStream.range(0, row.getKey().length) | ||
.filter(i -> !row.getKey()[i].equals(row.getValue()[i])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does row.getKey() and row.getValue() refer to ? Are these column names of changes columns ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In update event, key is an array of column values before the update, while value is an array of the column values after the update. So this filter removes columns that were not changed.
.../main/java/org/opensearch/dataprepper/plugins/source/rds/resync/CascadingActionDetector.java
Outdated
Show resolved
Hide resolved
try (final Connection connection = connectionManager.getConnection()) { | ||
final List<ForeignKeyRelation> foreignKeyRelations = new ArrayList<>(); | ||
DatabaseMetaData metaData = connection.getMetaData(); | ||
String[] tableTypes = new String[]{"TABLE"}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be moved to static constant
...source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/SchemaManager.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Hai Yan <[email protected]>
a22bde2
to
1ec0230
Compare
Description
When a foreign key with cascading action defined, the cascading changes in the child tables won't appear in binlog. The rds source plugin will try to detect cascading changes and generate pipeline events for them. This PR covers the first part to detect cascades and save the information in the coordination store (as resync partitions). Next PR (#5171) will cover the processing of those partitions.
Issues Resolved
Contributes to #4561
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.