From 1ea39912e3e6977f3a2a3a7614e78a4b8f7e9f98 Mon Sep 17 00:00:00 2001 From: "zhiminr.ren" Date: Mon, 25 Dec 2023 16:32:46 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A=E4=B9=89St?= =?UTF-8?q?arRocksGenericRowTransformer=20=E6=B8=85=E6=B4=97=E6=95=B0?= =?UTF-8?q?=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flink/row/sink/StarRocksGenericRowTransformer.java | 3 +++ .../flink/table/sink/StarRocksDynamicSinkFunction.java | 6 +++++- .../flink/table/sink/StarRocksDynamicSinkFunctionV2.java | 6 +++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java index 423fea0a..b89a353e 100644 --- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java +++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java @@ -50,6 +50,9 @@ public void setRuntimeContext(RuntimeContext ctx) {} public Object[] transform(T record, boolean supportUpsertDelete) { Object[] rowData = new Object[fieldNames.length + (supportUpsertDelete ? 1 : 0)]; consumer.accept(rowData, record); + if (rowData == null) { + return null; + } if (supportUpsertDelete && (record instanceof RowData)) { // set `__op` column rowData[rowData.length - 1] = StarRocksSinkOP.parse(((RowData)record).getRowKind()).ordinal(); diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java index 201502f5..1e522b1e 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java @@ -129,7 +129,11 @@ public synchronized void invoke(T value, Context context) throws Exception { return; } } - String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete())); + Object[] rowData = rowTransformer.transform(value, sinkOptions.supportUpsertDelete()); + if (rowData == null) { + return; + } + String serializedValue = serializer.serialize(); sinkManager.writeRecords( sinkOptions.getDatabaseName(), sinkOptions.getTableName(), diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java index 63536236..f5f5a2df 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java @@ -161,7 +161,11 @@ public void invoke(T value, Context context) throws Exception { } } flushLegacyData(); - String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete())); + Object[] rowData = rowTransformer.transform(value, sinkOptions.supportUpsertDelete()); + if (rowData == null) { + return; + } + String serializedValue = serializer.serialize(); sinkManager.write( null, sinkOptions.getDatabaseName(),