[Flink] Fix wrong bucket number when Flink CDC tasks write files (lakesoul-io#584)

* fix_flink_write_file_bucket_bug

Signed-off-by: fphantam <[email protected]>

* fix invalid hash_bucket_num

Signed-off-by: fphantam <[email protected]>

* set lakesoul.sink.dynamic_bucketing default value to true

Signed-off-by: fphantam <[email protected]>

* fix PartitioningAsyncWriter with aux_sort_cols

Signed-off-by: fphantam <[email protected]>

---------

Signed-off-by: fphantam <[email protected]>
F-PHantam committed Feb 26, 2025
1 parent 96e755b commit eefa7dd
Showing 11 changed files with 41 additions and 16 deletions.
@@ -107,6 +107,7 @@ public static void main(String[] args) throws Exception {
conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
+        conf.set(LakeSoulSinkOptions.HASH_BUCKET_NUM, bucketParallelism);
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.getConfig().registerTypeWithKryoSerializer(BinarySourceRecord.class, BinarySourceRecordSerializer.class);
@@ -80,6 +80,7 @@ public static void main(String[] args) throws Exception {
conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
+        conf.set(LakeSoulSinkOptions.HASH_BUCKET_NUM, bucketParallelism);
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
@@ -76,6 +76,7 @@ public static void main(String[] args) throws Exception {
conf.set(LakeSoulSinkOptions.isMultiTableSource, true);
conf.set(SOURCE_PARALLELISM, sourceParallelism);
conf.set(BUCKET_PARALLELISM, bucketParallelism);
+        conf.set(HASH_BUCKET_NUM, bucketParallelism);
conf.set(SERVER_TIME_ZONE, serverTimezone);
conf.set(WAREHOUSE_PATH, databasePrefixPath);
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
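Note: each of the three example jobs above now sets HASH_BUCKET_NUM to the same value as BUCKET_PARALLELISM, so the number of sink writers and the hash bucket count recorded in the table properties (see the committer change further down) stay in agreement. A minimal sketch of the pattern, with an illustrative parallelism of 4:

    // Illustrative values only; the options are the LakeSoulSinkOptions constants used above.
    int bucketParallelism = 4;
    conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
    conf.set(LakeSoulSinkOptions.HASH_BUCKET_NUM, bucketParallelism);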
@@ -307,7 +307,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
} else {
// for non-primary key table, hashBucketNum properties should not be set
-            if (tableOptions.containsKey(HASH_BUCKET_NUM.key())) {
+            if (tableOptions.containsKey(HASH_BUCKET_NUM.key()) && !tableOptions.get(HASH_BUCKET_NUM.key()).equals("-1")) {
throw new CatalogException("hashBucketNum property should not be set for table without primary key");
}
}
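The relaxed check treats "-1" as "hash bucket number not set", so creating a non-primary-key table whose options still carry the default sentinel no longer fails. A standalone sketch of the predicate (hypothetical helper, assuming HASH_BUCKET_NUM.key() resolves to the "hashBucketNum" string used in the error message):

    import java.util.Map;

    // "-1" is the default sentinel, meaning the user did not set hashBucketNum explicitly.
    static boolean hasExplicitHashBucketNum(Map<String, String> tableOptions) {
        String value = tableOptions.get("hashBucketNum");
        return value != null && !"-1".equals(value);
    }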
@@ -55,7 +55,7 @@ public DataStream<BinarySourceRecord> buildHashPartitionedCDCStream(DataStream<B
}

public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<BinarySourceRecord> stream) {
-        context.conf.set(DYNAMIC_BUCKETING, false);
+        Boolean dynamicBucketing = context.conf.get(DYNAMIC_BUCKETING);
if (!context.conf.contains(AUTO_SCHEMA_CHANGE)) {
context.conf.set(AUTO_SCHEMA_CHANGE, true);
}
@@ -71,8 +71,12 @@ public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<Binary
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(fileNameConfig)
.build();
return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink")
.setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
if (dynamicBucketing) {
return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink");
} else {
return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink")
.setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
}
}

public static DataStreamSink<LakeSoulArrowWrapper> buildArrowSink(Context context,
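The sink no longer forces dynamic bucketing off: when lakesoul.sink.dynamic_bucketing is true (the new default, per the commit message), the sink's parallelism is left to Flink, and only the legacy path pins it to BUCKET_PARALLELISM. A hedged sketch of opting back into the fixed-bucket path (the option key comes from the commit message; the rest is illustrative):

    // org.apache.flink.configuration.Configuration; "false" restores the fixed-parallelism sink path.
    Configuration conf = new Configuration();
    conf.setString("lakesoul.sink.dynamic_bucketing", "false");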
@@ -33,15 +33,15 @@ public DefaultOneTableBulkFormatBuilder(
super(basePath, conf, new DefaultLakeSoulWriterBucketFactory(conf));
this.identity = identity;
}
-    public TableSchemaIdentity getIdentity(){
+    public TableSchemaIdentity getIdentity() {
return this.identity;
}

@Override
public AbstractLakeSoulMultiTableSinkWriter<RowData, RowData> createWriter(Sink.InitContext context, int subTaskId) throws
IOException {
int hashBucketNum = conf.getInteger(LakeSoulSinkOptions.HASH_BUCKET_NUM);
-        int hashBucketId = hashBucketNum == -1 ? subTaskId : subTaskId % hashBucketNum;
+        int hashBucketId = hashBucketNum == -1 ? 0 : subTaskId % hashBucketNum;
System.out.printf("DefaultOneTableBulkFormatBuilder::createWriter, subTaskId=%d, hashBucketId=%d\n", subTaskId, hashBucketId);
return new LakeSoulRowDataOneTableSinkWriter(
hashBucketId,
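Before this change the writer fell back to the raw subTaskId when HASH_BUCKET_NUM was unset (-1), which can point past the table's actual bucket count; it now falls back to bucket 0 and otherwise wraps the subtask id modulo the configured bucket number. A minimal sketch of the mapping (illustrative helper, not part of the codebase):

    // subTaskId -> hash bucket id, as computed in createWriter above.
    static int hashBucketIdFor(int subTaskId, int hashBucketNum) {
        // -1 means HASH_BUCKET_NUM is not configured: use bucket 0 instead of the raw subtask id.
        return hashBucketNum == -1 ? 0 : subTaskId % hashBucketNum;
    }
    // e.g. hashBucketNum = 4: subtasks 0..7 map to buckets 0, 1, 2, 3, 0, 1, 2, 3.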
@@ -153,7 +153,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
identity.tableLocation, msgSchema, identity.useCDC, identity.cdcColumn, partition);
JSONObject properties = new JSONObject();
if (!identity.primaryKeys.isEmpty()) {
-                properties.put(HASH_BUCKET_NUM.key(), Integer.toString(conf.getInteger(BUCKET_PARALLELISM)));
+                properties.put(HASH_BUCKET_NUM.key(), Integer.toString(conf.getInteger(HASH_BUCKET_NUM)));
properties.put(HASH_PARTITIONS,
String.join(LAKESOUL_HASH_PARTITION_SPLITTER, identity.primaryKeys));
if (isCdc) {
@@ -385,7 +385,12 @@ public static String getDatabaseName(String fullDatabaseName) {
}

public static void setIOConfigs(Configuration conf, NativeIOBase io) {
-        conf.addAll(GlobalConfiguration.loadConfiguration());
+        Configuration globalConf = GlobalConfiguration.loadConfiguration();
+        globalConf.keySet().forEach(key -> {
+            if (!conf.containsKey(key)) {
+                conf.setString(key, globalConf.getString(key, null));
+            }
+        });
try {
FlinkUtil.class.getClassLoader().loadClass("org.apache.hadoop.hdfs.HdfsConfiguration");
org.apache.hadoop.conf.Configuration
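Previously conf.addAll(GlobalConfiguration.loadConfiguration()) let values from flink-conf.yaml overwrite options the job had already set (including the bucket settings above); the replacement loop copies only missing keys, so job-level configuration takes precedence. A small illustration of the resulting precedence, using hypothetical keys and Flink's Configuration class:

    Configuration jobConf = new Configuration();
    jobConf.setString("example.hash.bucket.num", "4");    // set programmatically by the job

    Configuration globalConf = new Configuration();        // stands in for flink-conf.yaml
    globalConf.setString("example.hash.bucket.num", "-1");
    globalConf.setString("example.extra.key", "value");

    // Copy only keys the job has not set, so the job-level value wins.
    globalConf.keySet().forEach(key -> {
        if (!jobConf.containsKey(key)) {
            jobConf.setString(key, globalConf.getString(key, null));
        }
    });
    // jobConf keeps "example.hash.bucket.num" = "4" and gains "example.extra.key" = "value".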
@@ -15,7 +15,7 @@ public class NativeOptions {
.withDescription("Option to set memory limit of native writer");

public static final ConfigOption<String> HASH_BUCKET_ID =
key("lakesoul.native_writer.hash_bucket_id")
key("hash_bucket_id")
.stringType()
.defaultValue("0")
.withDescription("Option to set hash bucket id of native writer");
@@ -88,7 +88,7 @@ public class SubstraitUtil {
try {
NATIVE_IO_BASE.close();
} catch (Exception e) {
-                throw new RuntimeException(e);
+                e.printStackTrace();
}
}));
}
25 changes: 19 additions & 6 deletions rust/lakesoul-io/src/async_writer/partitioning_writer.rs
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

-use std::{collections::HashMap, sync::Arc};
+use std::{borrow::Borrow, collections::HashMap, sync::Arc};

use arrow_array::RecordBatch;
use arrow_schema::{SchemaRef, SortOptions};
@@ -27,8 +27,8 @@ use crate::{
helpers::{
columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values,
},
-    lakesoul_io_config::{create_session_context, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
-    repartition::RepartitionByRangeAndHashExec,
+    lakesoul_io_config::{create_session_context, IOSchema, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
+    repartition::RepartitionByRangeAndHashExec, transform::uniform_schema,
};

use super::{AsyncBatchWriter, MultiPartAsyncWriter, ReceiverStreamExec, WriterFlushResult};
@@ -64,13 +64,26 @@ impl PartitioningAsyncWriter {
let write_id = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);

// let partitioned_file_path_and_row_count = Arc::new(Mutex::new(HashMap::<String, (Vec<String>, u64)>::new()));
+        let mut writer_config = config.clone();
+        if !config.aux_sort_cols.is_empty() {
+            let schema = config.target_schema.0.clone();
+            // O(nm), n = number of target schema fields, m = number of aux sort cols
+            let proj_indices = schema
+                .fields
+                .iter()
+                .filter(|f| !config.aux_sort_cols.contains(f.name()))
+                .map(|f| schema.index_of(f.name().as_str()).map_err(|e| DataFusionError::ArrowError(e, None)))
+                .collect::<Result<Vec<usize>>>()?;
+            let writer_schema = Arc::new(schema.project(proj_indices.borrow())?);
+            writer_config.target_schema = IOSchema(uniform_schema(writer_schema));
+        }

for i in 0..partitioning_exec.output_partitioning().partition_count() {
let sink_task = tokio::spawn(Self::pull_and_sink(
partitioning_exec.clone(),
i,
task_context.clone(),
-                config.clone().into(),
+                writer_config.clone().into(),
Arc::new(config.range_partitions.clone()),
write_id.clone(),
));
@@ -101,8 +101,8 @@ impl PartitioningAsyncWriter {
.chain(config.primary_keys.iter())
// add aux sort cols to sort expr
.chain(config.aux_sort_cols.iter())
-            .map(|pk| {
-                let col = Column::new_with_schema(pk.as_str(), &config.target_schema.0)?;
+            .map(|sort_column| {
+                let col = Column::new_with_schema(sort_column.as_str(), &config.target_schema.0)?;
Ok(PhysicalSortExpr {
expr: Arc::new(col),
options: SortOptions::default(),
