Skip to content

Commit

Permalink
支持自定义StarRocksGenericRowTransformer 清洗数据
Browse files Browse the repository at this point in the history
Signed-off-by: renzhimin7 <[email protected]>
  • Loading branch information
renzhimin7 committed Dec 25, 2023
1 parent 2320c40 commit 0de4352
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public void setRuntimeContext(RuntimeContext ctx) {}
public Object[] transform(T record, boolean supportUpsertDelete) {
Object[] rowData = new Object[fieldNames.length + (supportUpsertDelete ? 1 : 0)];
consumer.accept(rowData, record);
if (rowData == null) {
return null;
}

if (supportUpsertDelete && (record instanceof RowData)) {
// set `__op` column
rowData[rowData.length - 1] = StarRocksSinkOP.parse(((RowData)record).getRowKind()).ordinal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ public synchronized void invoke(T value, Context context) throws Exception {
return;
}
}
String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete()));
Object[] rowData = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());
if (rowData == null) {
return;
}
String serializedValue = serializer.serialize(rowData);
sinkManager.writeRecords(
sinkOptions.getDatabaseName(),
sinkOptions.getTableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ public void invoke(T value, Context context) throws Exception {
}
}
flushLegacyData();
String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete()));
Object[] rowData = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());
if (rowData == null) {
return;
}
String serializedValue = serializer.serialize(rowData);
sinkManager.write(
null,
sinkOptions.getDatabaseName(),
Expand Down

0 comments on commit 0de4352

Please sign in to comment.