From 3f60146eed7d62afa84319bcb06c5415aff17151 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Tue, 3 Dec 2024 14:10:54 +0800 Subject: [PATCH] [connector] Fluss Connector support batch source. --- .../scanner/snapshot/HybridFilesReader.java | 253 ++++++++++++++++++ .../client/scanner/snapshot/HybridScan.java | 60 +++++ .../scanner/snapshot/HybridScanner.java | 65 +++++ .../scanner/snapshot/SnapshotFilesReader.java | 39 +-- .../scanner/snapshot/SnapshotScanner.java | 99 ++++--- .../fluss/client/table/FlussTable.java | 29 ++ .../com/alibaba/fluss/client/table/Table.java | 10 + .../scanner/snapshot/HybridScannerITCase.java | 195 ++++++++++++++ .../snapshot/SnapshotScannerITCase.java | 14 +- .../connector/flink/source/FlinkSource.java | 3 +- .../flink/source/FlinkTableSource.java | 4 - .../enumerator/FlinkSourceEnumerator.java | 104 ++++--- .../NoStoppingOffsetsInitializer.java | 7 +- .../reader/FlinkRecordsWithSplitIds.java | 2 +- .../source/reader/FlinkSourceReader.java | 6 +- .../source/reader/FlinkSourceSplitReader.java | 70 +++-- .../source/reader/SnapshotSplitScanner.java | 30 ++- .../source/split/HybridSnapshotLogSplit.java | 61 ++++- .../split/HybridSnapshotLogSplitState.java | 5 +- .../flink/source/split/LogSplitState.java | 8 +- .../source/split/SourceSplitSerializer.java | 21 +- .../source/FlinkTableSourceBatchITCase.java | 143 ++++++++-- .../enumerator/FlinkSourceEnumeratorTest.java | 58 ++-- .../source/reader/FlinkSourceReaderTest.java | 117 +++++++- .../reader/FlinkSourceSplitReaderTest.java | 68 +++-- .../split/SourceSplitSerializerTest.java | 2 +- .../flink/source/testutils/FlinkTestBase.java | 17 +- 27 files changed, 1263 insertions(+), 227 deletions(-) create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridFilesReader.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridScan.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridScanner.java create mode 100644 fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/HybridScannerITCase.java diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridFilesReader.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridFilesReader.java new file mode 100644 index 000000000..1dd6c942c --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridFilesReader.java @@ -0,0 +1,253 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.client.scanner.snapshot; + +import com.alibaba.fluss.client.scanner.ScanRecord; +import com.alibaba.fluss.client.scanner.log.LogScanner; +import com.alibaba.fluss.client.scanner.log.ScanRecords; +import com.alibaba.fluss.metadata.KvFormat; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.row.BinaryRow; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.row.compacted.CompactedRow; +import com.alibaba.fluss.row.encode.KeyEncoder; +import com.alibaba.fluss.row.encode.RowEncoder; +import com.alibaba.fluss.row.encode.ValueEncoder; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.IOUtils; + +import org.rocksdb.RocksDBException; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.IntStream; + +/** + * A reader to read kv snapshot files to {@link ScanRecord}s and merge a piece of log. It will + * return the {@link ScanRecord}s as an iterator. + */ +@NotThreadSafe +public class HybridFilesReader extends SnapshotFilesReader { + private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L); + + private final short schemaId; + private final KeyEncoder keyEncoder; + private final RowEncoder rowEncoder; + private final KvFormat kvFormat; + private final DataType[] fieldTypes; + private final long startingOffset; + private final long stoppingOffset; + private final TableBucket tableBucket; + private final LogScanner logScanner; + + // key is index of a projected field in the whole table, value is index of the projected field + // in whole project fields + private final Map logProjectedFieldSet; + + InternalRow.FieldGetter[] logFieldGetters; + + public HybridFilesReader( + KvFormat kvFormat, + Path rocksDbPath, + short schemaId, + Schema tableSchema, + @Nullable int[] targetProjectedFields, + @Nullable int[] logProjectedFields, + long startingOffset, + long stoppingOffset, + TableBucket tableBucket, + LogScanner logScanner) { + super(kvFormat, rocksDbPath, tableSchema, targetProjectedFields); + this.schemaId = schemaId; + this.keyEncoder = + new KeyEncoder(tableSchema.toRowType(), tableSchema.getPrimaryKeyIndexes()); + DataType[] dataTypes = tableSchema.toRowType().getChildren().toArray(new DataType[0]); + this.rowEncoder = RowEncoder.create(kvFormat, dataTypes); + this.kvFormat = kvFormat; + this.fieldTypes = dataTypes; + this.startingOffset = startingOffset; + this.stoppingOffset = stoppingOffset; + this.tableBucket = tableBucket; + this.logScanner = logScanner; + + if (logProjectedFields == null) { + logProjectedFields = IntStream.range(0, dataTypes.length).toArray(); + } + + this.logProjectedFieldSet = new HashMap<>(); + for (int i = 0; i < logProjectedFields.length; i++) { + logProjectedFieldSet.put(logProjectedFields[i], i); + } + + logFieldGetters = new InternalRow.FieldGetter[logProjectedFields.length]; + for (int i = 0; i < logProjectedFields.length; i++) { + DataType fieldDataType = dataTypes[logProjectedFields[i]]; + logFieldGetters[i] = InternalRow.createFieldGetter(fieldDataType, i); + } + } + + /** Override init to subscribe and apply logs before init iterator. 
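+     * <p>Unlike the parent reader, the local RocksDB instance is opened writable here, so the
+     * bounded log range can be merged into it before the snapshot iterator is created.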
*/ + @Override + public void init() throws IOException { + try { + initRocksDB(rocksDbPath, false); + subscribeAndApplyLogs(); + initRocksIterator(); + } catch (Throwable t) { + releaseSnapshot(); + // If anything goes wrong, clean up our stuff. If things went smoothly the + // merging iterator is now responsible for closing the resources + IOUtils.closeQuietly(closeableRegistry); + throw new IOException("Error creating RocksDB snapshot reader.", t); + } + } + + private void subscribeAndApplyLogs() throws RocksDBException { + if (startingOffset >= stoppingOffset || stoppingOffset == 0) { + return; + } + + if (tableBucket.getPartitionId() != null) { + logScanner.subscribe( + tableBucket.getPartitionId(), tableBucket.getBucket(), startingOffset); + } else { + logScanner.subscribe(tableBucket.getBucket(), startingOffset); + } + + boolean readEnd = false; + do { + ScanRecords scanRecords = logScanner.poll(POLL_TIMEOUT); + for (ScanRecord scanRecord : scanRecords) { + // apply log to snapshot. + if (scanRecord.getOffset() <= stoppingOffset - 1) { + applyLogs(scanRecord); + } + if (scanRecord.getOffset() >= stoppingOffset - 1) { + readEnd = true; + break; + } + } + + } while (!readEnd); + } + + private void applyLogs(ScanRecord scanRecord) throws RocksDBException { + BinaryRow row = castProjectRowToEntireRow(scanRecord.getRow()); + byte[] key = keyEncoder.encode(row); + switch (scanRecord.getRowKind()) { + case APPEND_ONLY: + throw new UnsupportedOperationException( + "Hybrid File Reader can not apply append only logs."); + case INSERT: + case UPDATE_AFTER: + byte[] value = ValueEncoder.encodeValue(schemaId, row); + db.put(key, value); + break; + case DELETE: + case UPDATE_BEFORE: + db.delete(key); + } + } + + /** + * The row of log is projection result while the row of snapshot is entire row, thus need to put + * a placeholder value at un-projection indexes. + */ + private BinaryRow castProjectRowToEntireRow(InternalRow row) { + if (KvFormat.COMPACTED.equals(kvFormat)) { + return castToAnEntireCompactedRow(row); + } else { + return castToAnEntireIndexedRow(row); + } + } + + private BinaryRow castToAnEntireIndexedRow(InternalRow row) { + rowEncoder.startNewRow(); + for (Integer projectField : logProjectedFieldSet.keySet()) { + rowEncoder.encodeField( + projectField, + logFieldGetters[logProjectedFieldSet.get(projectField)].getFieldOrNull(row)); + } + return rowEncoder.finishRow(); + } + + private BinaryRow castToAnEntireCompactedRow(InternalRow row) { + if (row instanceof CompactedRow) { + return (CompactedRow) row; + } + rowEncoder.startNewRow(); + for (int i = 0; i < fieldTypes.length; i++) { + if (logProjectedFieldSet.containsKey(i)) { + rowEncoder.encodeField( + i, logFieldGetters[logProjectedFieldSet.get(i)].getFieldOrNull(row)); + } else { + // When use ProjectedRow to read projection columns from compacted row in rocksdb, + // deserialize the entire row at first. Thus, must put into placeholder value though + // it is no use later, nor un-projection columns maybe out of bound. 
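+                // In other words, the compacted encoding is positional: columns that were
+                // projected away from the log record still need a type-correct placeholder so
+                // that later decoding stays aligned; the placeholder itself is never read back.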
+ rowEncoder.encodeField(i, getPlaceHolderValueOfCompactedRow(fieldTypes[i])); + } + } + return rowEncoder.finishRow(); + } + + private static Object getPlaceHolderValueOfCompactedRow(DataType fieldType) { + if (fieldType.isNullable()) { + return null; + } + + switch (fieldType.getTypeRoot()) { + case CHAR: + case STRING: + return BinaryString.blankString(1); + case BOOLEAN: + return false; + case BINARY: + case BYTES: + return new byte[0]; + case DECIMAL: + return BigDecimal.ZERO; + case TINYINT: + case SMALLINT: + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case BIGINT: + case FLOAT: + case DOUBLE: + return 0; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return TimestampNtz.now(); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return TimestampLtz.fromInstant(Instant.MIN); + default: + throw new IllegalArgumentException( + "Unsupported type for CompactedRow: " + fieldType); + } + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridScan.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridScan.java new file mode 100644 index 000000000..8b9bcc046 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridScan.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.scanner.snapshot; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.fs.FsPathAndFileName; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableBucket; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * A class to describe hybrid scan for a single bucket, which include the snapshot files and a piece + * of log. + * + *
* <p>
It also contains the starting offset and ending offset comparing {@link SnapshotScan} + * + * @since 0.3 + */ +@PublicEvolving +public class HybridScan extends SnapshotScan { + private final long logStartingOffset; + private final long logStoppingOffset; + + public HybridScan( + TableBucket tableBucket, + List fsPathAndFileNames, + Schema tableSchema, + @Nullable int[] projectedFields, + long logStartingOffset, + long logStoppingOffset) { + super(tableBucket, fsPathAndFileNames, tableSchema, projectedFields); + this.logStartingOffset = logStartingOffset; + this.logStoppingOffset = logStoppingOffset; + } + + public long getLogStartingOffset() { + return logStartingOffset; + } + + public long getLogStoppingOffset() { + return logStoppingOffset; + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridScanner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridScanner.java new file mode 100644 index 000000000..1de670729 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/HybridScanner.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.scanner.snapshot; + +import com.alibaba.fluss.client.scanner.RemoteFileDownloader; +import com.alibaba.fluss.client.scanner.log.LogScanner; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.metadata.KvFormat; + +import javax.annotation.Nullable; + +/** A hybrid scanner to merge snapshot and bounded log. */ +public class HybridScanner extends SnapshotScanner { + + private final byte schemaId; + private final LogScanner logScanner; + private final int[] logProjectedFields; + + public HybridScanner( + Configuration conf, + KvFormat kvFormat, + RemoteFileDownloader remoteFileDownloader, + byte schemaId, + HybridScan hybridScan, + @Nullable int[] logProjectedFields, + LogScanner logScanner) { + super(conf, kvFormat, remoteFileDownloader, hybridScan); + this.logScanner = logScanner; + this.schemaId = schemaId; + this.logProjectedFields = logProjectedFields; + } + + /** + * Override initReader to provide {@link HybridFilesReader} and subscribe and apply a piece of + * logs. 
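+     * <p>The log projection handed to the reader always includes the primary-key columns (see
+     * FlussTable#getHybridScanner), because the merge step must re-encode the key of every
+     * change-log record it applies.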
+ */ + @Override + protected SnapshotFilesReader createSnapshotReader() { + return new HybridFilesReader( + kvFormat, + snapshotLocalDirectory, + schemaId, + snapshotScan.getTableSchema(), + snapshotScan.getProjectedFields(), + logProjectedFields, + ((HybridScan) snapshotScan).getLogStartingOffset(), + ((HybridScan) snapshotScan).getLogStoppingOffset(), + snapshotScan.getTableBucket(), + logScanner); + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotFilesReader.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotFilesReader.java index b87f37f14..b64e49c8b 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotFilesReader.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotFilesReader.java @@ -52,29 +52,33 @@ class SnapshotFilesReader implements Iterator, AutoCloseable { private final ValueDecoder valueDecoder; @Nullable private final int[] projectedFields; - private RocksIteratorWrapper rocksIteratorWrapper; - - private Snapshot snapshot; - private RocksDBHandle rocksDBHandle; + protected final Path rocksDbPath; + protected RocksIteratorWrapper rocksIteratorWrapper; + protected Snapshot snapshot; + protected RocksDBHandle rocksDBHandle; + protected RocksDB db; private boolean isClose = false; - private final CloseableRegistry closeableRegistry; + protected final CloseableRegistry closeableRegistry; SnapshotFilesReader( KvFormat kvFormat, Path rocksDbPath, Schema tableSchema, - @Nullable int[] projectedFields) - throws IOException { + @Nullable int[] projectedFields) { this.valueDecoder = new ValueDecoder( RowDecoder.create( kvFormat, tableSchema.toRowType().getChildren().toArray(new DataType[0]))); + this.rocksDbPath = rocksDbPath; this.projectedFields = projectedFields; closeableRegistry = new CloseableRegistry(); + } + + public void init() throws IOException { try { - initRocksDB(rocksDbPath); + initRocksDB(rocksDbPath, true); initRocksIterator(); } catch (Throwable t) { releaseSnapshot(); @@ -85,23 +89,26 @@ class SnapshotFilesReader implements Iterator, AutoCloseable { } } - private void initRocksDB(Path rocksDbPath) throws Exception { + protected void initRocksDB(Path rocksDbPath, boolean isReadonly) throws Exception { // create rocksdb DBOptions dbOptions = new DBOptions(); closeableRegistry.registerCloseable(dbOptions::close); ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions(); closeableRegistry.registerCloseable(columnFamilyOptions::close); - + if (!isReadonly) { + // use writable rocksdb to merge log. 
+ dbOptions.setCreateIfMissing(true); + } rocksDBHandle = - new RocksDBHandle(rocksDbPath.toFile(), dbOptions, columnFamilyOptions, true); + new RocksDBHandle(rocksDbPath.toFile(), dbOptions, columnFamilyOptions, isReadonly); closeableRegistry.registerCloseable(rocksDBHandle::close); - } - - private void initRocksIterator() throws IOException { // open a db rocksDBHandle.openDB(); // get the snapshot - RocksDB db = rocksDBHandle.getDb(); + db = rocksDBHandle.getDb(); + } + + protected void initRocksIterator() throws IOException { snapshot = db.getSnapshot(); closeableRegistry.registerCloseable(snapshot::close); @@ -129,7 +136,7 @@ public void close() throws IOException { isClose = true; } - private void releaseSnapshot() { + protected void releaseSnapshot() { if (snapshot != null && rocksDBHandle != null) { rocksDBHandle.getDb().releaseSnapshot(snapshot); snapshot = null; diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScanner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScanner.java index dbc9b608b..454df8f1e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScanner.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScanner.java @@ -71,10 +71,10 @@ public class SnapshotScanner implements AutoCloseable { public static final CloseableIterator NO_DATA_AVAILABLE = CloseableIterator.emptyIterator(); - private final Path snapshotLocalDirectory; - private final RemoteFileDownloader remoteFileDownloader; - private final KvFormat kvFormat; - private final SnapshotScan snapshotScan; + protected final Path snapshotLocalDirectory; + protected final RemoteFileDownloader remoteFileDownloader; + protected final KvFormat kvFormat; + protected final SnapshotScan snapshotScan; private final ReentrantLock lock = new ReentrantLock(); @@ -83,9 +83,12 @@ public class SnapshotScanner implements AutoCloseable { private final AtomicBoolean closed; + // When a new reader is created, it's a pendingReader, and will become useful + // snapshotFilesReader after init. 
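+    // init() opens RocksDB and may replay a bounded piece of log, and it runs outside the
+    // lock; the reader is only published as snapshotFilesReader once initialization finishes.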
+ private volatile SnapshotFilesReader pendingReader; private volatile SnapshotFilesReader snapshotFilesReader; - @Nullable private volatile Throwable initSnapshotFilesReaderException = null; + @Nullable protected volatile Throwable initSnapshotFilesReaderException = null; public SnapshotScanner( Configuration conf, @@ -164,6 +167,9 @@ public boolean reachedEnd() { * initialization */ private void ensureNoException() { + if (closed.get()) { + throw new IllegalStateException("This scanner has already been closed."); + } if (initSnapshotFilesReaderException != null) { throw new FlussRuntimeException( "Failed to initialize snapshot files reader.", @@ -181,45 +187,54 @@ public void close() { private void initReaderAsynchronously() { CompletableFuture.runAsync( - () -> + () -> { + CloseableRegistry closeableRegistry = new CloseableRegistry(); + try { + if (!snapshotLocalDirectory.toFile().mkdirs()) { + throw new IOException( + String.format( + "Failed to create directory %s for storing kv snapshot files.", + snapshotLocalDirectory)); + } + + // todo: refactor transferAllToDirectory method to + // return a future so that we won't need to runAsync using + // the default thread pool + LOG.info( + "Start to download kv snapshot files to local directory for bucket {}.", + snapshotScan.getTableBucket()); + remoteFileDownloader.transferAllToDirectory( + snapshotScan.getFsPathAndFileNames(), + snapshotLocalDirectory, + closeableRegistry); inLock( lock, () -> { - CloseableRegistry closeableRegistry = new CloseableRegistry(); - try { - if (!snapshotLocalDirectory.toFile().mkdirs()) { - throw new IOException( - String.format( - "Failed to create directory %s for storing kv snapshot files.", - snapshotLocalDirectory)); - } - closeableRegistry.registerCloseable( - () -> - FileUtils.deleteDirectoryQuietly( - snapshotLocalDirectory.toFile())); - // todo: refactor transferAllToDirectory method to - // return a future so that we won't need to runAsync using - // the default thread pool - LOG.info( - "Start to download kv snapshot files to local directory for bucket {}.", - snapshotScan.getTableBucket()); - remoteFileDownloader.transferAllToDirectory( - snapshotScan.getFsPathAndFileNames(), - snapshotLocalDirectory, - closeableRegistry); - snapshotFilesReader = - new SnapshotFilesReader( - kvFormat, - snapshotLocalDirectory, - snapshotScan.getTableSchema(), - snapshotScan.getProjectedFields()); - readerIsReady.signalAll(); - } catch (Throwable e) { - IOUtils.closeQuietly(closeableRegistry); - initSnapshotFilesReaderException = e; - } finally { - IOUtils.closeQuietly(closeableRegistry); - } - })); + pendingReader = createSnapshotReader(); + }); + pendingReader.init(); + inLock( + lock, + () -> { + snapshotFilesReader = pendingReader; + pendingReader = null; + readerIsReady.signalAll(); + }); + + } catch (Throwable e) { + IOUtils.closeQuietly(closeableRegistry); + initSnapshotFilesReaderException = e; + } finally { + IOUtils.closeQuietly(closeableRegistry); + } + }); + } + + protected SnapshotFilesReader createSnapshotReader() { + return new SnapshotFilesReader( + kvFormat, + snapshotLocalDirectory, + snapshotScan.getTableSchema(), + snapshotScan.getProjectedFields()); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index 9815eeb46..b5a68d3ab 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -25,6 +25,8 @@ import com.alibaba.fluss.client.scanner.log.FlussLogScanner; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; +import com.alibaba.fluss.client.scanner.snapshot.HybridScan; +import com.alibaba.fluss.client.scanner.snapshot.HybridScanner; import com.alibaba.fluss.client.scanner.snapshot.SnapshotScan; import com.alibaba.fluss.client.scanner.snapshot.SnapshotScanner; import com.alibaba.fluss.client.table.getter.PartitionGetter; @@ -83,6 +85,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.stream.IntStream; import static com.alibaba.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode; @@ -380,6 +383,32 @@ public SnapshotScanner getSnapshotScanner(SnapshotScan snapshotScan) { snapshotScan); } + @Override + public HybridScanner getHybridScanner(HybridScan hybridScan) { + mayPrepareSecurityTokeResource(); + mayPrepareRemoteFileDownloader(); + int[] logProjectedFields = null; + if (hybridScan.getProjectedFields() != null) { + int[] primaryKeyIndexes = + tableInfo.getTableDescriptor().getSchema().getPrimaryKeyIndexes(); + logProjectedFields = + IntStream.concat( + IntStream.of(hybridScan.getProjectedFields()), + IntStream.of(primaryKeyIndexes)) + .distinct() + .toArray(); + } + + return new HybridScanner( + conf, + tableInfo.getTableDescriptor().getKvFormat(), + remoteFileDownloader, + (byte) tableInfo.getSchemaId(), + hybridScan, + logProjectedFields, + getLogScanner(new LogScan().withProjectedFields(logProjectedFields))); + } + @Override public void close() throws Exception { if (closed.compareAndSet(false, true)) { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java index b2373f7b3..3302d6d0e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java @@ -21,6 +21,8 @@ import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; +import com.alibaba.fluss.client.scanner.snapshot.HybridScan; +import com.alibaba.fluss.client.scanner.snapshot.HybridScanner; import com.alibaba.fluss.client.scanner.snapshot.SnapshotScan; import com.alibaba.fluss.client.scanner.snapshot.SnapshotScanner; import com.alibaba.fluss.client.table.writer.AppendWriter; @@ -112,4 +114,12 @@ CompletableFuture> limitScan( * @return the {@link SnapshotScanner} to scan data from this table. */ SnapshotScanner getSnapshotScanner(SnapshotScan snapshotScan); + + /** + * Get a {@link HybridScanner} to scan data from this table according to provided {@link + * HybridScan} and merge a piece of log. + * + * @return the {@link HybridScanner} to scan data from this table. + */ + HybridScanner getHybridScanner(HybridScan hybridScan); } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/HybridScannerITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/HybridScannerITCase.java new file mode 100644 index 000000000..88f8b1999 --- /dev/null +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/HybridScannerITCase.java @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.scanner.snapshot; + +import com.alibaba.fluss.client.admin.OffsetSpec; +import com.alibaba.fluss.client.scanner.ScanRecord; +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.snapshot.BucketSnapshotInfo; +import com.alibaba.fluss.client.table.snapshot.BucketsSnapshotInfo; +import com.alibaba.fluss.client.table.snapshot.KvSnapshotInfo; +import com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.types.RowType; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT Case for {@link HybridScanner}. */ +public class HybridScannerITCase extends SnapshotScannerITCase { + private static final String DEFAULT_DB = "test-hybrid-scan-db"; + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testScanHybrid(boolean hybridScanWithoutSnapshotFile) throws Exception { + TablePath tablePath = + TablePath.of(DEFAULT_DB, "test-table-hybrid-" + hybridScanWithoutSnapshotFile); + long tableId = createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true); + + // scan the snapshot + Map> expectedRowByBuckets = putRows(tableId, tablePath, 10); + // test read snapshot without waiting snapshot finish. + testHybridRead(tablePath, expectedRowByBuckets, hybridScanWithoutSnapshotFile, null); + + // test again with waiting snapshot finish. + expectedRowByBuckets = putRows(tableId, tablePath, 20); + waitUtilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0); + testHybridRead(tablePath, expectedRowByBuckets, hybridScanWithoutSnapshotFile, null); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testScanHybridWithProjection(boolean hybridScanWithoutSnapshotFile) throws Exception { + TablePath tablePath = + TablePath.of( + DEFAULT_DB, + "test-table-hybrid-projection-" + hybridScanWithoutSnapshotFile); + long tableId = createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true); + + // scan the snapshot + Map> expectedRowByBuckets = putRows(tableId, tablePath, 10); + // test read snapshot without waiting snapshot finish. + testHybridRead( + tablePath, expectedRowByBuckets, hybridScanWithoutSnapshotFile, new int[] {0}); + // test again with waiting snapshot finish. 
+ expectedRowByBuckets = putRows(tableId, tablePath, 20); + waitUtilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0); + testHybridRead( + tablePath, expectedRowByBuckets, hybridScanWithoutSnapshotFile, new int[] {1}); + + // test read snapshot again + expectedRowByBuckets = putRows(tableId, tablePath, 20); + testHybridRead( + tablePath, expectedRowByBuckets, hybridScanWithoutSnapshotFile, new int[] {1, 0}); + } + + private void testHybridRead( + TablePath tablePath, + Map> bucketRows, + boolean hybridScanWithoutSnapshotFile, + int[] projectedFields) + throws Exception { + KvSnapshotInfo tableSnapshotInfo = admin.getKvSnapshot(tablePath).get(); + Map latestOffsets = + admin.listOffsets( + PhysicalTablePath.of(tablePath, null), + bucketRows.keySet().stream() + .map(TableBucket::getBucket) + .collect(Collectors.toSet()), + new OffsetSpec.LatestSpec()) + .all() + .get(); + + BucketsSnapshotInfo bucketsSnapshotInfo = tableSnapshotInfo.getBucketsSnapshots(); + long tableId = tableSnapshotInfo.getTableId(); + try (Table table = conn.getTable(tablePath)) { + for (int bucketId : bucketsSnapshotInfo.getBucketIds()) { + TableBucket tableBucket = new TableBucket(tableId, bucketId); + Optional bucketSnapshotInfo = + bucketsSnapshotInfo.getBucketSnapshotInfo(bucketId); + // get the expected rows + List expectedRows = bucketRows.get(tableBucket); + if (projectedFields != null) { + expectedRows = + expectedRows.stream() + .map(row -> ProjectedRow.from(projectedFields).replaceRow(row)) + .collect(Collectors.toList()); + } + + // create the hybrid scan according to the snapshot files + HybridScan hybridScan = + new HybridScan( + tableBucket, + bucketSnapshotInfo.isPresent() && !hybridScanWithoutSnapshotFile + ? bucketSnapshotInfo.get().getSnapshotFiles() + : Collections.emptyList(), + DEFAULT_SCHEMA, + projectedFields, + bucketSnapshotInfo.isPresent() && !hybridScanWithoutSnapshotFile + ? bucketSnapshotInfo.get().getLogOffset() + : -2, + latestOffsets.get(bucketId)); + HybridScanner hybridScanner = table.getHybridScanner(hybridScan); + + // collect all the records from the scanner + List scanRecords = collectRecords(hybridScanner); + + // check the records + if (projectedFields == null) { + assertScanRecords(scanRecords, expectedRows); + } else { + InternalRow.FieldGetter[] fieldGetters = + new InternalRow.FieldGetter[projectedFields.length]; + RowType rowType = admin.getTableSchema(tablePath).get().getSchema().toRowType(); + for (int i = 0; i < projectedFields.length; i++) { + fieldGetters[i] = + InternalRow.createFieldGetter( + rowType.getTypeAt(projectedFields[i]), i); + } + assertScanRecords(scanRecords, expectedRows, fieldGetters); + } + } + } + } + + private void assertScanRecords( + List actualScanRecords, + List expectRows, + InternalRow.FieldGetter[] fieldGetters) { + List expectedScanRecords = new ArrayList<>(expectRows.size()); + // Transform to GenericRow for comparing value rather than object. 
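+        // Different InternalRow implementations need not compare equal to each other, so both
+        // sides are normalized to GenericRow field by field before asserting.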
+ actualScanRecords = + actualScanRecords.stream() + .map( + record -> { + GenericRow genericRow = new GenericRow(fieldGetters.length); + for (int i = 0; i < fieldGetters.length; i++) { + genericRow.setField( + i, fieldGetters[i].getFieldOrNull(record.getRow())); + } + return new ScanRecord( + record.getOffset(), + record.getTimestamp(), + record.getRowKind(), + genericRow); + }) + .collect(Collectors.toList()); + for (InternalRow row : expectRows) { + GenericRow genericRow = new GenericRow(fieldGetters.length); + for (int i = 0; i < fieldGetters.length; i++) { + genericRow.setField(i, fieldGetters[i].getFieldOrNull(row)); + } + + expectedScanRecords.add(new ScanRecord(genericRow)); + } + + assertThat(actualScanRecords).containsExactlyInAnyOrderElementsOf(expectedScanRecords); + } +} diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScannerITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScannerITCase.java index 68118947d..db846189f 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScannerITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScannerITCase.java @@ -55,14 +55,14 @@ class SnapshotScannerITCase extends ClientToServerITCaseBase { private static final int DEFAULT_BUCKET_NUM = 3; - private static final Schema DEFAULT_SCHEMA = + protected static final Schema DEFAULT_SCHEMA = Schema.newBuilder() .primaryKey("id") .column("id", DataTypes.INT()) .column("name", DataTypes.STRING()) .build(); - private static final TableDescriptor DEFAULT_TABLE_DESCRIPTOR = + protected static final TableDescriptor DEFAULT_TABLE_DESCRIPTOR = TableDescriptor.builder() .schema(DEFAULT_SCHEMA) .distributedBy(DEFAULT_BUCKET_NUM, "id") @@ -117,8 +117,8 @@ void testScanSnapshot() throws Exception { testSnapshotRead(tablePath, expectedRowByBuckets); } - private Map> putRows(long tableId, TablePath tablePath, int rows) - throws Exception { + protected Map> putRows( + long tableId, TablePath tablePath, int rows) throws Exception { Map> rowsByBuckets = new HashMap<>(); try (Table table = conn.getTable(tablePath)) { UpsertWriter upsertWriter = table.getUpsertWriter(); @@ -172,13 +172,13 @@ private static int getBucketId(InternalRow row) { return DEFAULT_BUCKET_ASSIGNER.assignBucket(key, Cluster.empty()); } - private void waitUtilAllSnapshotFinished(Set tableBuckets, long snapshotId) { + protected void waitUtilAllSnapshotFinished(Set tableBuckets, long snapshotId) { for (TableBucket tableBucket : tableBuckets) { FLUSS_CLUSTER_EXTENSION.waitUtilSnapshotFinished(tableBucket, snapshotId); } } - private List collectRecords(SnapshotScanner snapshotScanner) { + protected List collectRecords(SnapshotScanner snapshotScanner) { List scanRecords = new ArrayList<>(); Iterator recordIterator = snapshotScanner.poll(Duration.ofSeconds(10)); while (recordIterator != null) { @@ -192,7 +192,7 @@ private List collectRecords(SnapshotScanner snapshotScanner) { return scanRecords; } - private void assertScanRecords( + protected void assertScanRecords( List actualScanRecords, List expectRows) { List expectedScanRecords = new ArrayList<>(expectRows.size()); for (InternalRow row : expectRows) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkSource.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkSource.java index 475fe3d12..9fd6b78f4 100644 --- 
a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkSource.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkSource.java @@ -136,6 +136,7 @@ public SourceReader createReader(SourceReaderContext c sourceOutputType, context, projectedFields, - flinkSourceReaderMetrics); + flinkSourceReaderMetrics, + streaming); } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java index 49ecb87f9..6800aef34 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java @@ -263,10 +263,6 @@ public boolean isBounded() { + modificationScanType + " statement with conditions on primary key."); } - if (!isDataLakeEnabled) { - throw new UnsupportedOperationException( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); - } return source; } }; diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumerator.java index 3681677ed..60215c9d8 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumerator.java @@ -23,7 +23,6 @@ import com.alibaba.fluss.client.table.snapshot.BucketsSnapshotInfo; import com.alibaba.fluss.client.table.snapshot.KvSnapshotInfo; import com.alibaba.fluss.client.table.snapshot.PartitionSnapshotInfo; -import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.connector.flink.lakehouse.LakeSplitGenerator; import com.alibaba.fluss.connector.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl; @@ -67,6 +66,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static com.alibaba.fluss.utils.Preconditions.checkNotNull; @@ -201,8 +201,16 @@ public void start() { ExceptionUtils.stripCompletionException(e)); } + if (streaming) { + startInStreamingMode(); + } else { + startInBatchMode(); + } + } + + private void startInStreamingMode() { if (isPartitioned) { - if (streaming && scanPartitionDiscoveryIntervalMs > 0) { + if (scanPartitionDiscoveryIntervalMs > 0) { // should do partition discovery LOG.info( "Starting the FlussSourceEnumerator for table {} " @@ -216,40 +224,33 @@ public void start() { 0, scanPartitionDiscoveryIntervalMs); } else { - if (!streaming) { - startInBatchMode(); - } else { - // just call once - LOG.info( - "Starting the FlussSourceEnumerator for table {} without partition discovery.", - tablePath); - context.callAsync(this::listPartitions, this::checkPartitionChanges); - } + // just call once + LOG.info( + "Starting the FlussSourceEnumerator for table {} without partition discovery.", + tablePath); + context.callAsync(this::listPartitions, 
this::checkPartitionChanges); } - } else { - if (!streaming) { - startInBatchMode(); - } else { - // init bucket splits and assign - context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd); - } + // init bucket splits and assign + context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd); } } private void startInBatchMode() { if (lakeEnabled) { context.callAsync(this::getLakeSplit, this::handleSplitsAdd); + } else if (isPartitioned) { + // just call once + LOG.info("Starting the FlussSourceEnumerator for table {} in batch mode.", tablePath); + context.callAsync(this::listPartitions, this::checkPartitionChanges); } else { - throw new UnsupportedOperationException( - String.format( - "Batch only supports when table option '%s' is set to true.", - ConfigOptions.TABLE_DATALAKE_ENABLED)); + // init bucket splits and assign + context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd); } } private List initNonPartitionedSplits() { - if (hasPrimaryKey && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) { + if (hasPrimaryKey) { // get the table snapshot info KvSnapshotInfo kvSnapshotInfo; try { @@ -260,7 +261,13 @@ private List initNonPartitionedSplits() { ExceptionUtils.stripCompletionException(e)); } return getSnapshotAndLogSplits( - kvSnapshotInfo.getTableId(), null, null, kvSnapshotInfo.getBucketsSnapshots()); + kvSnapshotInfo.getTableId(), + null, + null, + kvSnapshotInfo.getBucketsSnapshots().getBucketIds(), + startingOffsetsInitializer instanceof SnapshotOffsetsInitializer + ? kvSnapshotInfo.getBucketsSnapshots() + : new BucketsSnapshotInfo(Collections.emptyMap())); } else { return getLogSplit(null, null); } @@ -338,8 +345,10 @@ private PartitionChange getPartitionChange(Set fetchedPartitionIn } private List initPartitionedSplits(Collection newPartitions) { - if (hasPrimaryKey && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) { - return initPrimaryKeyTablePartitionSplits(newPartitions); + if (hasPrimaryKey) { + return initPrimaryKeyTablePartitionSplits( + newPartitions, + startingOffsetsInitializer instanceof SnapshotOffsetsInitializer); } else { return initLogTablePartitionSplits(newPartitions); } @@ -355,7 +364,7 @@ private List initLogTablePartitionSplits( } private List initPrimaryKeyTablePartitionSplits( - Collection newPartitions) { + Collection newPartitions, boolean includedSnapshot) { List splits = new ArrayList<>(); for (PartitionInfo partitionInfo : newPartitions) { PartitionSnapshotInfo partitionSnapshotInfo; @@ -375,7 +384,10 @@ private List initPrimaryKeyTablePartitionSplits( partitionSnapshotInfo.getTableId(), partitionInfo.getPartitionId(), partitionInfo.getPartitionName(), - partitionSnapshotInfo.getBucketsSnapshotInfo())); + partitionSnapshotInfo.getBucketsSnapshotInfo().getBucketIds(), + includedSnapshot + ? 
partitionSnapshotInfo.getBucketsSnapshotInfo() + : new BucketsSnapshotInfo(Collections.emptyMap()))); } return splits; } @@ -384,10 +396,14 @@ private List getSnapshotAndLogSplits( long tableId, @Nullable Long partitionId, @Nullable String partitionName, + Collection bucketIds, BucketsSnapshotInfo bucketsSnapshotInfo) { List splits = new ArrayList<>(); List bucketsNeedInitOffset = new ArrayList<>(); - for (Integer bucketId : bucketsSnapshotInfo.getBucketIds()) { + Map stoppingOffsets = + stoppingOffsetsInitializer.getBucketOffsets( + partitionName, bucketIds, bucketOffsetsRetriever); + for (Integer bucketId : bucketIds) { TableBucket tb = new TableBucket(tableId, partitionId, bucketId); // the ignore logic rely on the enumerator will always send splits for same bucket // in one batch; if we can ignore the bucket, we can skip all the splits(snapshot + @@ -406,7 +422,8 @@ private List getSnapshotAndLogSplits( tb, partitionName, snapshot.getSnapshotFiles(), - snapshot.getLogOffset())); + snapshot.getLogOffset(), + stoppingOffsets.get(bucketId))); } else { bucketsNeedInitOffset.add(bucketId); } @@ -418,10 +435,19 @@ private List getSnapshotAndLogSplits( .forEach( (bucketId, startingOffset) -> splits.add( - new LogSplit( - new TableBucket(tableId, partitionId, bucketId), - partitionName, - startingOffset))); + streaming + ? new LogSplit( + new TableBucket( + tableId, partitionId, bucketId), + partitionName, + startingOffset) + : new HybridSnapshotLogSplit( + new TableBucket( + tableId, partitionId, bucketId), + partitionName, + Collections.emptyList(), + startingOffset, + stoppingOffsets.get(bucketId)))); } return splits; @@ -441,6 +467,11 @@ private List getLogSplit( } if (!bucketsNeedInitOffset.isEmpty()) { + Map stoppingOffsets = + stoppingOffsetsInitializer.getBucketOffsets( + partitionName, + IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()), + bucketOffsetsRetriever); startingOffsetsInitializer .getBucketOffsets(partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever) .forEach( @@ -449,7 +480,8 @@ private List getLogSplit( new LogSplit( new TableBucket(tableId, partitionId, bucketId), partitionName, - startingOffset))); + startingOffset, + stoppingOffsets.get(bucketId)))); } return splits; } @@ -574,7 +606,7 @@ private void assignPendingSplits(Set pendingReaders) { context.assignSplits(new SplitsAssignment<>(incrementalAssignment)); } - if (noMoreNewSplits) { + if (noMoreNewSplits && !streaming) { LOG.info( "No more FlussSplits to assign. 
Sending NoMoreSplitsEvent to reader {}", pendingReaders); diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java index 3da0f7d89..688cc0486 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java @@ -19,8 +19,10 @@ import javax.annotation.Nullable; import java.util.Collection; -import java.util.Collections; import java.util.Map; +import java.util.stream.Collectors; + +import static com.alibaba.fluss.connector.flink.source.split.LogSplit.NO_STOPPING_OFFSET; /** * An implementation of {@link OffsetsInitializer} which does not initialize anything. @@ -36,6 +38,7 @@ public Map getBucketOffsets( @Nullable String partitionName, Collection buckets, OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever) { - return Collections.emptyMap(); + return buckets.stream() + .collect(Collectors.toMap(bucketId -> bucketId, bucketId -> NO_STOPPING_OFFSET)); } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkRecordsWithSplitIds.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkRecordsWithSplitIds.java index ba9f8013a..bc939806b 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkRecordsWithSplitIds.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkRecordsWithSplitIds.java @@ -103,7 +103,7 @@ public FlinkRecordsWithSplitIds( this.splitRecords = splitRecords; this.splitIterator = splitIterator; this.tableBucketIterator = tableBucketIterator; - this.finishedSplits = finishedSplits; + this.finishedSplits = new HashSet<>(finishedSplits); this.flinkSourceReaderMetrics = flinkSourceReaderMetrics; } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReader.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReader.java index 32bcbd04a..dffb8bc2b 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReader.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReader.java @@ -56,7 +56,8 @@ public FlinkSourceReader( RowType sourceOutputType, SourceReaderContext context, @Nullable int[] projectedFields, - FlinkSourceReaderMetrics flinkSourceReaderMetrics) { + FlinkSourceReaderMetrics flinkSourceReaderMetrics, + boolean streaming) { super( elementsQueue, new FlinkSourceFetcherManager( @@ -67,7 +68,8 @@ public FlinkSourceReader( tablePath, sourceOutputType, projectedFields, - flinkSourceReaderMetrics), + flinkSourceReaderMetrics, + streaming), (ignore) -> {}), new FlinkRecordEmitter(sourceOutputType), context.getConfiguration(), diff --git 
a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReader.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReader.java index 967983523..749fb16eb 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReader.java @@ -103,13 +103,15 @@ public class FlinkSourceSplitReader implements SplitReader emptyLogSplits; + private final boolean streaming; public FlinkSourceSplitReader( Configuration flussConf, TablePath tablePath, RowType sourceOutputType, @Nullable int[] projectedFields, - FlinkSourceReaderMetrics flinkSourceReaderMetrics) { + FlinkSourceReaderMetrics flinkSourceReaderMetrics, + boolean streaming) { this.flinkMetricRegistry = new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup()); this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry); @@ -128,6 +130,7 @@ public FlinkSourceSplitReader( this.logScanner = table.getLogScanner(scan); this.stoppingOffsets = new HashMap<>(); this.emptyLogSplits = new HashSet<>(); + this.streaming = streaming; } @Override @@ -144,8 +147,10 @@ public RecordsWithSplitIds fetch() throws IOException { } else { // may need to finish empty log splits if (!emptyLogSplits.isEmpty()) { + FlinkRecordsWithSplitIds flinkRecordsWithSplitIds = + new FlinkRecordsWithSplitIds(emptyLogSplits, flinkSourceReaderMetrics); emptyLogSplits.clear(); - return new FlinkRecordsWithSplitIds(emptyLogSplits, flinkSourceReaderMetrics); + return flinkRecordsWithSplitIds; } else { ScanRecords scanRecords = logScanner.poll(POLL_TIMEOUT); return forLogRecords(scanRecords); @@ -182,9 +187,15 @@ public void handleSplitsChanges(SplitsChange splitsChanges) { boundedSplits.add(sourceSplitBase); } // still need to subscribe log - subscribeLog(sourceSplitBase, hybridSnapshotLogSplit.getLogStartingOffset()); + subscribeLog( + sourceSplitBase, + hybridSnapshotLogSplit.getLogStartingOffset(), + hybridSnapshotLogSplit.getLogStoppingOffset()); } else if (sourceSplitBase.isLogSplit()) { - subscribeLog(sourceSplitBase, sourceSplitBase.asLogSplit().getStartingOffset()); + subscribeLog( + sourceSplitBase, + sourceSplitBase.asLogSplit().getStartingOffset(), + sourceSplitBase.asLogSplit().getStoppingOffset()); } else if (sourceSplitBase.isLakeSplit()) { getLakeSplitReader().addSplit(sourceSplitBase, boundedSplits); } else { @@ -204,28 +215,26 @@ private LakeSplitReaderGenerator getLakeSplitReader() { return lakeSplitReaderGenerator; } - private void subscribeLog(SourceSplitBase split, long startingOffset) { + private void subscribeLog( + SourceSplitBase split, long startingOffset, Optional stoppingOffsetOpt) { // assign bucket offset dynamically TableBucket tableBucket = split.getTableBucket(); boolean isEmptyLogSplit = false; - if (split instanceof LogSplit) { - LogSplit logSplit = split.asLogSplit(); - Optional stoppingOffsetOpt = logSplit.getStoppingOffset(); - if (stoppingOffsetOpt.isPresent()) { - Long stoppingOffset = stoppingOffsetOpt.get(); - if (startingOffset >= stoppingOffset) { - // is empty log splits as no log record can be fetched - emptyLogSplits.add(split.splitId()); - isEmptyLogSplit = true; - } else if (stoppingOffset >= 0) { - stoppingOffsets.put(tableBucket, stoppingOffset); - } else { - // 
This should not happen. - throw new FlinkRuntimeException( - String.format( - "Invalid stopping offset %d for bucket %s", - stoppingOffset, tableBucket)); - } + + if (stoppingOffsetOpt.isPresent()) { + Long stoppingOffset = stoppingOffsetOpt.get(); + if (startingOffset >= stoppingOffset || stoppingOffset == 0) { + // is empty log splits as no log record can be fetched + emptyLogSplits.add(split.splitId()); + isEmptyLogSplit = true; + } else if (stoppingOffset >= 0) { + stoppingOffsets.put(tableBucket, stoppingOffset); + } else { + // This should not happen. + throw new FlinkRuntimeException( + String.format( + "Invalid stopping offset %d for bucket %s", + stoppingOffset, tableBucket)); } } @@ -234,7 +243,7 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) { "Skip to read log for split {} since the split is empty with starting offset {}, stopping offset {}.", split.splitId(), startingOffset, - split.asLogSplit().getStoppingOffset().get()); + stoppingOffsetOpt.get()); } else { Long partitionId = tableBucket.getPartitionId(); int bucket = tableBucket.getBucket(); @@ -315,8 +324,11 @@ private void checkSnapshotSplitOrStartNext() { currentBoundedSplit.asHybridSnapshotLogSplit(); currentSplitSkipReader = new SplitSkipReader( - new SnapshotSplitScanner( - table, projectedFields, hybridSnapshotLogSplit), + streaming + ? SnapshotSplitScanner.ofSnapshotScanner( + table, projectedFields, hybridSnapshotLogSplit) + : SnapshotSplitScanner.ofHybridScanner( + table, projectedFields, hybridSnapshotLogSplit), hybridSnapshotLogSplit.recordsToSkip()); } else if (currentBoundedSplit.isLakeSplit()) { currentSplitSkipReader = @@ -436,9 +448,9 @@ private long getStoppingOffset(TableBucket tableBucket) { public FlinkRecordsWithSplitIds finishCurrentBoundedSplit() throws IOException { Set finishedSplits = - currentBoundedSplit instanceof HybridSnapshotLogSplit - // is hybrid split, not to finish this split - // since it remains log to read + currentBoundedSplit instanceof HybridSnapshotLogSplit && streaming + // Is hybrid split and in streaming node, not to finish this split since it + // remains log to read ? 
Collections.emptySet() : Collections.singleton(currentBoundedSplit.splitId()); final FlinkRecordsWithSplitIds finishRecords = diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/SnapshotSplitScanner.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/SnapshotSplitScanner.java index 2f1587047..be8834a90 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/SnapshotSplitScanner.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/SnapshotSplitScanner.java @@ -17,9 +17,11 @@ package com.alibaba.fluss.connector.flink.source.reader; import com.alibaba.fluss.client.scanner.ScanRecord; +import com.alibaba.fluss.client.scanner.snapshot.HybridScan; import com.alibaba.fluss.client.scanner.snapshot.SnapshotScan; import com.alibaba.fluss.client.scanner.snapshot.SnapshotScanner; import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.connector.flink.source.split.HybridSnapshotLogSplit; import com.alibaba.fluss.connector.flink.source.split.SnapshotSplit; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.utils.CloseableIterator; @@ -34,7 +36,11 @@ public class SnapshotSplitScanner implements SplitScanner { private final SnapshotScanner snapshotScanner; - public SnapshotSplitScanner( + private SnapshotSplitScanner(SnapshotScanner scanner) { + this.snapshotScanner = scanner; + } + + public static SnapshotSplitScanner ofSnapshotScanner( Table table, @Nullable int[] projectedFields, SnapshotSplit snapshotSplit) { Schema tableSchema = table.getDescriptor().getSchema(); SnapshotScan snapshotScan = @@ -43,7 +49,27 @@ public SnapshotSplitScanner( snapshotSplit.getSnapshotFiles(), tableSchema, projectedFields); - this.snapshotScanner = table.getSnapshotScanner(snapshotScan); + return new SnapshotSplitScanner(table.getSnapshotScanner(snapshotScan)); + } + + public static SnapshotSplitScanner ofHybridScanner( + Table table, + @Nullable int[] projectedFields, + HybridSnapshotLogSplit hybridSnapshotLogSplit) { + if (!hybridSnapshotLogSplit.getLogStoppingOffset().isPresent()) { + throw new IllegalArgumentException( + "Stopping offset must greater than 0 for batch read."); + } + Schema tableSchema = table.getDescriptor().getSchema(); + HybridScan hybridScan = + new HybridScan( + hybridSnapshotLogSplit.getTableBucket(), + hybridSnapshotLogSplit.getSnapshotFiles(), + tableSchema, + projectedFields, + hybridSnapshotLogSplit.getLogStartingOffset(), + hybridSnapshotLogSplit.getLogStoppingOffset().get()); + return new SnapshotSplitScanner(table.getHybridScanner(hybridScan)); } @Nullable diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplit.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplit.java index 36fd320b9..80d5eb7e9 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplit.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplit.java @@ -23,6 +23,9 @@ import java.util.List; import java.util.Objects; +import java.util.Optional; + +import static com.alibaba.fluss.connector.flink.source.split.LogSplit.NO_STOPPING_OFFSET; /** * The hybrid split for 
first reading the snapshot files and then switch to read the cdc log from a @@ -36,13 +39,14 @@ public class HybridSnapshotLogSplit extends SnapshotSplit { private static final String HYBRID_SPLIT_PREFIX = "hybrid-snapshot-log-"; private final boolean isSnapshotFinished; private final long logStartingOffset; + private final long logStoppingOffset; public HybridSnapshotLogSplit( TableBucket tableBucket, @Nullable String partitionName, List snapshotFiles, long logStartingOffset) { - this(tableBucket, partitionName, snapshotFiles, 0, false, logStartingOffset); + this(tableBucket, partitionName, snapshotFiles, logStartingOffset, NO_STOPPING_OFFSET); } public HybridSnapshotLogSplit( @@ -52,15 +56,54 @@ public HybridSnapshotLogSplit( long recordsToSkip, boolean isSnapshotFinished, long logStartingOffset) { + this( + tableBucket, + partitionName, + snapshotFiles, + recordsToSkip, + isSnapshotFinished, + logStartingOffset, + NO_STOPPING_OFFSET); + } + + public HybridSnapshotLogSplit( + TableBucket tableBucket, + @Nullable String partitionName, + List snapshotFiles, + long logStartingOffset, + long logStoppingOffset) { + this( + tableBucket, + partitionName, + snapshotFiles, + 0, + false, + logStartingOffset, + logStoppingOffset); + } + + public HybridSnapshotLogSplit( + TableBucket tableBucket, + @Nullable String partitionName, + List snapshotFiles, + long recordsToSkip, + boolean isSnapshotFinished, + long logStartingOffset, + long logStoppingOffset) { super(tableBucket, partitionName, snapshotFiles, recordsToSkip); this.isSnapshotFinished = isSnapshotFinished; this.logStartingOffset = logStartingOffset; + this.logStoppingOffset = logStoppingOffset; } public long getLogStartingOffset() { return logStartingOffset; } + public Optional getLogStoppingOffset() { + return logStoppingOffset >= 0 ? Optional.of(logStoppingOffset) : Optional.empty(); + } + public boolean isSnapshotFinished() { return isSnapshotFinished; } @@ -83,21 +126,13 @@ public boolean equals(Object o) { } HybridSnapshotLogSplit that = (HybridSnapshotLogSplit) o; return isSnapshotFinished == that.isSnapshotFinished - && logStartingOffset == that.logStartingOffset; + && logStartingOffset == that.logStartingOffset + && logStoppingOffset == that.logStoppingOffset; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), isSnapshotFinished, logStartingOffset); - } - - @Override - public String toString() { - return "HybridSnapshotLogSplit{" - + "recordsToSkip=" - + recordsToSkip - + ", tableBucket=" - + tableBucket - + '}'; + return Objects.hash( + super.hashCode(), isSnapshotFinished, logStartingOffset, logStoppingOffset); } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplitState.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplitState.java index c73a025b8..4674795f6 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplitState.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplitState.java @@ -16,6 +16,8 @@ package com.alibaba.fluss.connector.flink.source.split; +import static com.alibaba.fluss.connector.flink.source.split.LogSplit.NO_STOPPING_OFFSET; + /** The state of {@link HybridSnapshotLogSplit}. 
*/ public class HybridSnapshotLogSplitState extends SourceSplitState { @@ -40,7 +42,8 @@ public HybridSnapshotLogSplit toSourceSplit() { hybridSnapshotLogSplit.getSnapshotFiles(), recordsToSkip, snapshotFinished, - offset); + offset, + hybridSnapshotLogSplit.getLogStoppingOffset().orElse(NO_STOPPING_OFFSET)); } public void setRecordsToSkip(long recordsToSkip) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/LogSplitState.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/LogSplitState.java index d0addc36d..3e1f30c82 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/LogSplitState.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/LogSplitState.java @@ -16,6 +16,8 @@ package com.alibaba.fluss.connector.flink.source.split; +import static com.alibaba.fluss.connector.flink.source.split.LogSplit.NO_STOPPING_OFFSET; + /** The state of {@link LogSplit}. */ public class LogSplitState extends SourceSplitState { @@ -32,6 +34,10 @@ public void setOffset(long offset) { @Override public LogSplit toSourceSplit() { final LogSplit logSplit = (LogSplit) split; - return new LogSplit(logSplit.tableBucket, logSplit.getPartitionName(), offset); + return new LogSplit( + logSplit.tableBucket, + logSplit.getPartitionName(), + offset, + logSplit.getStoppingOffset().orElse(NO_STOPPING_OFFSET)); } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/SourceSplitSerializer.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/SourceSplitSerializer.java index 6bec030f5..ad308728c 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/SourceSplitSerializer.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/SourceSplitSerializer.java @@ -29,6 +29,8 @@ import java.util.ArrayList; import java.util.List; +import static com.alibaba.fluss.connector.flink.source.split.LogSplit.NO_STOPPING_OFFSET; + /** A serializer for the {@link SourceSplitBase}. 
*/ public class SourceSplitSerializer implements SimpleVersionedSerializer { @@ -36,13 +38,16 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); private static final byte HYBRID_SNAPSHOT_SPLIT_FLAG = 1; private static final byte LOG_SPLIT_FLAG = 2; - private static final int CURRENT_VERSION = VERSION_0; + private static final int CURRENT_VERSION = VERSION_1; private LakeSplitSerializer lakeSplitSerializer; @@ -70,6 +75,11 @@ public byte[] serialize(SourceSplitBase split) throws IOException { out.writeBoolean(hybridSnapshotLogSplit.isSnapshotFinished()); // write log starting offset out.writeLong(hybridSnapshotLogSplit.getLogStartingOffset()); + // write log stopping offset + out.writeLong( + hybridSnapshotLogSplit + .getLogStoppingOffset() + .orElse(LogSplit.NO_STOPPING_OFFSET)); } else { LogSplit logSplit = split.asLogSplit(); // write starting offset @@ -127,7 +137,7 @@ private List deserializeSnapshotFiles(DataInputDeserializer i @Override public SourceSplitBase deserialize(int version, byte[] serialized) throws IOException { - if (version != VERSION_0) { + if (version > CURRENT_VERSION) { throw new IOException("Unknown version " + version); } final DataInputDeserializer in = new DataInputDeserializer(serialized); @@ -149,13 +159,18 @@ public SourceSplitBase deserialize(int version, byte[] serialized) throws IOExce long recordsToSkip = in.readLong(); boolean isSnapshotFinished = in.readBoolean(); long logStartingOffset = in.readLong(); + long logStoppingOffset = NO_STOPPING_OFFSET; + if (version >= VERSION_1) { + logStoppingOffset = in.readLong(); + } return new HybridSnapshotLogSplit( tableBucket, partitionName, fsPathAndFIleNames, recordsToSkip, isSnapshotFinished, - logStartingOffset); + logStartingOffset, + logStoppingOffset); } else if (splitKind == LOG_SPLIT_FLAG) { long startingOffset = in.readLong(); long stoppingOffset = in.readLong(); diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceBatchITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceBatchITCase.java index c2cf36030..9c1ef2b11 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceBatchITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceBatchITCase.java @@ -38,12 +38,15 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; @@ -170,14 +173,12 @@ void testScanSingleRowFilterOnPartitionedTable() throws Exception { @Test void testScanSingleRowFilterException() throws Exception { String tableName = prepareSourceTable(new String[] {"id", "name"}, null); + // doesn't have all condition for primary key String query = String.format("SELECT * FROM %s WHERE id = 1", tableName); - // doesn't have all condition for primary key, doesn't support to execute - 
assertThatThrownBy(() -> tEnv.explainSql(query)) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessage( - "Currently, Fluss only support queries on table with datalake enabled" - + " or point queries on primary key when it's in batch execution mode."); + CloseableIterator collected = tEnv.executeSql(query).collect(); + List expected = Collections.singletonList("+I[1, address1, name1]"); + assertResultsIgnoreOrder(collected, expected, true); } @Test @@ -272,6 +273,75 @@ void testLimitLogTableScan() throws Exception { assertThat(collected).hasSize(3); } + @ParameterizedTest + @ValueSource(strings = {"initial", "earliest"}) + void testScanNoFilterOnPartitionedTable(String scanStartupMode) throws Exception { + String tableName = + prepareSourceTable( + new String[] {"id", "dt"}, + "dt", + Collections.singletonMap("scan.startup.mode", scanStartupMode)); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + Map partitionNameById = + waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath); + Iterator partitionIterator = + partitionNameById.values().stream().sorted().iterator(); + String partition1 = partitionIterator.next(); + String query = String.format("SELECT * FROM %s ", tableName); + + CloseableIterator collected = tEnv.executeSql(query).collect(); + List expected = + IntStream.range(1, 6) + .mapToObj( + i -> + String.format( + "+I[%s, address%s, name%s, %s]", + i, i, i, partition1)) + .collect(Collectors.toList()); + assertResultsIgnoreOrder(collected, expected, true); + } + + @ParameterizedTest + @ValueSource(strings = {"initial", "earliest"}) + void testScanSingleRowProjectionAndNonPkFilter(String scanStartupMode) throws Exception { + String tableName = + prepareSourceTable( + new String[] {"name"}, + null, + Collections.singletonMap("scan.startup.mode", scanStartupMode)); + String query = String.format("SELECT address FROM %s where id = 2 ", tableName); + CloseableIterator collected = tEnv.executeSql(query).collect(); + List expected = Arrays.asList("+I[address2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @ParameterizedTest + @ValueSource(strings = {"initial", "earliest"}) + void testScanTableNoFilter(String scanStartupMode) throws Exception { + String tableName = + prepareSourceTable( + new String[] {"name", "id"}, + null, + Collections.singletonMap("scan.startup.mode", scanStartupMode)); + String query = String.format("SELECT * FROM %s ", tableName); + + assertThat(tEnv.explainSql(query)) + .contains( + String.format( + "TableSourceScan(table=[[testcatalog, defaultdb, %s]], " + + "fields=[id, address, name])", + tableName)); + CloseableIterator collected = tEnv.executeSql(query).collect(); + List expected = + Arrays.asList( + "+I[1, address1, name1]", + "+I[2, address2, name2]", + "+I[3, address3, name3]", + "+I[4, address4, name4]", + "+I[5, address5, name5]"); + assertResultsIgnoreOrder(collected, expected, true); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testCountPushDown(boolean partitionTable) throws Exception { @@ -289,35 +359,48 @@ void testCountPushDown(boolean partitionTable) throws Exception { CloseableIterator iterRows = tEnv.executeSql(query).collect(); List collected = assertAndCollectRecords(iterRows, 1); List expected = Collections.singletonList(String.format("+I[%s]", expectedRows)); + assertThat(collected).isEqualTo(expected); // test not push down grouping count. 
- assertThatThrownBy( - () -> - tEnv.explainSql( - String.format( - "SELECT COUNT(*) FROM %s group by id", - tableName)) - .wait()) - .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + query = String.format("SELECT id, COUNT(*) FROM %s group by id", tableName); + iterRows = tEnv.executeSql(query).collect(); + collected = assertAndCollectRecords(iterRows, 5); + expected = new ArrayList<>(); + for (int i = 1; i <= 5; i++) { + expected.add(String.format("+I[%s, %s]", i, partitionTable ? 4 : 1)); + } + assertThat(collected).containsExactlyInAnyOrderElementsOf(expected); // test not support primary key now String primaryTableName = prepareSourceTable(new String[] {"id"}, null); - assertThatThrownBy( - () -> - tEnv.explainSql( - String.format( - "SELECT COUNT(*) FROM %s ", - primaryTableName)) - .wait()) - .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + query = String.format("SELECT COUNT(*) FROM %s ", primaryTableName); + iterRows = tEnv.executeSql(query).collect(); + collected = assertAndCollectRecords(iterRows, 1); + expected = Collections.singletonList(String.format("+I[%s]", 5)); + assertThat(collected).isEqualTo(expected); } private String prepareSourceTable(String[] keys, String partitionedKey) throws Exception { + return prepareSourceTable(keys, partitionedKey, Collections.emptyMap()); + } + + private String prepareSourceTable( + String[] keys, String partitionedKey, Map otherOptions) + throws Exception { String tableName = String.format("test_%s_%s", String.join("_", keys), RandomUtils.nextInt()); + String options = + otherOptions.isEmpty() + ? 
"" + : "," + + otherOptions.entrySet().stream() + .map( + e -> + String.format( + "'%s'='%s'", + e.getKey(), e.getValue())) + .collect(Collectors.joining(",")); if (partitionedKey == null) { tEnv.executeSql( String.format( @@ -326,8 +409,10 @@ private String prepareSourceTable(String[] keys, String partitionedKey) throws E + " address varchar," + " name varchar," + " primary key (%s) NOT ENFORCED)" - + " with ('bucket.num' = '4')", - tableName, String.join(",", keys))); + + " with ('bucket.num' = '4'" + + " %s " + + ")", + tableName, String.join(",", keys), options)); } else { tEnv.executeSql( String.format( @@ -340,8 +425,10 @@ private String prepareSourceTable(String[] keys, String partitionedKey) throws E + " with (" + " 'bucket.num' = '4', " + " 'table.auto-partition.enabled' = 'true'," - + " 'table.auto-partition.time-unit' = 'year')", - tableName, String.join(",", keys), partitionedKey)); + + " 'table.auto-partition.time-unit' = 'year'" + + " %s " + + ")", + tableName, String.join(",", keys), partitionedKey, options)); } TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumeratorTest.java index e01061c86..89b9c1339 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -59,6 +59,7 @@ import java.util.Set; import static com.alibaba.fluss.client.scanner.log.LogScanner.EARLIEST_OFFSET; +import static com.alibaba.fluss.connector.flink.source.split.LogSplit.NO_STOPPING_OFFSET; import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; import static org.assertj.core.api.Assertions.assertThat; @@ -80,28 +81,31 @@ protected static void beforeAll() { Duration.ofSeconds(10).toString()); } - @Test - void testPkTableNoSnapshotSplits() throws Throwable { - long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testPkTableNoSnapshotSplits(boolean isStreamingMode) throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "test-flink-table-" + isStreamingMode); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId); int numSubtasks = 3; // test get snapshot split & log split and the assignment try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, + tablePath, flussConf, true, false, context, OffsetsInitializer.initial(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, - streaming); + isStreamingMode); enumerator.start(); // register all read - for (int i = 0; i < 3; i++) { + for (int i = 0; i < numSubtasks; i++) { registerReader(context, enumerator, i); } @@ -113,36 +117,52 @@ void testPkTableNoSnapshotSplits() throws Throwable { Map> expectedAssignment = new HashMap<>(); for (int i = 0; i < numSubtasks; i++) { // one split for one subtask - expectedAssignment.put(i, Collections.singletonList(genLogSplit(tableId, i))); + 
expectedAssignment.put( + i, + Collections.singletonList( + isStreamingMode + ? genLogSplit(tableId, i) + : new HybridSnapshotLogSplit( + new TableBucket(tableId, i), + null, + Collections.emptyList(), + -2, + 0))); } Map> actualAssignment = getLastReadersAssignments(context); assertThat(actualAssignment).isEqualTo(expectedAssignment); + for (int i = 0; i < numSubtasks; i++) { + assertThat(context.hasNoMoreSplits(i)).isEqualTo(!isStreamingMode); + } } } - @Test - void testPkTableWithSnapshotSplits() throws Throwable { - long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testPkTableWithSnapshotSplits(boolean isStreamingMode) throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "test-flink-table-" + isStreamingMode); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); int numSubtasks = 5; + int rowNum = 10; // write data and wait snapshot finish to make sure // we can hava snapshot split - Map bucketIdToNumRecords = putRows(DEFAULT_TABLE_PATH, 10); + Map bucketIdToNumRecords = putRows(tablePath, rowNum); waitUntilSnapshot(tableId, 0); try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, + tablePath, flussConf, true, false, context, OffsetsInitializer.initial(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, - streaming); + isStreamingMode); enumerator.start(); // register all read for (int i = 0; i < numSubtasks; i++) { @@ -169,7 +189,8 @@ void testPkTableWithSnapshotSplits() throws Throwable { bucket0, null, Collections.emptyList(), - bucketIdToNumRecords.get(0)))); + bucketIdToNumRecords.get(0), + isStreamingMode ? NO_STOPPING_OFFSET : rowNum))); expectedAssignment.put( 1, Collections.singletonList( @@ -177,7 +198,8 @@ void testPkTableWithSnapshotSplits() throws Throwable { bucket1, null, Collections.emptyList(), - bucketIdToNumRecords.get(1)))); + bucketIdToNumRecords.get(1), + isStreamingMode ? NO_STOPPING_OFFSET : rowNum))); expectedAssignment.put( 2, Collections.singletonList( @@ -185,8 +207,12 @@ void testPkTableWithSnapshotSplits() throws Throwable { bucket2, null, Collections.emptyList(), - bucketIdToNumRecords.get(2)))); + bucketIdToNumRecords.get(2), + isStreamingMode ? 
NO_STOPPING_OFFSET : rowNum))); checkSplitAssignmentIgnoreSnapshotFiles(expectedAssignment, actualAssignment); + for (int i = 0; i < numSubtasks; i++) { + assertThat(context.hasNoMoreSplits(i)).isEqualTo(!isStreamingMode); + } } } diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReaderTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReaderTest.java index 459149eeb..cb7ec9361 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReaderTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReaderTest.java @@ -21,6 +21,7 @@ import com.alibaba.fluss.connector.flink.source.event.PartitionsRemovedEvent; import com.alibaba.fluss.connector.flink.source.metrics.FlinkSourceReaderMetrics; import com.alibaba.fluss.connector.flink.source.split.LogSplit; +import com.alibaba.fluss.connector.flink.source.split.SourceSplitState; import com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; @@ -34,11 +35,15 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.table.data.RowData; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -48,6 +53,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static com.alibaba.fluss.connector.flink.source.split.LogSplit.NO_STOPPING_OFFSET; import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; @@ -84,7 +90,8 @@ void testHandlePartitionsRemovedEvent() throws Exception { clientConf, tablePath, tableDescriptor.getSchema().toRowType(), - readerContext)) { + readerContext, + Collections.emptyList())) { // first of all, add all splits of all partitions to the reader Map> assignedBuckets = new HashMap<>(); @@ -154,11 +161,108 @@ void testHandlePartitionsRemovedEvent() throws Exception { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHandleAddSplit(boolean isStreamingMode) throws Exception { + TablePath tablePath = TablePath.of(DEFAULT_DB, "test-flink-table-" + isStreamingMode); + TableDescriptor tableDescriptor = DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR; + long tableId = createTable(tablePath, tableDescriptor); + RowType rowType = tableDescriptor.getSchema().toRowType(); + + // wait util partitions are created + ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + Map partitionNameByIds = waitUntilPartitions(zooKeeperClient, tablePath); + + // now, write rows to the table + Map> partitionWrittenRows = new HashMap<>(); + for (Map.Entry partitionIdAndName : partitionNameByIds.entrySet()) { + partitionWrittenRows.put( + partitionIdAndName.getKey(), + writeRowsToPartition( + tablePath, + rowType, + 
Collections.singleton(partitionIdAndName.getValue()))); + } + + // try to write some rows to the table + TestingReaderContext readerContext = new TestingReaderContext(); + List finishedSplit = new ArrayList<>(); + try (final FlinkSourceReader reader = + createReader( + clientConf, + tablePath, + tableDescriptor.getSchema().toRowType(), + readerContext, + finishedSplit)) { + + // first of all, add all splits of all partitions to the reader + Map> assignedBuckets = new HashMap<>(); + for (Long partitionId : partitionNameByIds.keySet()) { + // get the latest offset of each bucket. + Map latestOffsets = + getLatestOffsets( + tablePath, + partitionNameByIds.get(partitionId), + Arrays.asList(0, 1, 2)); + for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) { + TableBucket tableBucket = new TableBucket(tableId, partitionId, i); + reader.addSplits( + Collections.singletonList( + new LogSplit( + tableBucket, + partitionNameByIds.get(partitionId), + 0, + isStreamingMode + ? NO_STOPPING_OFFSET + : latestOffsets.get(i)))); + assignedBuckets + .computeIfAbsent(partitionId, k -> new HashSet<>()) + .add(tableBucket); + } + } + + // first, add no more split to the reader. + reader.notifyNoMoreSplits(); + + List expectRows = new ArrayList<>(); + for (Map.Entry> partitionIdAndWrittenRows : + partitionWrittenRows.entrySet()) { + expectRows.addAll(partitionIdAndWrittenRows.getValue()); + } + + TestingReaderOutput output = new TestingReaderOutput<>(); + + while (output.getEmittedRecords().size() < expectRows.size()) { + reader.pollNext(output); + } + + // get the actual rows, the row format will be +I(x,x,x) + // we need to convert to +I[x, x, x] to match the expected rows format + List actualRows = + output.getEmittedRecords().stream() + .map(Object::toString) + .map(row -> row.replace("(", "[").replace(")", "]").replace(",", ", ")) + .collect(Collectors.toList()); + assertThat(actualRows).containsExactlyInAnyOrderElementsOf(expectRows); + + if (!isStreamingMode) { + InputStatus inputStatus; + do { + inputStatus = reader.pollNext(output); + } while (InputStatus.NOTHING_AVAILABLE == inputStatus); + assertThat(inputStatus).isEqualTo(InputStatus.END_OF_INPUT); + assertThat(finishedSplit.size()) + .isEqualTo(partitionNameByIds.size() * DEFAULT_BUCKET_NUM); + } + } + } + private FlinkSourceReader createReader( Configuration flussConf, TablePath tablePath, RowType sourceOutputType, - SourceReaderContext context) { + SourceReaderContext context, + List finishedSplits) { FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); return new FlinkSourceReader( @@ -168,6 +272,13 @@ private FlinkSourceReader createReader( sourceOutputType, context, null, - new FlinkSourceReaderMetrics(context.metricGroup())); + new FlinkSourceReaderMetrics(context.metricGroup()), + true) { + + @Override + protected void onSplitFinished(Map map) { + finishedSplits.addAll(map.keySet()); + } + }; } } diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java index 355ea1439..7ff96e961 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java @@ -46,6 +46,8 @@ import 
org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; import org.apache.flink.table.api.ValidationException; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.ArrayList; @@ -56,6 +58,7 @@ import java.util.Map; import java.util.Set; +import static com.alibaba.fluss.connector.flink.source.split.LogSplit.NO_STOPPING_OFFSET; import static com.alibaba.fluss.connector.flink.source.testutils.RecordAndPosAssert.assertThatRecordAndPos; import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; @@ -89,7 +92,8 @@ void testSanityCheck() throws Exception { DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("age", DataTypes.INT())), null, - createMockSourceReaderMetrics())) + createMockSourceReaderMetrics(), + true)) .isInstanceOf(ValidationException.class) .hasMessage( "The Flink query schema is not matched to Fluss table schema. \n" @@ -106,7 +110,8 @@ void testSanityCheck() throws Exception { "id", DataTypes.BIGINT().copy(false)), DataTypes.FIELD("name", DataTypes.STRING())), new int[] {1, 0}, - createMockSourceReaderMetrics())) + createMockSourceReaderMetrics(), + true)) .isInstanceOf(ValidationException.class) .hasMessage( "The Flink query schema is not matched to Fluss table schema. \n" @@ -114,12 +119,15 @@ void testSanityCheck() throws Exception { + "Fluss table schema: ROW<`name` STRING, `id` BIGINT NOT NULL> (projection [1, 0])"); } - @Test - void testHandleHybridSnapshotLogSplitChangesAndFetch() throws Exception { - TablePath tablePath = TablePath.of(DEFAULT_DB, "test-only-snapshot-table"); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHandleHybridSnapshotLogSplitChangesAndFetch(boolean isStreamingMode) throws Exception { + TablePath tablePath = + TablePath.of(DEFAULT_DB, "test-only-snapshot-table-" + isStreamingMode); long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); try (FlinkSourceSplitReader splitReader = - createSplitReader(tablePath, DEFAULT_PK_TABLE_SCHEMA.toRowType())) { + createSplitReader( + tablePath, DEFAULT_PK_TABLE_SCHEMA.toRowType(), isStreamingMode)) { // no any records List hybridSnapshotLogSplits = new ArrayList<>(); @@ -136,7 +144,7 @@ void testHandleHybridSnapshotLogSplitChangesAndFetch() throws Exception { // check the expected records waitUntilSnapshot(tableId, 0); - hybridSnapshotLogSplits = getHybridSnapshotLogSplits(tablePath); + hybridSnapshotLogSplits = getHybridSnapshotLogSplits(tablePath, isStreamingMode); expectedRecords = constructRecords(rows); @@ -165,8 +173,9 @@ private Map> constructRecords( return expectedRecords; } - @Test - void testHandleLogSplitChangesAndFetch() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHandleLogSplitChangesAndFetch(boolean isStreamingMode) throws Exception { final Schema schema = Schema.newBuilder() .column("id", DataTypes.INT()) @@ -175,11 +184,11 @@ void testHandleLogSplitChangesAndFetch() throws Exception { final TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).distributedBy(1).build(); - TablePath tablePath1 = TablePath.of(DEFAULT_DB, "test-only-log-table"); + TablePath tablePath1 = TablePath.of(DEFAULT_DB, "test-only-log-table-" + isStreamingMode); long tableId = createTable(tablePath1, tableDescriptor); try (FlinkSourceSplitReader splitReader = - 
createSplitReader(tablePath1, schema.toRowType())) { + createSplitReader(tablePath1, schema.toRowType(), isStreamingMode)) { // no any records List logSplits = new ArrayList<>(); @@ -200,7 +209,17 @@ void testHandleLogSplitChangesAndFetch() throws Exception { String splitId = toLogSplitId(tableBucket); expectedRecords.put(splitId, expected); - logSplits.add(new LogSplit(tableBucket, null, 0L)); + Map latestOffsets = + getLatestOffsets( + tablePath1, null, Collections.singleton(tableBucket.getBucket())); + logSplits.add( + isStreamingMode + ? new LogSplit(tableBucket, null, 0L) + : new LogSplit( + tableBucket, + null, + 0L, + latestOffsets.get(tableBucket.getBucket()))); assignSplitsAndFetchUntilRetrieveRecords( splitReader, logSplits, expectedRecords, schema.toRowType()); @@ -213,7 +232,7 @@ void testHandleMixSnapshotLogSplitChangesAndFetch() throws Exception { long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); try (FlinkSourceSplitReader splitReader = - createSplitReader(tablePath, DEFAULT_PK_TABLE_SCHEMA.toRowType())) { + createSplitReader(tablePath, DEFAULT_PK_TABLE_SCHEMA.toRowType(), true)) { // now, write some records into the table Map> rows = putRows(tableId, tablePath, 10); @@ -222,7 +241,7 @@ void testHandleMixSnapshotLogSplitChangesAndFetch() throws Exception { waitUntilSnapshot(tableId, 0); List totalSplits = - new ArrayList<>(getHybridSnapshotLogSplits(tablePath)); + new ArrayList<>(getHybridSnapshotLogSplits(tablePath, true)); // construct expected records for snapshot; Map> expectedRecords = constructRecords(rows); @@ -338,9 +357,15 @@ private void assignSplits(FlinkSourceSplitReader splitReader, List getHybridSnapshotLogSplits(TablePath tablePath) throws Exception { + private List getHybridSnapshotLogSplits( + TablePath tablePath, boolean isStreamingMode) throws Exception { KvSnapshotInfo kvSnapshotInfo = admin.getKvSnapshot(tablePath).get(); List hybridSnapshotLogSplits = new ArrayList<>(); BucketsSnapshotInfo bucketsSnapshotInfo = kvSnapshotInfo.getBucketsSnapshots(); + Map latestOffsets = + getLatestOffsets( + tablePath, null, kvSnapshotInfo.getBucketsSnapshots().getBucketIds()); for (Integer bucketId : bucketsSnapshotInfo.getBucketIds()) { TableBucket tableBucket = new TableBucket(kvSnapshotInfo.getTableId(), bucketId); if (bucketsSnapshotInfo.getBucketSnapshotInfo(bucketId).isPresent()) { @@ -414,7 +443,10 @@ private List getHybridSnapshotLogSplits(TablePath tablePath) th tableBucket, null, bucketSnapshotInfo.getSnapshotFiles(), - bucketSnapshotInfo.getLogOffset())); + bucketSnapshotInfo.getLogOffset(), + isStreamingMode + ? 
NO_STOPPING_OFFSET + : latestOffsets.get(bucketId))); } } return hybridSnapshotLogSplits; diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/split/SourceSplitSerializerTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/split/SourceSplitSerializerTest.java index 4d7e024f3..536f1139d 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/split/SourceSplitSerializerTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/split/SourceSplitSerializerTest.java @@ -61,7 +61,7 @@ void testHybridSnapshotLogSplitSerde(boolean isPartitioned) throws Exception { split = new HybridSnapshotLogSplit( - bucket, partitionName, snapshotFiles, recordsToSkip, true, 5); + bucket, partitionName, snapshotFiles, recordsToSkip, true, 5, 2); serialized = serializer.serialize(split); deserializedSplit = serializer.deserialize(serializer.getVersion(), serialized); assertThat(deserializedSplit).isEqualTo(split); diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/testutils/FlinkTestBase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/testutils/FlinkTestBase.java index 559f33bb6..b0ce0ff62 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/testutils/FlinkTestBase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/testutils/FlinkTestBase.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.admin.Admin; +import com.alibaba.fluss.client.admin.OffsetSpec; import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.client.table.writer.AppendWriter; import com.alibaba.fluss.client.table.writer.TableWriter; @@ -26,6 +27,7 @@ import com.alibaba.fluss.config.AutoPartitionTimeUnit; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; @@ -47,6 +49,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; +import javax.annotation.Nullable; + import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -172,7 +176,7 @@ public static void assertResultsIgnoreOrder( throws Exception { int expectRecords = expected.size(); List actual = new ArrayList<>(expectRecords); - for (int i = 0; i < expectRecords; i++) { + for (int i = 0; i < expectRecords && iterator.hasNext(); i++) { actual.add(iterator.next().toString()); } assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); @@ -304,4 +308,15 @@ protected void writeRows(TablePath tablePath, List rows, boolean ap tableWriter.flush(); } } + + protected Map getLatestOffsets( + TablePath tablePath, @Nullable String partitionName, Collection buckets) + throws Exception { + return admin.listOffsets( + PhysicalTablePath.of(tablePath, partitionName), + buckets, + new OffsetSpec.LatestSpec()) + .all() + .get(); + } }
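A minimal usage sketch of the bounded hybrid split introduced in this patch, assuming the constructors and accessors shown above; the table id, bucket, and offset values are illustrative placeholders.

import com.alibaba.fluss.connector.flink.source.split.HybridSnapshotLogSplit;
import com.alibaba.fluss.metadata.TableBucket;

import java.util.Collections;
import java.util.Optional;

public class BoundedHybridSplitSketch {
    public static void main(String[] args) {
        // Illustrative bucket of a non-partitioned primary-key table (table id 1001, bucket 0).
        TableBucket bucket = new TableBucket(1001L, 0);

        // Streaming-style split: no stopping offset, so the log part is unbounded.
        HybridSnapshotLogSplit streamingSplit =
                new HybridSnapshotLogSplit(bucket, null, Collections.emptyList(), 0L);
        System.out.println(streamingSplit.getLogStoppingOffset()); // Optional.empty

        // Batch-style split: the enumerator additionally passes the bucket's latest log
        // offset (here 5) as the stopping offset, so the reader can finish the split.
        HybridSnapshotLogSplit batchSplit =
                new HybridSnapshotLogSplit(bucket, null, Collections.emptyList(), 0L, 5L);
        Optional<Long> stoppingOffset = batchSplit.getLogStoppingOffset();
        System.out.println(stoppingOffset); // Optional[5]
    }
}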