From 0de43525fc02f170b54ccb97addf8d7b9e0ad649 Mon Sep 17 00:00:00 2001 From: renzhimin7 <1240388654@qq.com> Date: Mon, 25 Dec 2023 17:37:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A=E4=B9=89St?= =?UTF-8?q?arRocksGenericRowTransformer=20=E6=B8=85=E6=B4=97=E6=95=B0?= =?UTF-8?q?=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: renzhimin7 <1240388654@qq.com> --- .../flink/row/sink/StarRocksGenericRowTransformer.java | 4 ++++ .../flink/table/sink/StarRocksDynamicSinkFunction.java | 6 +++++- .../flink/table/sink/StarRocksDynamicSinkFunctionV2.java | 6 +++++- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java index 423fea0a..c5c97d54 100644 --- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java +++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java @@ -50,6 +50,10 @@ public void setRuntimeContext(RuntimeContext ctx) {} public Object[] transform(T record, boolean supportUpsertDelete) { Object[] rowData = new Object[fieldNames.length + (supportUpsertDelete ? 1 : 0)]; consumer.accept(rowData, record); + if (rowData == null) { + return null; + } + if (supportUpsertDelete && (record instanceof RowData)) { // set `__op` column rowData[rowData.length - 1] = StarRocksSinkOP.parse(((RowData)record).getRowKind()).ordinal(); diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java index 201502f5..8056adcd 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java @@ -129,7 +129,11 @@ public synchronized void invoke(T value, Context context) throws Exception { return; } } - String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete())); + Object[] rowData = rowTransformer.transform(value, sinkOptions.supportUpsertDelete()); + if (rowData == null) { + return; + } + String serializedValue = serializer.serialize(rowData); sinkManager.writeRecords( sinkOptions.getDatabaseName(), sinkOptions.getTableName(), diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java index 63536236..9f02f53e 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java @@ -161,7 +161,11 @@ public void invoke(T value, Context context) throws Exception { } } flushLegacyData(); - String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete())); + Object[] rowData = rowTransformer.transform(value, sinkOptions.supportUpsertDelete()); + if (rowData == null) { + return; + } + String serializedValue = serializer.serialize(rowData); sinkManager.write( null, sinkOptions.getDatabaseName(),