diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java index 9dfba2f71..35aaa8256 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java @@ -274,6 +274,7 @@ public Set finishedSplits() { @Override public void close() throws Exception { + LOG.info("Close reader split {}, read num {}", splitId, totalRead); if (this.currentVCR != null) { this.currentVCR.close(); this.currentVCR = null; diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java index 588e624c4..4bd47cd6b 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java @@ -31,10 +31,12 @@ public static LakeSoulArrowSource create( TableInfo tableInfo = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableName, tableNamespace); RowType tableRowType = ArrowUtils.fromArrowSchema(Schema.fromJSON(tableInfo.getTableSchema())); DBUtil.TablePartitionKeys tablePartitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions()); + boolean isBounded = conf.getBoolean("IS_BOUNDED", false); return new LakeSoulArrowSource( tableInfo, tableId, conf.toMap(), + isBounded, tableRowType, tablePartitionKeys.primaryKeys, tablePartitionKeys.rangeKeys @@ -56,8 +58,8 @@ public static LakeSoulArrowSource create( tableInfo, tableId, conf.toMap(), - tableRowType, isBounded, + tableRowType, tablePartitionKeys.primaryKeys, tablePartitionKeys.rangeKeys, remainingPartitions @@ -68,8 +70,8 @@ public static LakeSoulArrowSource create( TableInfo tableInfo, TableId tableId, Map optionParams, - RowType tableRowType, boolean isBounded, + RowType tableRowType, List pkColumns, List partitionColumns, List> remainingPartitions @@ -94,6 +96,7 @@ public static LakeSoulArrowSource create( TableInfo tableInfo, TableId tableId, Map optionParams, + boolean isBounded, RowType tableRowType, List pkColumns, List partitionColumns @@ -103,7 +106,7 @@ public static LakeSoulArrowSource create( tableRowType, tableRowType, tableRowType, - false, + isBounded, pkColumns, partitionColumns, optionParams,