[Flink] Fix Flink CDC task writing files with a wrong bucket number #584

Merged
@@ -107,6 +107,7 @@ public static void main(String[] args) throws Exception {
         conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
         conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
         conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
+        conf.set(LakeSoulSinkOptions.HASH_BUCKET_NUM, bucketParallelism);
         conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
         env.getConfig().registerTypeWithKryoSerializer(BinarySourceRecord.class, BinarySourceRecordSerializer.class);
@@ -80,6 +80,7 @@ public static void main(String[] args) throws Exception {
         conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
         conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
         conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
+        conf.set(LakeSoulSinkOptions.HASH_BUCKET_NUM, bucketParallelism);
         conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
@@ -76,6 +76,7 @@ public static void main(String[] args) throws Exception {
         conf.set(LakeSoulSinkOptions.isMultiTableSource, true);
         conf.set(SOURCE_PARALLELISM, sourceParallelism);
         conf.set(BUCKET_PARALLELISM, bucketParallelism);
+        conf.set(HASH_BUCKET_NUM, bucketParallelism);
         conf.set(SERVER_TIME_ZONE, serverTimezone);
         conf.set(WAREHOUSE_PATH, databasePrefixPath);
         conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
@@ -307,7 +307,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
             }
         } else {
             // for non-primary key table, hashBucketNum properties should not be set
-            if (tableOptions.containsKey(HASH_BUCKET_NUM.key())) {
+            if (tableOptions.containsKey(HASH_BUCKET_NUM.key()) && !tableOptions.get(HASH_BUCKET_NUM.key()).equals("-1")) {
                 throw new CatalogException("hashBucketNum property should not be set for table without primary key");
             }
         }
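
Note on the catalog change above: the patched check treats a hashBucketNum of "-1" as "not set", so only an explicit positive value on a table without a primary key is rejected. A minimal standalone sketch of that validation rule (the literal key string and the exception type are stand-ins; the real code uses LakeSoulSinkOptions.HASH_BUCKET_NUM.key() and CatalogException):

import java.util.HashMap;
import java.util.Map;

public class HashBucketNumCheckSketch {
    // Assumed literal; the real key comes from LakeSoulSinkOptions.HASH_BUCKET_NUM.key().
    private static final String HASH_BUCKET_NUM_KEY = "hashBucketNum";

    static void validateNonPrimaryKeyTable(Map<String, String> tableOptions) {
        // Mirrors the patched condition: only a value other than the "-1" default is an error.
        if (tableOptions.containsKey(HASH_BUCKET_NUM_KEY)
                && !tableOptions.get(HASH_BUCKET_NUM_KEY).equals("-1")) {
            throw new IllegalStateException(
                    "hashBucketNum property should not be set for table without primary key");
        }
    }

    public static void main(String[] args) {
        Map<String, String> defaulted = new HashMap<>();
        defaulted.put(HASH_BUCKET_NUM_KEY, "-1");
        validateNonPrimaryKeyTable(defaulted);    // passes: -1 is treated as "unset"

        Map<String, String> explicit = new HashMap<>();
        explicit.put(HASH_BUCKET_NUM_KEY, "4");
        try {
            validateNonPrimaryKeyTable(explicit); // still rejected, as before the patch
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}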
@@ -55,7 +55,7 @@ public DataStream<BinarySourceRecord> buildHashPartitionedCDCStream(DataStream<B
     }

     public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<BinarySourceRecord> stream) {
-        context.conf.set(DYNAMIC_BUCKETING, false);
+        Boolean dynamicBucketing = context.conf.get(DYNAMIC_BUCKETING);
         if (!context.conf.contains(AUTO_SCHEMA_CHANGE)) {
             context.conf.set(AUTO_SCHEMA_CHANGE, true);
         }
@@ -71,8 +71,12 @@ public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<Binary
                 .withRollingPolicy(rollingPolicy)
                 .withOutputFileConfig(fileNameConfig)
                 .build();
-        return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink")
-                .setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
+        if (dynamicBucketing) {
+            return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink");
+        } else {
+            return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink")
+                    .setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
+        }
     }

     public static DataStreamSink<LakeSoulArrowWrapper> buildArrowSink(Context context,
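
Note on the sink-builder change above: the explicit sink parallelism is now applied only when dynamic bucketing is disabled; with dynamic bucketing enabled the sink keeps Flink's default parallelism. A Flink-free sketch of that decision (DYNAMIC_BUCKETING and BUCKET_PARALLELISM are modelled as a plain boolean and int here):

import java.util.OptionalInt;

public class SinkParallelismSketch {
    // Returns the parallelism to pin on the sink, or empty when Flink should decide.
    static OptionalInt sinkParallelism(boolean dynamicBucketing, int bucketParallelism) {
        return dynamicBucketing ? OptionalInt.empty() : OptionalInt.of(bucketParallelism);
    }

    public static void main(String[] args) {
        System.out.println(sinkParallelism(true, 4));  // OptionalInt.empty: no setParallelism() call
        System.out.println(sinkParallelism(false, 4)); // OptionalInt[4]: setParallelism(BUCKET_PARALLELISM)
    }
}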
@@ -33,15 +33,15 @@ public DefaultOneTableBulkFormatBuilder(
         super(basePath, conf, new DefaultLakeSoulWriterBucketFactory(conf));
         this.identity = identity;
     }
-    public TableSchemaIdentity getIdentity(){
+    public TableSchemaIdentity getIdentity() {
         return this.identity;
     }

     @Override
     public AbstractLakeSoulMultiTableSinkWriter<RowData, RowData> createWriter(Sink.InitContext context, int subTaskId) throws
             IOException {
         int hashBucketNum = conf.getInteger(LakeSoulSinkOptions.HASH_BUCKET_NUM);
-        int hashBucketId = hashBucketNum == -1 ? subTaskId : subTaskId % hashBucketNum;
+        int hashBucketId = hashBucketNum == -1 ? 0 : subTaskId % hashBucketNum;
         System.out.printf("DefaultOneTableBulkFormatBuilder::createWriter, subTaskId=%d, hashBucketId=%d\n", subTaskId, hashBucketId);
         return new LakeSoulRowDataOneTableSinkWriter(
                 hashBucketId,
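
Note on the writer change above: with a configured hash bucket count, a sink subtask's bucket id is subTaskId modulo that count; when HASH_BUCKET_NUM is left at its -1 default, the subtask now falls back to bucket 0 instead of reusing its subtask index, which could exceed the table's bucket count. A small illustration of the mapping:

public class HashBucketIdMappingSketch {
    // Mirrors the patched expression in createWriter().
    static int hashBucketId(int subTaskId, int hashBucketNum) {
        return hashBucketNum == -1 ? 0 : subTaskId % hashBucketNum;
    }

    public static void main(String[] args) {
        int hashBucketNum = 2; // e.g. a table declared with hashBucketNum = 2
        for (int subTaskId = 0; subTaskId < 4; subTaskId++) {
            // Sink parallelism 4 over 2 buckets: subtasks 0 and 2 write bucket 0, subtasks 1 and 3 write bucket 1.
            System.out.printf("subTaskId=%d -> hashBucketId=%d%n", subTaskId, hashBucketId(subTaskId, hashBucketNum));
        }
        // Unset (-1): previously this evaluated to subTaskId; now it is always 0.
        System.out.println(hashBucketId(3, -1));
    }
}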
@@ -153,7 +153,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
                     identity.tableLocation, msgSchema, identity.useCDC, identity.cdcColumn, partition);
             JSONObject properties = new JSONObject();
             if (!identity.primaryKeys.isEmpty()) {
-                properties.put(HASH_BUCKET_NUM.key(), Integer.toString(conf.getInteger(BUCKET_PARALLELISM)));
+                properties.put(HASH_BUCKET_NUM.key(), Integer.toString(conf.getInteger(HASH_BUCKET_NUM)));
                 properties.put(HASH_PARTITIONS,
                         String.join(LAKESOUL_HASH_PARTITION_SPLITTER, identity.primaryKeys));
                 if (isCdc) {
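
Note on the committer change above: the committed table property now records the configured hash bucket count rather than the sink's bucket parallelism, since the two values can legitimately differ. A plain-Map sketch of the corrected property assembly (the real code uses a JSONObject and option keys from LakeSoulSinkOptions; the key strings and column name below are assumptions):

import java.util.HashMap;
import java.util.Map;

public class TablePropertiesSketch {
    public static void main(String[] args) {
        int bucketParallelism = 4; // number of sink subtasks writing files
        int hashBucketNum = 2;     // hash bucket count the table was declared with

        Map<String, String> properties = new HashMap<>();
        // Before the patch, bucketParallelism was written here, so the table metadata
        // could disagree with the hash bucket ids actually used by the writers.
        properties.put("hashBucketNum", Integer.toString(hashBucketNum));
        properties.put("hashPartitions", "id"); // primary-key columns joined by the splitter

        System.out.println(properties);
    }
}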
@@ -385,7 +385,12 @@ public static String getDatabaseName(String fullDatabaseName) {
     }

     public static void setIOConfigs(Configuration conf, NativeIOBase io) {
-        conf.addAll(GlobalConfiguration.loadConfiguration());
+        Configuration globalConf = GlobalConfiguration.loadConfiguration();
+        globalConf.keySet().forEach(key -> {
+            if (!conf.containsKey(key)) {
+                conf.setString(key, globalConf.getString(key, null));
+            }
+        });
         try {
             FlinkUtil.class.getClassLoader().loadClass("org.apache.hadoop.hdfs.HdfsConfiguration");
             org.apache.hadoop.conf.Configuration
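
Note on the setIOConfigs change above: the previous addAll() call let the globally loaded flink-conf values overwrite job-level settings, while the patched loop copies only the keys that the job configuration does not already contain. A standalone sketch of that merge rule using plain maps (the real code works on Flink Configuration objects; the key names below are made up):

import java.util.HashMap;
import java.util.Map;

public class ConfigMergeSketch {
    // Copy only the global entries whose keys are absent from the job configuration.
    static void mergeDefaults(Map<String, String> jobConf, Map<String, String> globalConf) {
        globalConf.forEach(jobConf::putIfAbsent);
    }

    public static void main(String[] args) {
        Map<String, String> jobConf = new HashMap<>();
        jobConf.put("hash.bucket.num", "2");                    // set explicitly for this job

        Map<String, String> globalConf = new HashMap<>();
        globalConf.put("hash.bucket.num", "4");                 // cluster-wide default
        globalConf.put("s3.endpoint", "http://localhost:9000"); // only present globally

        mergeDefaults(jobConf, globalConf);
        // Job-level value wins; missing keys are filled in from the global configuration.
        System.out.println(jobConf); // contains hash.bucket.num=2 and s3.endpoint=http://localhost:9000
    }
}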
@@ -15,7 +15,7 @@ public class NativeOptions {
             .withDescription("Option to set memory limit of native writer");

     public static final ConfigOption<String> HASH_BUCKET_ID =
-            key("lakesoul.native_writer.hash_bucket_id")
+            key("hash_bucket_id")
                     .stringType()
                     .defaultValue("0")
                     .withDescription("Option to set hash bucket id of native writer");
@@ -73,6 +73,7 @@ public void setAuxSortColumns(Iterable<String> auxSortColumns) {
     }

     public void setHashBucketNum(Integer hashBucketNum) {
+        hashBucketNum = hashBucketNum < 1 ? 1 : hashBucketNum;
         ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_hash_bucket_num(ioConfigBuilder, hashBucketNum);
     }

@@ -88,7 +88,7 @@ public class SubstraitUtil {
                 try {
                     NATIVE_IO_BASE.close();
                 } catch (Exception e) {
-                    throw new RuntimeException(e);
+                    e.printStackTrace();
                 }
             }));
     }
rust/lakesoul-io/src/async_writer/partitioning_writer.rs (25 changes: 19 additions & 6 deletions)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: Apache-2.0

-use std::{collections::HashMap, sync::Arc};
+use std::{borrow::Borrow, collections::HashMap, sync::Arc};

 use arrow_array::RecordBatch;
 use arrow_schema::{SchemaRef, SortOptions};
@@ -27,8 +27,8 @@ use crate::{
     helpers::{
         columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values,
     },
-    lakesoul_io_config::{create_session_context, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
-    repartition::RepartitionByRangeAndHashExec,
+    lakesoul_io_config::{create_session_context, IOSchema, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
+    repartition::RepartitionByRangeAndHashExec, transform::uniform_schema,
 };

 use super::{AsyncBatchWriter, MultiPartAsyncWriter, ReceiverStreamExec, WriterFlushResult};
@@ -64,13 +64,26 @@ impl PartitioningAsyncWriter {
         let write_id = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);

         // let partitioned_file_path_and_row_count = Arc::new(Mutex::new(HashMap::<String, (Vec<String>, u64)>::new()));
+        let mut writer_config = config.clone();
+        if !config.aux_sort_cols.is_empty() {
+            let schema = config.target_schema.0.clone();
+            // O(nm), n = number of target schema fields, m = number of aux sort cols
+            let proj_indices = schema
+                .fields
+                .iter()
+                .filter(|f| !config.aux_sort_cols.contains(f.name()))
+                .map(|f| schema.index_of(f.name().as_str()).map_err(|e| DataFusionError::ArrowError(e, None)))
+                .collect::<Result<Vec<usize>>>()?;
+            let writer_schema = Arc::new(schema.project(proj_indices.borrow())?);
+            writer_config.target_schema = IOSchema(uniform_schema(writer_schema));
+        }

         for i in 0..partitioning_exec.output_partitioning().partition_count() {
             let sink_task = tokio::spawn(Self::pull_and_sink(
                 partitioning_exec.clone(),
                 i,
                 task_context.clone(),
-                config.clone().into(),
+                writer_config.clone().into(),
                 Arc::new(config.range_partitions.clone()),
                 write_id.clone(),
             ));
@@ -101,8 +114,8 @@
                 .chain(config.primary_keys.iter())
                 // add aux sort cols to sort expr
                 .chain(config.aux_sort_cols.iter())
-                .map(|pk| {
-                    let col = Column::new_with_schema(pk.as_str(), &config.target_schema.0)?;
+                .map(|sort_column| {
+                    let col = Column::new_with_schema(sort_column.as_str(), &config.target_schema.0)?;
                     Ok(PhysicalSortExpr {
                         expr: Arc::new(col),
                         options: SortOptions::default(),
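
Note on the partitioning_writer change above: when auxiliary sort columns are configured, the schema handed to the per-partition writers is now a projection of the target schema with those columns removed, so they are used for sorting but presumably not written out. The projection keeps every target-schema field whose name is not an aux sort column; a language-agnostic sketch of that selection, written in Java for consistency with the other examples (field names are invented):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class AuxSortColumnProjectionSketch {
    public static void main(String[] args) {
        List<String> targetSchemaFields = Arrays.asList("id", "name", "value", "__sort_ts");
        Set<String> auxSortCols = new HashSet<>(Arrays.asList("__sort_ts"));

        // Keep only the non-auxiliary fields; the Rust code builds the matching index
        // list and projects the Arrow schema with it before cloning the writer config.
        List<String> writerSchemaFields = targetSchemaFields.stream()
                .filter(f -> !auxSortCols.contains(f))
                .collect(Collectors.toList());

        System.out.println(writerSchemaFields); // [id, name, value]
    }
}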