diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
index 7b7a58322..cb21620bc 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
@@ -107,6 +107,7 @@ public static void main(String[] args) throws Exception {
         conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
         conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
         conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
+        conf.set(LakeSoulSinkOptions.HASH_BUCKET_NUM, bucketParallelism);
         conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
         env.getConfig().registerTypeWithKryoSerializer(BinarySourceRecord.class, BinarySourceRecordSerializer.class);
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MysqlCdc.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MysqlCdc.java
index d3b54f295..f900d806b 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MysqlCdc.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MysqlCdc.java
@@ -80,6 +80,7 @@ public static void main(String[] args) throws Exception {
         conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
         conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
         conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
+        conf.set(LakeSoulSinkOptions.HASH_BUCKET_NUM, bucketParallelism);
         conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/OracleCdc.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/OracleCdc.java
index b7f2d000d..071bd69eb 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/OracleCdc.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/OracleCdc.java
@@ -76,6 +76,7 @@ public static void main(String[] args) throws Exception {
         conf.set(LakeSoulSinkOptions.isMultiTableSource, true);
         conf.set(SOURCE_PARALLELISM, sourceParallelism);
         conf.set(BUCKET_PARALLELISM, bucketParallelism);
+        conf.set(HASH_BUCKET_NUM, bucketParallelism);
         conf.set(SERVER_TIME_ZONE, serverTimezone);
         conf.set(WAREHOUSE_PATH, databasePrefixPath);
         conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java
index 0f0ef4613..8a0538b6f 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java
@@ -307,7 +307,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
             }
         } else {
             // for non-primary key table, hashBucketNum properties should not be set
-            if (tableOptions.containsKey(HASH_BUCKET_NUM.key())) {
+            if (tableOptions.containsKey(HASH_BUCKET_NUM.key()) && !tableOptions.get(HASH_BUCKET_NUM.key()).equals("-1")) {
                 throw new CatalogException("hashBucketNum property should not be set for table without primary key");
             }
         }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTableSinkStreamBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTableSinkStreamBuilder.java
index 4ea8d2607..d3aaf95c6 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTableSinkStreamBuilder.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTableSinkStreamBuilder.java
@@ -55,7 +55,7 @@ public DataStream buildHashPartitionedCDCStream(DataStream
     public DataStreamSink buildLakeSoulDMLSink(DataStream stream) {
-        context.conf.set(DYNAMIC_BUCKETING, false);
+        Boolean dynamicBucketing = context.conf.get(DYNAMIC_BUCKETING);
         if (!context.conf.contains(AUTO_SCHEMA_CHANGE)) {
             context.conf.set(AUTO_SCHEMA_CHANGE, true);
         }
@@ -71,8 +71,12 @@ public DataStreamSink buildLakeSoulDMLSink(DataStream
 buildArrowSink(Context context,
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java
index 047ec1bb2..e9a6c2823 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java
@@ -33,7 +33,7 @@ public DefaultOneTableBulkFormatBuilder(
         super(basePath, conf, new DefaultLakeSoulWriterBucketFactory(conf));
         this.identity = identity;
     }
-    public TableSchemaIdentity getIdentity(){
+    public TableSchemaIdentity getIdentity() {
         return this.identity;
     }
 
@@ -41,7 +41,7 @@ public TableSchemaIdentity getIdentity(){
     public AbstractLakeSoulMultiTableSinkWriter createWriter(Sink.InitContext context, int subTaskId) throws IOException {
         int hashBucketNum = conf.getInteger(LakeSoulSinkOptions.HASH_BUCKET_NUM);
-        int hashBucketId = hashBucketNum == -1 ? subTaskId : subTaskId % hashBucketNum;
+        int hashBucketId = hashBucketNum == -1 ? 0 : subTaskId % hashBucketNum;
         System.out.printf("DefaultOneTableBulkFormatBuilder::createWriter, subTaskId=%d, hashBucketId=%d\n", subTaskId, hashBucketId);
         return new LakeSoulRowDataOneTableSinkWriter(
                 hashBucketId,
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
index 2ed8e1543..1d9f279b0 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
@@ -153,7 +153,7 @@ public List commit(
                         identity.tableLocation, msgSchema, identity.useCDC, identity.cdcColumn, partition);
                 JSONObject properties = new JSONObject();
                 if (!identity.primaryKeys.isEmpty()) {
-                    properties.put(HASH_BUCKET_NUM.key(), Integer.toString(conf.getInteger(BUCKET_PARALLELISM)));
+                    properties.put(HASH_BUCKET_NUM.key(), Integer.toString(conf.getInteger(HASH_BUCKET_NUM)));
                     properties.put(HASH_PARTITIONS,
                             String.join(LAKESOUL_HASH_PARTITION_SPLITTER, identity.primaryKeys));
                     if (isCdc) {
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java
index cdbdbc81e..ca32cffb4 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java
@@ -385,7 +385,12 @@ public static String getDatabaseName(String fullDatabaseName) {
     }
 
     public static void setIOConfigs(Configuration conf, NativeIOBase io) {
-        conf.addAll(GlobalConfiguration.loadConfiguration());
+        Configuration globalConf = GlobalConfiguration.loadConfiguration();
+        globalConf.keySet().forEach(key -> {
+            if (!conf.containsKey(key)) {
+                conf.setString(key, globalConf.getString(key, null));
+            }
+        });
         try {
             FlinkUtil.class.getClassLoader().loadClass("org.apache.hadoop.hdfs.HdfsConfiguration");
             org.apache.hadoop.conf.Configuration
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/NativeOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/NativeOptions.java
index b16d4958f..1cda69451 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/NativeOptions.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/NativeOptions.java
@@ -15,7 +15,7 @@ public class NativeOptions {
                     .withDescription("Option to set memory limit of native writer");
 
     public static final ConfigOption HASH_BUCKET_ID =
-            key("lakesoul.native_writer.hash_bucket_id")
+            key("hash_bucket_id")
                     .stringType()
                     .defaultValue("0")
                     .withDescription("Option to set hash bucket id of native writer");
diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java
index 7a30b369a..c503dfa4f 100644
--- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java
+++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java
@@ -73,6 +73,7 @@ public void setAuxSortColumns(Iterable auxSortColumns) {
     }
 
     public void setHashBucketNum(Integer hashBucketNum) {
+        hashBucketNum = hashBucketNum < 1 ? 1 : hashBucketNum;
         ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_hash_bucket_num(ioConfigBuilder, hashBucketNum);
     }
 
diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java
index fd465287f..84bd2bde0 100644
--- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java
+++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java
@@ -88,7 +88,7 @@ public class SubstraitUtil {
             try {
                 NATIVE_IO_BASE.close();
             } catch (Exception e) {
-                throw new RuntimeException(e);
+                e.printStackTrace();
             }
         }));
     }
diff --git a/rust/lakesoul-io/src/async_writer/partitioning_writer.rs b/rust/lakesoul-io/src/async_writer/partitioning_writer.rs
index bddc3b8bf..a00fb466b 100644
--- a/rust/lakesoul-io/src/async_writer/partitioning_writer.rs
+++ b/rust/lakesoul-io/src/async_writer/partitioning_writer.rs
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{collections::HashMap, sync::Arc};
+use std::{borrow::Borrow, collections::HashMap, sync::Arc};
 
 use arrow_array::RecordBatch;
 use arrow_schema::{SchemaRef, SortOptions};
@@ -27,8 +27,8 @@ use crate::{
     helpers::{
         columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values,
     },
-    lakesoul_io_config::{create_session_context, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
-    repartition::RepartitionByRangeAndHashExec,
+    lakesoul_io_config::{create_session_context, IOSchema, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
+    repartition::RepartitionByRangeAndHashExec, transform::uniform_schema,
 };
 
 use super::{AsyncBatchWriter, MultiPartAsyncWriter, ReceiverStreamExec, WriterFlushResult};
@@ -64,13 +64,26 @@ impl PartitioningAsyncWriter {
         let write_id = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
 
         // let partitioned_file_path_and_row_count = Arc::new(Mutex::new(HashMap::, u64)>::new()));
+        let mut writer_config = config.clone();
+        if !config.aux_sort_cols.is_empty() {
+            let schema = config.target_schema.0.clone();
+            // O(nm), n = number of target schema fields, m = number of aux sort cols
+            let proj_indices = schema
+                .fields
+                .iter()
+                .filter(|f| !config.aux_sort_cols.contains(f.name()))
+                .map(|f| schema.index_of(f.name().as_str()).map_err(|e| DataFusionError::ArrowError(e, None)))
+                .collect::>>()?;
+            let writer_schema = Arc::new(schema.project(proj_indices.borrow())?);
+            writer_config.target_schema = IOSchema(uniform_schema(writer_schema));
+        }
         for i in 0..partitioning_exec.output_partitioning().partition_count() {
             let sink_task = tokio::spawn(Self::pull_and_sink(
                 partitioning_exec.clone(),
                 i,
                 task_context.clone(),
-                config.clone().into(),
+                writer_config.clone().into(),
                 Arc::new(config.range_partitions.clone()),
                 write_id.clone(),
             ));
@@ -101,8 +114,8 @@ impl PartitioningAsyncWriter {
             .chain(config.primary_keys.iter())
             // add aux sort cols to sort expr
             .chain(config.aux_sort_cols.iter())
-            .map(|pk| {
-                let col = Column::new_with_schema(pk.as_str(), &config.target_schema.0)?;
+            .map(|sort_column| {
+                let col = Column::new_with_schema(sort_column.as_str(), &config.target_schema.0)?;
                 Ok(PhysicalSortExpr {
                     expr: Arc::new(col),
                     options: SortOptions::default(),
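
Taken together, the writer-side changes decouple the table's hash bucket count (HASH_BUCKET_NUM, which the global committer now records in table properties) from the sink's BUCKET_PARALLELISM, and derive the per-subtask bucket id from hashBucketNum alone. The following is a minimal, self-contained sketch of that mapping; the class and method names are hypothetical and only mirror the expressions visible in this patch, they are not LakeSoul APIs.

```java
public final class HashBucketIdSketch {

    // Mirrors DefaultOneTableBulkFormatBuilder#createWriter after this patch:
    // an unset bucket num (-1) maps every subtask to bucket 0 rather than subTaskId.
    static int hashBucketId(int subTaskId, int hashBucketNum) {
        return hashBucketNum == -1 ? 0 : subTaskId % hashBucketNum;
    }

    // Mirrors NativeIOWriter#setHashBucketNum after this patch:
    // values below 1 are clamped to 1 before reaching the native config builder.
    static int effectiveHashBucketNum(int hashBucketNum) {
        return hashBucketNum < 1 ? 1 : hashBucketNum;
    }

    public static void main(String[] args) {
        System.out.println(hashBucketId(5, -1));        // 0
        System.out.println(hashBucketId(5, 4));         // 1 (5 % 4)
        System.out.println(effectiveHashBucketNum(-1)); // 1
    }
}
```

Because of the modulo, subtasks beyond the bucket count wrap around onto existing buckets, so the bucket ids produced by the sink stay consistent with the hashBucketNum property that the committer stores in table metadata even when sink parallelism exceeds the bucket count.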