diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java index 423fea0a..b89a353e 100644 --- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java +++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java @@ -50,6 +50,9 @@ public void setRuntimeContext(RuntimeContext ctx) {} public Object[] transform(T record, boolean supportUpsertDelete) { Object[] rowData = new Object[fieldNames.length + (supportUpsertDelete ? 1 : 0)]; consumer.accept(rowData, record); + if (rowData == null) { + return null; + } if (supportUpsertDelete && (record instanceof RowData)) { // set `__op` column rowData[rowData.length - 1] = StarRocksSinkOP.parse(((RowData)record).getRowKind()).ordinal(); diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java index 201502f5..1e522b1e 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java @@ -129,7 +129,11 @@ public synchronized void invoke(T value, Context context) throws Exception { return; } } - String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete())); + Object[] rowData = rowTransformer.transform(value, sinkOptions.supportUpsertDelete()); + if (rowData == null) { + return; + } + String serializedValue = serializer.serialize(); sinkManager.writeRecords( sinkOptions.getDatabaseName(), sinkOptions.getTableName(), diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java index 63536236..f5f5a2df 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java @@ -161,7 +161,11 @@ public void invoke(T value, Context context) throws Exception { } } flushLegacyData(); - String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete())); + Object[] rowData = rowTransformer.transform(value, sinkOptions.supportUpsertDelete()); + if (rowData == null) { + return; + } + String serializedValue = serializer.serialize(); sinkManager.write( null, sinkOptions.getDatabaseName(),