From 92c6d23bd25b9de61e05859e18c5b267c81f80db Mon Sep 17 00:00:00 2001
From: yuxia Luo
Date: Sun, 5 Jan 2025 23:55:13 +0800
Subject: [PATCH] [kv] Support first_row merge engine (#240)

This closes #133
---
 .../fluss/client/table/FlussTableITCase.java  |  55 ++++++++
 .../alibaba/fluss/config/ConfigOptions.java   |   7 +
 .../alibaba/fluss/metadata/MergeEngine.java   |  39 ++++++
 .../fluss/metadata/TableDescriptor.java       |   9 ++
 .../flink/catalog/FlinkTableFactory.java      |  18 ++-
 .../connector/flink/sink/FlinkTableSink.java  |  40 ++++--
 .../flink/source/FlinkTableSource.java        |  15 ++-
 .../flink/utils/FlinkConversions.java         |   7 +-
 .../flink/sink/FlinkTableSinkITCase.java      |  75 +++++++++++
 .../testutils/TestingDatabaseSyncSink.java    |   3 +-
 .../alibaba/fluss/server/kv/KvManager.java    |  12 +-
 .../com/alibaba/fluss/server/kv/KvTablet.java |  28 +++-
 .../alibaba/fluss/server/replica/Replica.java |   7 +-
 .../fluss/server/kv/KvManagerTest.java        |   2 +-
 .../alibaba/fluss/server/kv/KvTabletTest.java | 122 ++++++++++++++----
 15 files changed, 378 insertions(+), 61 deletions(-)
 create mode 100644 fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java

diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
index 3b8e2a494..ed2bfb943 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
@@ -21,6 +21,7 @@
 import com.alibaba.fluss.client.admin.ClientToServerITCaseBase;
 import com.alibaba.fluss.client.lookup.PrefixLookupResult;
 import com.alibaba.fluss.client.scanner.ScanRecord;
+import com.alibaba.fluss.client.scanner.log.LogScan;
 import com.alibaba.fluss.client.scanner.log.LogScanner;
 import com.alibaba.fluss.client.scanner.log.ScanRecords;
 import com.alibaba.fluss.client.table.writer.AppendWriter;
@@ -32,6 +33,7 @@
 import com.alibaba.fluss.config.MemorySize;
 import com.alibaba.fluss.metadata.KvFormat;
 import com.alibaba.fluss.metadata.LogFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableDescriptor;
@@ -876,4 +878,57 @@ void testInvalidColumnProjection() throws Exception {
                 .hasMessage(
                         "Projected field index 2 is out of bound for schema ROW<`a` INT, `b` STRING>");
     }
+
+    @Test
+    void testFirstRowMergeEngine() throws Exception {
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(DATA1_SCHEMA_PK)
+                        .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW)
+                        .build();
+        RowType rowType = DATA1_SCHEMA_PK.toRowType();
+        createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
+        int rows = 5;
+        int duplicateNum = 3;
+        try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) {
+            // first, put rows
+            UpsertWriter upsertWriter = table.getUpsertWriter();
+            List<InternalRow> expectedRows = new ArrayList<>(rows);
+            for (int id = 0; id < rows; id++) {
+                for (int num = 0; num < duplicateNum; num++) {
+                    upsertWriter.upsert(compactedRow(rowType, new Object[] {id, "value_" + num}));
+                }
+                expectedRows.add(compactedRow(rowType, new Object[] {id, "value_0"}));
+            }
+            upsertWriter.flush();
+
+            // now, get rows by lookup
+            for (int id = 0; id < rows; id++) {
+                InternalRow gotRow =
+                        table.lookup(keyRow(DATA1_SCHEMA_PK, new Object[] {id, "dummy"}))
+                                .get()
+                                .getRow();
+                assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id));
+            }
+
+            // check scan change log
+            LogScanner logScanner = table.getLogScanner(new LogScan());
+            logScanner.subscribeFromBeginning(0);
+
+            List<ScanRecord> actualLogRecords = new ArrayList<>(0);
+            while (actualLogRecords.size() < rows) {
+                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+                scanRecords.forEach(actualLogRecords::add);
+            }
+
+            assertThat(actualLogRecords).hasSize(rows);
+            for (int i = 0; i < actualLogRecords.size(); i++) {
+                ScanRecord scanRecord = actualLogRecords.get(i);
+                assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
+                assertThatRow(scanRecord.getRow())
+                        .withSchema(rowType)
+                        .isEqualTo(expectedRows.get(i));
+            }
+        }
+    }
 }
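Note on the test above: it pins down the user-visible contract of the new merge engine. Duplicate upserts on an existing key are dropped, lookups return the first value written, and the change log carries INSERT records only. The following is a minimal sketch, not part of the patch, of how a client enables the engine through the new table property; the schema and names are illustrative.

    import com.alibaba.fluss.config.ConfigOptions;
    import com.alibaba.fluss.metadata.MergeEngine;
    import com.alibaba.fluss.metadata.Schema;
    import com.alibaba.fluss.metadata.TableDescriptor;
    import com.alibaba.fluss.types.DataTypes;

    public class FirstRowDescriptorSketch {
        // Builds a descriptor for a primary-key table that keeps the first row per key.
        static TableDescriptor firstRowDescriptor() {
            Schema schema =
                    Schema.newBuilder()
                            .column("id", DataTypes.INT())
                            .column("value", DataTypes.STRING())
                            .primaryKey("id") // the merge engine requires a primary key
                            .build();
            return TableDescriptor.builder()
                    .schema(schema)
                    .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW)
                    .build();
        }
    }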
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
index 2c2893cdb..888049d2d 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
@@ -20,6 +20,7 @@
 import com.alibaba.fluss.annotation.PublicEvolving;
 import com.alibaba.fluss.metadata.KvFormat;
 import com.alibaba.fluss.metadata.LogFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.utils.ArrayUtils;

 import java.time.Duration;
@@ -968,6 +969,12 @@ public class ConfigOptions {
                                     + "When this option is set to true and the datalake tiering service is up,"
                                     + " the table will be tiered and compacted into datalake format stored on lakehouse storage.");

+    public static final ConfigOption<MergeEngine> TABLE_MERGE_ENGINE =
+            key("table.merge-engine")
+                    .enumType(MergeEngine.class)
+                    .noDefaultValue()
+                    .withDescription("The merge engine for the primary key table.");
+
     // ------------------------------------------------------------------------
     //  ConfigOptions for Kv
     // ------------------------------------------------------------------------
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java
new file mode 100644
index 000000000..fe1cfdb15
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.metadata;
+
+/**
+ * The merge engine for the primary key table.
+ *
+ * @since 0.6
+ */
+public enum MergeEngine {
+    FIRST_ROW("first_row");
+
+    private final String value;
+
+    MergeEngine(String value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return value;
+    }
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
index 7c1df3db8..9a346ce7a 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
@@ -134,6 +134,11 @@ && getLogFormat() != LogFormat.ARROW) {
             throw new IllegalArgumentException(
                     "For Primary Key Table, if kv format is compacted, log format must be arrow.");
         }
+
+        if (!hasPrimaryKey() && getMergeEngine() != null) {
+            throw new IllegalArgumentException(
+                    "Merge engine is only supported for primary key tables.");
+        }
     }

     /** Creates a builder for building table descriptor. */
@@ -275,6 +280,10 @@ public boolean isDataLakeEnabled() {
         return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED);
     }

+    public @Nullable MergeEngine getMergeEngine() {
+        return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
+    }
+
     public TableDescriptor copy(Map<String, String> newProperties) {
         return new TableDescriptor(
                 schema, comment, partitionKeys, tableDistribution, newProperties, customProperties);
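The validation added to TableDescriptor above means the new property fails fast on a log table. A hedged sketch of that failure mode follows; it is an illustrative fragment, assuming validation is triggered when the descriptor is built, as the constructor check above suggests.

    // Sketch: 'table.merge-engine' without a primary key is rejected.
    static void mergeEngineRequiresPrimaryKey() {
        Schema logSchema = Schema.newBuilder().column("a", DataTypes.INT()).build();
        try {
            TableDescriptor.builder()
                    .schema(logSchema) // no primary key declared
                    .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW)
                    .build(); // expected to throw IllegalArgumentException:
                              // "Merge engine is only supported for primary key tables."
        } catch (IllegalArgumentException expected) {
            // the descriptor never leaves the builder
        }
    }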
diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java
index b719918bb..7f08a4765 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java
+++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java
@@ -52,7 +52,7 @@
 import java.util.Set;

 import static com.alibaba.fluss.connector.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
-import static org.apache.flink.configuration.ConfigOptions.key;
+import static com.alibaba.fluss.connector.flink.utils.FlinkConversions.toFlinkOption;

 /** Factory to create table source and table sink for Fluss. */
 public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@@ -114,15 +114,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 tableOptions
                         .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
                         .toMillis();
-        boolean isDatalakeEnabled =
-                tableOptions.get(
-                        key(ConfigOptions.TABLE_DATALAKE_ENABLED.key())
-                                .booleanType()
-                                .defaultValue(false));

         return new FlinkTableSource(
                 toFlussTablePath(context.getObjectIdentifier()),
-                toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
+                toFlussClientConfig(tableOptions, context.getConfiguration()),
                 tableOutputType,
                 primaryKeyIndexes,
                 bucketKeyIndexes,
@@ -133,7 +128,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
                 cache,
                 partitionDiscoveryIntervalMs,
-                isDatalakeEnabled);
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
     }

     @Override
@@ -146,13 +142,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                         == RuntimeExecutionMode.STREAMING;

         RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
+        final ReadableConfig tableOptions = helper.getOptions();

         return new FlinkTableSink(
                 toFlussTablePath(context.getObjectIdentifier()),
-                toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
+                toFlussClientConfig(tableOptions, context.getConfiguration()),
                 rowType,
                 context.getPrimaryKeyIndexes(),
-                isStreamingMode);
+                isStreamingMode,
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
     }

     @Override
diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java
index 5699aef90..d9a9b96db 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java
+++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java
@@ -20,6 +20,7 @@
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.GenericRow;

@@ -63,6 +64,7 @@ public class FlinkTableSink
     private final RowType tableRowType;
     private final int[] primaryKeyIndexes;
     private final boolean streaming;
+    @Nullable private final MergeEngine mergeEngine;

     private boolean appliedUpdates = false;
     @Nullable private GenericRow deleteRow;
@@ -72,12 +74,14 @@ public FlinkTableSink(
             Configuration flussConfig,
             RowType tableRowType,
             int[] primaryKeyIndexes,
-            boolean streaming) {
+            boolean streaming,
+            @Nullable MergeEngine mergeEngine) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.tableRowType = tableRowType;
         this.primaryKeyIndexes = primaryKeyIndexes;
         this.streaming = streaming;
+        this.mergeEngine = mergeEngine;
     }

     @Override
@@ -112,12 +116,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                 // is 0, when no column specified, it's not partial update
                 // see FLINK-36000
                 && context.getTargetColumns().get().length != 0) {
-            // check partial update
-            if (primaryKeyIndexes.length == 0
-                    && context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
-                throw new ValidationException(
-                        "Fluss table sink does not support partial updates for table without primary key. Please make sure the "
-                                + "number of specified columns in INSERT INTO matches columns of the Fluss table.");
+
+            // it's a partial update, check whether partial update is supported
+            if (context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
+                if (primaryKeyIndexes.length == 0) {
+                    throw new ValidationException(
+                            "Fluss table sink does not support partial updates for table without primary key. Please make sure the "
+                                    + "number of specified columns in INSERT INTO matches columns of the Fluss table.");
+                } else if (mergeEngine == MergeEngine.FIRST_ROW) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
+                                            + "number of specified columns in INSERT INTO matches columns of the Fluss table.",
+                                    tablePath, MergeEngine.FIRST_ROW));
+                }
             }
             int[][] targetColumns = context.getTargetColumns().get();
             targetColumnIndexes = new int[targetColumns.length];
@@ -165,7 +177,12 @@ private List<String> columns(int[] columnIndexes) {
     public DynamicTableSink copy() {
         FlinkTableSink sink =
                 new FlinkTableSink(
-                        tablePath, flussConfig, tableRowType, primaryKeyIndexes, streaming);
+                        tablePath,
+                        flussConfig,
+                        tableRowType,
+                        primaryKeyIndexes,
+                        streaming,
+                        mergeEngine);
         sink.appliedUpdates = appliedUpdates;
         sink.deleteRow = deleteRow;
         return sink;
@@ -281,6 +298,13 @@ private void validateUpdatableAndDeletable() {
                             "Table %s is a Log Table. Log Table doesn't support DELETE and UPDATE statements.",
                             tablePath));
         }
+
+        if (mergeEngine == MergeEngine.FIRST_ROW) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
+                            tablePath, MergeEngine.FIRST_ROW));
+        }
     }

     private Map<Integer, LogicalType> getPrimaryKeyTypes() {
diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java
index 939d872c7..77484e15f 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java
+++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java
@@ -26,6 +26,7 @@
 import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.types.RowType;

@@ -104,6 +105,7 @@ public class FlinkTableSource
     private final long scanPartitionDiscoveryIntervalMs;

     private final boolean isDataLakeEnabled;
+    @Nullable private final MergeEngine mergeEngine;

     // output type after projection pushdown
     private LogicalType producedDataType;
@@ -134,7 +136,8 @@ public FlinkTableSource(
             boolean lookupAsync,
             @Nullable LookupCache cache,
             long scanPartitionDiscoveryIntervalMs,
-            boolean isDataLakeEnabled) {
+            boolean isDataLakeEnabled,
+            @Nullable MergeEngine mergeEngine) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.tableOutputType = tableOutputType;
@@ -151,6 +154,7 @@
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
         this.isDataLakeEnabled = isDataLakeEnabled;
+        this.mergeEngine = mergeEngine;
     }

     @Override
@@ -160,7 +164,11 @@ public ChangelogMode getChangelogMode() {
         } else {
             if (hasPrimaryKey()) {
                 // pk table
-                return ChangelogMode.all();
+                if (mergeEngine == MergeEngine.FIRST_ROW) {
+                    return ChangelogMode.insertOnly();
+                } else {
+                    return ChangelogMode.all();
+                }
             } else {
                 // append only
                 return ChangelogMode.insertOnly();
@@ -341,7 +349,8 @@ public DynamicTableSource copy() {
                         lookupAsync,
                         cache,
                         scanPartitionDiscoveryIntervalMs,
-                        isDataLakeEnabled);
+                        isDataLakeEnabled,
+                        mergeEngine);
         source.producedDataType = producedDataType;
         source.projectedFields = projectedFields;
         source.singleRowFilter = singleRowFilter;
diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java
index 7eb267069..57eb41e73 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java
+++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java
@@ -251,8 +251,9 @@ public static List<org.apache.flink.configuration.ConfigOption<?>> toFlinkOptions(
     }

     /** Convert Fluss's ConfigOption to Flink's ConfigOption. */
-    public static org.apache.flink.configuration.ConfigOption<?> toFlinkOption(
-            ConfigOption<?> flussOption) {
+    @SuppressWarnings("unchecked")
+    public static <T> org.apache.flink.configuration.ConfigOption<T> toFlinkOption(
+            ConfigOption<T> flussOption) {
         org.apache.flink.configuration.ConfigOptions.OptionBuilder builder =
                 org.apache.flink.configuration.ConfigOptions.key(flussOption.key());
         org.apache.flink.configuration.ConfigOption<?> option;
@@ -301,7 +302,7 @@ public static org.apache.flink.configuration.ConfigOption<?> toFlinkOption(
         }
         option.withDescription(flussOption.description());
         // TODO: support fallback keys in the future.
-        return option;
+        return (org.apache.flink.configuration.ConfigOption<T>) option;
     }

     private static Map<String, String> convertFlinkOptionsToFlussTableProperties(
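To recap the Flink wiring before the connector tests: a first_row table now reports an insert-only changelog to the planner, and the sink rejects partial updates as well as UPDATE and DELETE statements. Below is a sketch of the resulting SQL surface; it is illustrative and assumes a Fluss catalog is already registered and selected in the session.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class FirstRowSqlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Illustrative DDL, mirroring the tests below.
            tEnv.executeSql(
                    "create table orders (order_id int not null primary key not enforced,"
                            + " payload string) with ('table.merge-engine' = 'first_row')");
            // Reads from 'orders' yield +I records only (ChangelogMode.insertOnly()),
            // so they can feed sinks that cannot handle retractions. UPDATE and DELETE
            // fail with UnsupportedOperationException; INSERTs that name only a subset
            // of the columns fail with ValidationException.
        }
    }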
diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
index 4c9f1f1a1..eda67565a 100644
--- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
@@ -33,9 +33,11 @@
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.types.Row;
@@ -356,6 +358,38 @@ void testPartialUpsert() throws Exception {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }

+    @Test
+    void testFirstRowMergeEngine() throws Exception {
+        tEnv.executeSql(
+                "create table first_row_source (a int not null primary key not enforced,"
+                        + " b string) with('table.merge-engine' = 'first_row')");
+        tEnv.executeSql("create table log_sink (a int, b string)");
+
+        // insert the primary key table with the first_row merge engine into a log table to
+        // verify that the first_row merge engine only generates an append-only stream
+        JobClient insertJobClient =
+                tEnv.executeSql("insert into log_sink select * from first_row_source")
+                        .getJobClient()
+                        .get();
+
+        // insert once
+        tEnv.executeSql(
+                        "insert into first_row_source(a, b) VALUES (1, 'v1'), (2, 'v2'), (1, 'v11'), (3, 'v3')")
+                .await();
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from log_sink").collect();
+
+        List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]");
+
+        assertResultsIgnoreOrder(rowIter, expectedRows, false);
+
+        // insert again
+        tEnv.executeSql("insert into first_row_source(a, b) VALUES (3, 'v33'), (4, 'v44')").await();
+        expectedRows = Collections.singletonList("+I[4, v44]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+        insertJobClient.cancel().get();
+    }
+
     @Test
     void testInsertWithoutSpecifiedCols() {
         tEnv.executeSql("create table sink_insert_all (a int, b bigint, c string)");
@@ -712,6 +746,47 @@ void testUnsupportedDeleteAndUpdateStmtOnPartialPK() {
                         "Currently, Fluss table only supports UPDATE statement with conditions on primary key.");
     }

+    @Test
+    void testUnsupportedStmtOnFirstRowMergeEngine() {
+        String t1 = "firstRowMergeEngineTable";
+        TablePath tablePath = TablePath.of(DEFAULT_DB, t1);
+        tBatchEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a int not null,"
+                                + " b bigint null, "
+                                + " c string null, "
+                                + " primary key (a) not enforced"
+                                + ") with ('table.merge-engine' = 'first_row')",
+                        t1));
+        assertThatThrownBy(() -> tBatchEnv.executeSql("DELETE FROM " + t1 + " WHERE a = 1").await())
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Table %s uses the 'first_row' merge engine which does not support DELETE or UPDATE statements.",
+                        tablePath);
+
+        assertThatThrownBy(
+                        () ->
+                                tBatchEnv
+                                        .executeSql("UPDATE " + t1 + " SET b = 4004 WHERE a = 1")
+                                        .await())
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Table %s uses the 'first_row' merge engine which does not support DELETE or UPDATE statements.",
+                        tablePath);
+
+        assertThatThrownBy(
+                        () ->
+                                tBatchEnv
+                                        .executeSql("INSERT INTO " + t1 + "(a, c) VALUES(1, 'c1')")
+                                        .await())
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "Table %s uses the 'first_row' merge engine which does not support partial updates."
+                                + " Please make sure the number of specified columns in INSERT INTO matches columns of the Fluss table.",
+                        tablePath);
+    }
+
     private InsertAndExpectValues rowsToInsertInto(Collection<String> partitions) {
         List<String> insertValues = new ArrayList<>();
         List<String> expectedValues = new ArrayList<>();
diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java
index 3a9927fe3..805180d22 100644
--- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java
+++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java
@@ -87,7 +87,8 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) thro
                                 flussConfig,
                                 FlinkConversions.toFlinkRowType(rowType),
                                 tableDescriptor.getSchema().getPrimaryKeyIndexes(),
-                                true);
+                                true,
+                                null);

                 sinkFunction =
                         ((SinkFunctionProvider)
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java
index 87f69bc67..1bb600e35 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java
@@ -24,6 +24,7 @@
 import com.alibaba.fluss.memory.LazyMemorySegmentPool;
 import com.alibaba.fluss.memory.MemorySegmentPool;
 import com.alibaba.fluss.metadata.KvFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableDescriptor;
@@ -41,6 +42,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;

 import java.io.File;
@@ -141,12 +143,14 @@ public void shutdown() {
      * @param tableBucket the table bucket
      * @param logTablet the cdc log tablet of the kv tablet
      * @param kvFormat the kv format
+     * @param mergeEngine the merge engine
      */
     public KvTablet getOrCreateKv(
             PhysicalTablePath tablePath,
             TableBucket tableBucket,
             LogTablet logTablet,
-            KvFormat kvFormat)
+            KvFormat kvFormat,
+            @Nullable MergeEngine mergeEngine)
             throws Exception {
         return inLock(
                 tabletCreationOrDeletionLock,
@@ -164,7 +168,8 @@ public KvTablet getOrCreateKv(
                                     conf,
                                     arrowBufferAllocator,
                                     memorySegmentPool,
-                                    kvFormat);
+                                    kvFormat,
+                                    mergeEngine);
                     currentKvs.put(tableBucket, tablet);

                     LOG.info(
@@ -265,7 +270,8 @@ public KvTablet loadKv(File tabletDir) throws Exception {
                         conf,
                         arrowBufferAllocator,
                         memorySegmentPool,
-                        tableDescriptor.getKvFormat());
+                        tableDescriptor.getKvFormat(),
+                        tableDescriptor.getMergeEngine());
         if (this.currentKvs.containsKey(tableBucket)) {
             throw new IllegalStateException(
                     String.format(
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
index dfb5d85a1..abab7d7a8 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
@@ -24,6 +24,7 @@
 import com.alibaba.fluss.memory.MemorySegmentPool;
 import com.alibaba.fluss.metadata.KvFormat;
 import com.alibaba.fluss.metadata.LogFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
 import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableBucket;
@@ -102,6 +103,7 @@ public final class KvTablet {
     private final ReadWriteLock kvLock = new ReentrantReadWriteLock();
     private final LogFormat logFormat;
     private final KvFormat kvFormat;
+    private final @Nullable MergeEngine mergeEngine;

     /**
      * The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been
@@ -122,7 +124,8 @@ private KvTablet(
             LogFormat logFormat,
             BufferAllocator arrowBufferAllocator,
             MemorySegmentPool memorySegmentPool,
-            KvFormat kvFormat) {
+            KvFormat kvFormat,
+            @Nullable MergeEngine mergeEngine) {
         this.physicalPath = physicalPath;
         this.tableBucket = tableBucket;
         this.logTablet = logTablet;
@@ -136,6 +139,7 @@ private KvTablet(
         // TODO: [FLUSS-58674883] share cache in server level when PartialUpdater is thread-safe
         this.partialUpdaterCache = new PartialUpdaterCache();
         this.kvFormat = kvFormat;
+        this.mergeEngine = mergeEngine;
     }

     public static KvTablet create(
@@ -144,7 +148,8 @@ public static KvTablet create(
             Configuration conf,
             BufferAllocator arrowBufferAllocator,
             MemorySegmentPool memorySegmentPool,
-            KvFormat kvFormat)
+            KvFormat kvFormat,
+            @Nullable MergeEngine mergeEngine)
             throws IOException {
         Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
                 FlussPaths.parseTabletDir(kvTabletDir);
@@ -156,7 +161,8 @@ public static KvTablet create(
                 conf,
                 arrowBufferAllocator,
                 memorySegmentPool,
-                kvFormat);
+                kvFormat,
+                mergeEngine);
     }

     public static KvTablet create(
@@ -167,7 +173,8 @@ public static KvTablet create(
             Configuration conf,
             BufferAllocator arrowBufferAllocator,
             MemorySegmentPool memorySegmentPool,
-            KvFormat kvFormat)
+            KvFormat kvFormat,
+            @Nullable MergeEngine mergeEngine)
             throws IOException {
         RocksDBKv kv = buildRocksDBKv(conf, kvTabletDir);
         return new KvTablet(
@@ -180,7 +187,8 @@ public static KvTablet create(
                 logTablet.getLogFormat(),
                 arrowBufferAllocator,
                 memorySegmentPool,
-                kvFormat);
+                kvFormat,
+                mergeEngine);
     }

     private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir)
@@ -265,7 +273,7 @@ public LogAppendInfo putAsLeader(
                                 byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
                                 KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
                                 if (kvRecord.getRow() == null) {
-                                    // kv tablet
+                                    // it's for deletion
                                     byte[] oldValue = getFromBufferOrKv(key);
                                     if (oldValue == null) {
                                         // there might be large amount of such deletion, so we don't log
@@ -273,6 +281,10 @@
                                                 "The specific key can't be found in kv tablet although the kv record is for deletion, "
                                                         + "ignore it directly as it doesn't exist in the kv tablet yet.");
                                     } else {
+                                        if (mergeEngine == MergeEngine.FIRST_ROW) {
+                                            // if the merge engine is first row, skip the deletion
+                                            continue;
+                                        }
                                         BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
                                         BinaryRow newRow = deleteRow(oldRow, partialUpdater);
                                         // if newRow is null, it means the row should be deleted
@@ -297,6 +309,10 @@ public LogAppendInfo putAsLeader(
                                     byte[] oldValue = getFromBufferOrKv(key);
                                     // it's update
                                     if (oldValue != null) {
+                                        if (mergeEngine == MergeEngine.FIRST_ROW) {
+                                            // if the merge engine is first row, skip the update
+                                            continue;
+                                        }
                                         BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
                                         BinaryRow newRow =
                                                 updateRow(oldRow, kvRecord.getRow(), partialUpdater);
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java
index 6d3c80e72..069d9b1f5 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java
@@ -30,6 +30,7 @@
 import com.alibaba.fluss.fs.FsPath;
 import com.alibaba.fluss.metadata.KvFormat;
 import com.alibaba.fluss.metadata.LogFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
 import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableBucket;
@@ -167,6 +168,7 @@ public final class Replica {
     private final Schema schema;
     private final LogFormat logFormat;
     private final KvFormat kvFormat;
+    private final @Nullable MergeEngine mergeEngine;
     private final long logTTLMs;
     private final boolean dataLakeEnabled;
     private final int tieredLogLocalSegments;
@@ -232,6 +234,7 @@ public Replica(
         this.logTTLMs = tableDescriptor.getLogTTLMs();
         this.dataLakeEnabled = tableDescriptor.isDataLakeEnabled();
         this.tieredLogLocalSegments = tableDescriptor.getTieredLogLocalSegments();
+        this.mergeEngine = tableDescriptor.getMergeEngine();
         this.partitionKeys = tableDescriptor.getPartitionKeys();
         this.snapshotContext = snapshotContext;
         // create a closeable registry for the replica
@@ -602,7 +605,9 @@ private Optional initKvTablet() {
             LOG.info("No snapshot found, restore from log.");
             // actually, kv manager always create a kv tablet since we will drop the kv
             // if it exists before init kv tablet
-            kvTablet = kvManager.getOrCreateKv(physicalPath, tableBucket, logTablet, kvFormat);
+            kvTablet =
+                    kvManager.getOrCreateKv(
+                            physicalPath, tableBucket, logTablet, kvFormat, mergeEngine);
         }
         logTablet.updateMinRetainOffset(restoreStartOffset);
         recoverKvTablet(restoreStartOffset);
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java
index 6bbf76326..3a266fd61 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java
@@ -260,7 +260,7 @@ private KvTablet getOrCreateKv(
         LogTablet logTablet =
                 logManager.getOrCreateLog(physicalTablePath, tableBucket, LogFormat.ARROW, 1, true);
         return kvManager.getOrCreateKv(
-                physicalTablePath, tableBucket, logTablet, KvFormat.COMPACTED);
+                physicalTablePath, tableBucket, logTablet, KvFormat.COMPACTED, null);
     }

     private byte[] valueOf(KvRecord kvRecord) {
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java
index 89064e590..89674aaa3 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java
@@ -22,6 +22,7 @@
 import com.alibaba.fluss.memory.TestingMemorySegmentPool;
 import com.alibaba.fluss.metadata.KvFormat;
 import com.alibaba.fluss.metadata.LogFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
 import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableBucket;
@@ -99,32 +100,9 @@ class KvTabletTest {
     @BeforeEach
     void beforeEach() throws Exception {
         PhysicalTablePath tablePath = PhysicalTablePath.of(TablePath.of("testDb", "t1"));
-        long tableId = 0L;
-        File logTabletDir =
-                LogTestUtils.makeRandomLogTabletDir(
-                        tempLogDir, tablePath.getDatabaseName(), tableId, tablePath.getTableName());
-        logTablet =
-                LogTablet.create(
-                        tablePath,
-                        logTabletDir,
-                        conf,
-                        0,
-                        new FlussScheduler(1),
-                        LogFormat.ARROW,
-                        1,
-                        true,
-                        SystemClock.getInstance());
+        logTablet = createLogTablet(tempLogDir, 0L, tablePath);
         TableBucket tableBucket = logTablet.getTableBucket();
-        kvTablet =
-                KvTablet.create(
-                        tablePath,
-                        tableBucket,
-                        logTablet,
-                        tmpKvDir,
-                        conf,
-                        new RootAllocator(Long.MAX_VALUE),
-                        new TestingMemorySegmentPool(10 * 1024),
-                        KvFormat.COMPACTED);
+        kvTablet = createKvTablet(tablePath, tableBucket, logTablet, tmpKvDir, null);
         executor = Executors.newFixedThreadPool(2);
     }

@@ -135,6 +113,42 @@ void afterEach() {
         }
     }

+    private LogTablet createLogTablet(File tempLogDir, long tableId, PhysicalTablePath tablePath)
+            throws Exception {
+        File logTabletDir =
+                LogTestUtils.makeRandomLogTabletDir(
+                        tempLogDir, tablePath.getDatabaseName(), tableId, tablePath.getTableName());
+        return LogTablet.create(
+                tablePath,
+                logTabletDir,
+                conf,
+                0,
+                new FlussScheduler(1),
+                LogFormat.ARROW,
+                1,
+                true,
+                SystemClock.getInstance());
+    }
+
+    private KvTablet createKvTablet(
+            PhysicalTablePath tablePath,
+            TableBucket tableBucket,
+            LogTablet logTablet,
+            File tmpKvDir,
+            MergeEngine mergeEngine)
+            throws Exception {
+        return KvTablet.create(
+                tablePath,
+                tableBucket,
+                logTablet,
+                tmpKvDir,
+                conf,
+                new RootAllocator(Long.MAX_VALUE),
+                new TestingMemorySegmentPool(10 * 1024),
+                KvFormat.COMPACTED,
+                mergeEngine);
+    }
+
     @Test
     void testInvalidPartialUpdate() throws Exception {
         final Schema schema1 = DATA2_SCHEMA;
@@ -536,11 +550,69 @@ void testPutAsLeaderWithOutOfOrderSequenceException() throws Exception {
         assertThat(kvTablet.getKvPreWriteBuffer().getMaxLSN()).isEqualTo(3);
     }

+    @Test
+    void testFirstRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir)
+            throws Exception {
+        PhysicalTablePath tablePath =
+                PhysicalTablePath.of(TablePath.of("testDb", "test_first_row"));
+
+        LogTablet logTablet = createLogTablet(tempLogDir, 1L, tablePath);
+        TableBucket tableBucket = logTablet.getTableBucket();
+        KvTablet kvTablet =
+                createKvTablet(tablePath, tableBucket, logTablet, tmpKvDir, MergeEngine.FIRST_ROW);
+
+        List<KvRecord> kvData1 =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, "v11"}),
+                        kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v21"}),
+                        kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v23"}));
+        KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1);
+        kvTablet.putAsLeader(kvRecordBatch1, null, DATA1_SCHEMA_PK);
+
+        long endOffset = logTablet.localLogEndOffset();
+        LogRecords actualLogRecords = readLogRecords(logTablet);
+        List<MemoryLogRecords> expectedLogs =
+                Collections.singletonList(
+                        logRecords(
+                                DATA1_SCHEMA_PK.toRowType(),
+                                0,
+                                Arrays.asList(RowKind.INSERT, RowKind.INSERT),
+                                Arrays.asList(new Object[] {1, "v11"}, new Object[] {2, "v21"})));
+        checkEqual(actualLogRecords, expectedLogs);
+
+        List<KvRecord> kvData2 =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v22"}),
kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, "v21"}), + kvRecordFactory.ofRecord("k1".getBytes(), null), + kvRecordFactory.ofRecord("k3".getBytes(), new Object[] {3, "v31"})); + KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2); + kvTablet.putAsLeader(kvRecordBatch2, null, DATA1_SCHEMA_PK); + + expectedLogs = + Collections.singletonList( + logRecords( + DATA1_SCHEMA_PK.toRowType(), + endOffset, + Collections.singletonList(RowKind.INSERT), + Collections.singletonList(new Object[] {3, "v31"}))); + actualLogRecords = readLogRecords(logTablet, endOffset); + checkEqual(actualLogRecords, expectedLogs); + } + private LogRecords readLogRecords() throws Exception { return readLogRecords(0L); } private LogRecords readLogRecords(long startOffset) throws Exception { + return readLogRecords(logTablet, startOffset); + } + + private LogRecords readLogRecords(LogTablet logTablet) throws Exception { + return readLogRecords(logTablet, 0L); + } + + private LogRecords readLogRecords(LogTablet logTablet, long startOffset) throws Exception { return logTablet .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END, false, null) .getRecords();
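Taken together, the server-side rule these tests exercise is small: a first_row tablet applies a write only when the key is not present yet. The fragment below is a condensed sketch of that decision inside KvTablet.putAsLeader, not the real implementation; CDC log building, partial updates, and the pre-write buffer handling are omitted.

    // Condensed sketch of the first_row branch in KvTablet.putAsLeader.
    private boolean shouldApplyWrite(@Nullable MergeEngine mergeEngine, @Nullable byte[] oldValue) {
        boolean keyExists = oldValue != null;
        if (mergeEngine == MergeEngine.FIRST_ROW && keyExists) {
            // Updates and deletions of an existing key are both skipped,
            // which is why the change log of a first_row table stays insert-only.
            return false;
        }
        return true; // fall through to the normal upsert/delete handling
    }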