Skip to content

Commit

Permalink
[kv] Introduce internal RowMerger to unify primary key table behavior (
Browse files Browse the repository at this point in the history
…#277)

This also refines the behavior of the versioned merge engine: the latest row is used if the versions are the same, and Long.MIN_VALUE is used as the default version if the given version is null.
  • Loading branch information
wuchong committed Jan 31, 2025
1 parent ad20683 commit ca181e6
Show file tree
Hide file tree
Showing 26 changed files with 969 additions and 626 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,53 @@ void testCreateTableWithInvalidProperty() {
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessage("'table.log.tiered.local-segments' must be greater than 0.");

TableDescriptor t4 =
TableDescriptor.builder()
.schema(DEFAULT_SCHEMA) // no pk
.comment("test table")
.property(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned")
.build();
// should throw exception
assertThatThrownBy(() -> admin.createTable(tablePath, t4, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessage(
"'%s' must be set for versioned merge engine.",
ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key());

TableDescriptor t5 =
TableDescriptor.builder()
.schema(DEFAULT_SCHEMA) // no pk
.comment("test table")
.property(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned")
.property(
ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(),
"non-existed")
.build();
// should throw exception
assertThatThrownBy(() -> admin.createTable(tablePath, t5, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessage(
"Failed to create versioned merge engine: The version column 'non-existed' "
+ "for versioned merge engine doesn't exist in schema.");

TableDescriptor t6 =
TableDescriptor.builder()
.schema(DEFAULT_SCHEMA) // no pk
.comment("test table")
.property(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned")
.property(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), "name")
.build();
// should throw exception
assertThatThrownBy(() -> admin.createTable(tablePath, t6, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessage(
"Failed to create versioned merge engine: The version column 'name' "
+ "for versioned merge engine must be one type of "
+ "[INT, BIGINT, TIMESTAMP, TIMESTAMP_LTZ], but is STRING.");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.metadata.KvFormat;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.MergeEngineType;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableDescriptor;
Expand All @@ -53,7 +53,6 @@
import com.alibaba.fluss.utils.Preconditions;

import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
Expand Down Expand Up @@ -113,7 +112,6 @@ void testAppendOnly() throws Exception {

@ParameterizedTest
@ValueSource(booleans = {true, false})
@Disabled("TODO, fix me in #116")
void testAppendWithSmallBuffer(boolean indexedFormat) throws Exception {
TableDescriptor desc =
indexedFormat
Expand Down Expand Up @@ -931,7 +929,7 @@ void testFirstRowMergeEngine() throws Exception {
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DATA1_SCHEMA_PK)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.FIRST_ROW)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.FIRST_ROW)
.build();
RowType rowType = DATA1_SCHEMA_PK.toRowType();
createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
Expand Down Expand Up @@ -1065,7 +1063,7 @@ void testMergeEngineWithVersion() throws Exception {
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DATA3_SCHEMA_PK)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.VERSION)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.VERSIONED)
.property(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN, "b")
.build();
RowType rowType = DATA3_SCHEMA_PK.toRowType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.alibaba.fluss.compression.ArrowCompressionType;
import com.alibaba.fluss.metadata.KvFormat;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.MergeEngineType;
import com.alibaba.fluss.utils.ArrayUtils;

import java.time.Duration;
Expand Down Expand Up @@ -1004,17 +1004,24 @@ public class ConfigOptions {
+ "When this option is set to ture and the datalake tiering service is up,"
+ " the table will be tiered and compacted into datalake format stored on lakehouse storage.");

public static final ConfigOption<MergeEngine.Type> TABLE_MERGE_ENGINE =
public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
key("table.merge-engine")
.enumType(MergeEngine.Type.class)
.enumType(MergeEngineType.class)
.noDefaultValue()
.withDescription("The merge engine for the primary key table.");
.withDescription(
"Defines the merge engine for the primary key table. By default, primary key table doesn't have merge engine. "
+ "The supported merge engines are 'first_row' and 'versioned'. "
+ "The 'first_row' merge engine will keep the first row of the same primary key. "
+ "The 'versioned' merge engine will keep the row with the largest version of the same primary key.");

public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
key("table.merge-engine.version.column")
// we may need to introduce "del-column" in the future to support delete operation
key("table.merge-engine.versioned.ver-column")
.stringType()
.noDefaultValue()
.withDescription("The merge engine version column for the primary key table.");
.withDescription(
"The column name of the version column for the 'versioned' merge engine. "
+ "If the merge engine is set to 'versioned', the version column must be set.");

// ------------------------------------------------------------------------
// ConfigOptions for Kv
Expand Down
110 changes: 0 additions & 110 deletions fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.metadata;

/**
 * The merge engine for the primary key table.
 *
 * <p>A primary key table with a merge engine is a special kind of table, called "merge table".
 * Fluss provides 3 kinds of table: "primary key table", "log table", and "merge table". A merge
 * table is a primary key table that has a primary key definition but doesn't directly UPDATE and
 * DELETE rows in the table; instead, it merges the appended rows into a new data set according to
 * the defined {@link MergeEngineType}. Therefore, it doesn't support direct UPDATE (including
 * partial-update) and DELETE operations and only supports INSERT or APPEND operations.
 *
 * <p>Note: A primary key table doesn't have a merge engine by default.
 *
 * @since 0.6
 */
public enum MergeEngineType {

    /**
     * A merge engine that only keeps the first appeared row when merging multiple rows on the same
     * primary key.
     */
    FIRST_ROW,

    /**
     * A merge engine that keeps the row with the largest version when merging multiple rows on the
     * same primary key. It requires users to specify a version column (e.g., an event timestamp).
     * When inserting a row, it compares the version column value with that of the existing row
     * with the same primary key.
     *
     * <ul>
     *   <li>If the new version is larger than or equal to the old version, the existing row is
     *       replaced with the new row.
     *   <li>If the new version is smaller than the old version, the new row is ignored.
     *   <li>A null version value is treated as the smallest version (i.e., Long.MIN_VALUE).
     * </ul>
     */
    VERSIONED;

    // introduce AGGREGATE merge engine in the future

    /**
     * Creates a {@link MergeEngineType} from the given string.
     *
     * <p>The name is matched case-insensitively, independent of the JVM default locale.
     *
     * @param type the merge engine name, e.g. {@code "first_row"} or {@code "versioned"}
     * @return the merge engine type matching the given name
     * @throws IllegalArgumentException if the name doesn't match any supported merge engine
     * @throws NullPointerException if {@code type} is null
     */
    public static MergeEngineType fromString(String type) {
        // Upper-case with Locale.ROOT: with the default-locale variant, a Turkish/Azerbaijani
        // JVM maps 'i' to the dotted capital 'İ' (U+0130), so "first_row" and "versioned" would
        // never match the ASCII case labels below.
        switch (type.toUpperCase(java.util.Locale.ROOT)) {
            case "FIRST_ROW":
                return FIRST_ROW;
            case "VERSIONED":
                return VERSIONED;
            default:
                throw new IllegalArgumentException("Unsupported merge engine type: " + type);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ && getLogFormat() != LogFormat.ARROW) {
"For Primary Key Table, if kv format is compacted, log format must be arrow.");
}

if (!hasPrimaryKey() && getMergeEngine() != null) {
if (!hasPrimaryKey() && getMergeEngineType() != null) {
throw new IllegalArgumentException(
"Merge-engine is only supported in primary key table.");
"Merge engine is only supported in primary key table.");
}

// TODO: generalize the validation for ConfigOption
Expand Down Expand Up @@ -293,8 +293,8 @@ public boolean isDataLakeEnabled() {
return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED);
}

public @Nullable MergeEngine getMergeEngine() {
return MergeEngine.create(properties);
public @Nullable MergeEngineType getMergeEngineType() {
return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
}

/** Gets the Arrow compression type and compression level of the table. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
import com.alibaba.fluss.connector.flink.source.FlinkTableSource;
import com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.TablePath;

import org.apache.flink.api.common.RuntimeExecutionMode;
Expand Down Expand Up @@ -130,7 +129,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
cache,
partitionDiscoveryIntervalMs,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
MergeEngine.create(helper.getOptions().toMap()));
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
}

@Override
Expand All @@ -151,7 +150,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode,
MergeEngine.create(helper.getOptions().toMap()),
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE));
}

Expand Down
Loading

0 comments on commit ca181e6

Please sign in to comment.