
Commit

[AMORO-1422][Flink] Support MoR with Flink in SQL/Table API for Mix-format table (apache#1952)

* [AMORO-1422][Flink] Support MoR with Flink in SQL/Table API for Mix-format table

* [AMORO-1422][Flink] Support MoR with Flink in SQL/Table API for Mix-format table

* [AMORO-1422][Flink] fixed jingsong's comments.
Xianxun Ye authored Sep 15, 2023
1 parent 9abe9db commit a0d9c59
Showing 54 changed files with 1,539 additions and 526 deletions.
3 changes: 1 addition & 2 deletions docs/engines/flink/flink-dml.md
@@ -57,8 +57,7 @@ The supported parameters for bounded reads of non-primary-key tables in BaseStore

### Batch mode (primary key table)
```sql
--- Read the current full amount and possibly unmerged ChangeStore data
--- TODO: In the future, bounded full amount data will be read using the MOR method.
+-- Merge-on-read the current mixed-format table and return append-only data.
SELECT * FROM keyed /*+ OPTIONS('streaming'='false', 'scan.startup.mode'='earliest')*/;
```
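
For completeness, since this commit covers both SQL and the Table API: below is a minimal batch-mode sketch of the same merge-on-read query issued through the Table API. The `arctic_catalog` and `db` names are illustrative assumptions, not part of the docs.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MorBatchReadSketch {
  public static void main(String[] args) {
    // Batch mode: the bounded scan merges BaseStore and ChangeStore on read.
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
    tEnv.executeSql(
            "SELECT * FROM arctic_catalog.db.keyed "
                + "/*+ OPTIONS('streaming'='false', 'scan.startup.mode'='earliest') */")
        .print();
  }
}
```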

@@ -80,13 +80,13 @@ public Boundedness getBoundedness() {
}

@Override
-public SourceReader<T, ArcticSplit> createReader(SourceReaderContext readerContext) throws Exception {
+public SourceReader<T, ArcticSplit> createReader(SourceReaderContext readerContext) {
return new ArcticSourceReader<>(readerFunction, readerContext.getConfiguration(), readerContext, dimTable);
}

@Override
public SplitEnumerator<ArcticSplit, ArcticSourceEnumState> createEnumerator(
-SplitEnumeratorContext<ArcticSplit> enumContext) throws Exception {
+SplitEnumeratorContext<ArcticSplit> enumContext) {
return createEnumerator(enumContext, null);
}

@@ -112,7 +112,7 @@ private SplitEnumerator<ArcticSplit, ArcticSourceEnumState> createEnumerator(
@Override
public SplitEnumerator<ArcticSplit, ArcticSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<ArcticSplit> enumContext,
-ArcticSourceEnumState checkpoint) throws Exception {
+ArcticSourceEnumState checkpoint) {
return createEnumerator(enumContext, checkpoint);
}

@@ -43,7 +43,7 @@
import java.util.stream.Collectors;

/**
- * According to Mark,Index TreeNodes and subtaskId assigning a split to special subtask to read.
+ * According to the Mark, Index TreeNodes and subtaskId, assigns a split to a specific subtask to read.
*/
public class ShuffleSplitAssigner implements SplitAssigner {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleSplitAssigner.class);
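The Javadoc above is terse, so here is a distilled, self-contained sketch of the routing idea it describes: splits carrying the same (mask, index) data tree node are pinned to one subtask, so all changes for a given primary key are read together. Every name below is a hypothetical stand-in, not the project's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TreeNodeRoutingSketch {
  private final Map<Long, Integer> nodeToSubtask = new ConcurrentHashMap<>();
  private final int parallelism;
  private int nextSubtask = 0;

  TreeNodeRoutingSketch(int parallelism) {
    this.parallelism = parallelism;
  }

  // Splits with the same (mask, index) node always land on the same subtask.
  synchronized int subtaskFor(long mask, long index) {
    long nodeKey = (mask << 32) | index; // illustrative packing of the node id
    return nodeToSubtask.computeIfAbsent(nodeKey, k -> nextSubtask++ % parallelism);
  }

  public static void main(String[] args) {
    TreeNodeRoutingSketch router = new TreeNodeRoutingSketch(4);
    System.out.println(router.subtaskFor(3, 1)); // 0
    System.out.println(router.subtaskFor(3, 1)); // same node, same subtask: 0
    System.out.println(router.subtaskFor(3, 2)); // new node, next subtask: 1
  }
}
```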
@@ -74,7 +74,7 @@ public static final class Builder {
private TableSchema projectedSchema;
private List<Expression> filters;
private ReadableConfig flinkConf = new Configuration();
-private Map<String, String> properties = new HashMap<>();
+private final Map<String, String> properties = new HashMap<>();
private long limit = -1L;
private WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.noWatermarks();
private final ArcticScanContext.Builder contextBuilder = ArcticScanContext.arcticBuilder();
@@ -20,6 +20,7 @@

import com.netease.arctic.flink.read.hybrid.assigner.ShuffleSplitAssigner;
import com.netease.arctic.flink.read.hybrid.assigner.SplitAssigner;
+import com.netease.arctic.flink.read.hybrid.assigner.StaticSplitAssigner;
import com.netease.arctic.flink.read.hybrid.enumerator.ArcticSourceEnumState;
import com.netease.arctic.flink.read.hybrid.enumerator.ArcticSourceEnumStateSerializer;
import com.netease.arctic.flink.read.hybrid.enumerator.ArcticSourceEnumerator;
@@ -39,8 +40,6 @@
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

/**
* Arctic Source based on FLIP-27.
@@ -51,8 +50,6 @@
*/
public class ArcticSource<T> implements Source<T, ArcticSplit, ArcticSourceEnumState>, ResultTypeQueryable<T> {
private static final long serialVersionUID = 1L;

-private static final Logger LOG = LoggerFactory.getLogger(ArcticSource.class);
private final ArcticScanContext scanContext;
private final ReaderFunction<T> readerFunction;
private final TypeInformation<T> typeInformation;
@@ -80,39 +77,32 @@ public Boundedness getBoundedness() {
}

@Override
-public SourceReader<T, ArcticSplit> createReader(SourceReaderContext readerContext) throws Exception {
+public SourceReader<T, ArcticSplit> createReader(SourceReaderContext readerContext) {
return new ArcticSourceReader<>(readerFunction, readerContext.getConfiguration(), readerContext, dimTable);
}

@Override
public SplitEnumerator<ArcticSplit, ArcticSourceEnumState> createEnumerator(
-SplitEnumeratorContext<ArcticSplit> enumContext) throws Exception {
+SplitEnumeratorContext<ArcticSplit> enumContext) {
return createEnumerator(enumContext, null);
}

private SplitEnumerator<ArcticSplit, ArcticSourceEnumState> createEnumerator(
SplitEnumeratorContext<ArcticSplit> enumContext, ArcticSourceEnumState enumState) {
SplitAssigner splitAssigner;
-if (enumState == null) {
-splitAssigner = new ShuffleSplitAssigner(enumContext);
-} else {
-LOG.info("Arctic source restored {} splits from state for table {}",
-enumState.pendingSplits().size(), tableName);
-splitAssigner = new ShuffleSplitAssigner(enumContext, enumState.pendingSplits(),
-enumState.shuffleSplitRelation());
-}

+if (scanContext.isStreaming()) {
+splitAssigner = new ShuffleSplitAssigner(enumContext, tableName, enumState);
+return new ArcticSourceEnumerator(enumContext, splitAssigner, loader, scanContext, enumState, dimTable);
+} else {
+splitAssigner = new StaticSplitAssigner(enumState);
+return new StaticArcticSourceEnumerator(enumContext, splitAssigner, loader, scanContext, null);
+}
}

@Override
public SplitEnumerator<ArcticSplit, ArcticSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<ArcticSplit> enumContext,
-ArcticSourceEnumState checkpoint) throws Exception {
+ArcticSourceEnumState checkpoint) {
return createEnumerator(enumContext, checkpoint);
}

@@ -20,20 +20,24 @@

import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.data.PrimaryKeyedFile;
+import com.netease.arctic.flink.read.hybrid.enumerator.ArcticSourceEnumState;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplitState;
import com.netease.arctic.scan.ArcticFileScanTask;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,7 +47,7 @@
import java.util.stream.Collectors;

/**
- * According to Mark,Index TreeNodes and subtaskId assigning a split to special subtask to read.
+ * According to the Mark, Index TreeNodes and subtaskId, assigns a split to a specific subtask to read.
*/
public class ShuffleSplitAssigner implements SplitAssigner {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleSplitAssigner.class);
@@ -68,6 +72,7 @@ public class ShuffleSplitAssigner implements SplitAssigner {
private CompletableFuture<Void> availableFuture;


+@VisibleForTesting
public ShuffleSplitAssigner(
SplitEnumeratorContext<ArcticSplit> enumeratorContext) {
this.enumeratorContext = enumeratorContext;
@@ -77,15 +82,29 @@ public ShuffleSplitAssigner(
}

public ShuffleSplitAssigner(
-SplitEnumeratorContext<ArcticSplit> enumeratorContext, Collection<ArcticSplitState> splitStates,
-long[] shuffleSplitRelation) {
+SplitEnumeratorContext<ArcticSplit> enumeratorContext,
+String tableName,
+@Nullable ArcticSourceEnumState enumState) {
this.enumeratorContext = enumeratorContext;
this.partitionIndexSubtaskMap = new ConcurrentHashMap<>();
this.subtaskSplitMap = new ConcurrentHashMap<>();
-deserializePartitionIndex(shuffleSplitRelation);
-splitStates.forEach(state -> onDiscoveredSplits(Collections.singleton(state.toSourceSplit())));
+if (enumState == null) {
+this.totalParallelism = enumeratorContext.currentParallelism();
+LOG.info(
+"Arctic source enumerator current parallelism is {} for table {}",
+totalParallelism, tableName);
+} else {
+LOG.info("Arctic source restored {} splits from state for table {}",
+enumState.pendingSplits().size(), tableName);
+deserializePartitionIndex(
+Objects.requireNonNull(
+enumState.shuffleSplitRelation(),
+"The partition index and subtask state couldn't be null."));
+enumState.pendingSplits().forEach(state -> onDiscoveredSplits(Collections.singleton(state.toSourceSplit())));
+}
}


@Override
public Split getNext() {
throw new UnsupportedOperationException("ShuffleSplitAssigner doesn't support this operation.");
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.flink.read.hybrid.assigner;

import com.netease.arctic.flink.read.hybrid.enumerator.ArcticSourceEnumState;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplitState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* This is a static split assigner which is used for batch mode.
*/
public class StaticSplitAssigner implements SplitAssigner {
private static final Logger LOG = LoggerFactory.getLogger(StaticSplitAssigner.class);

private static final long POLL_TIMEOUT = 200;
private int totalSplitNum;

private final PriorityBlockingQueue<ArcticSplit> splitQueue;

private CompletableFuture<Void> availableFuture;

public StaticSplitAssigner(@Nullable ArcticSourceEnumState enumState) {
this.splitQueue = new PriorityBlockingQueue<>();
if (enumState != null) {
Collection<ArcticSplitState> splitStates = enumState.pendingSplits();
splitStates.forEach(state -> onDiscoveredSplits(Collections.singleton(state.toSourceSplit())));
}
}

@Override
public Split getNext() {
return getNextSplit().map(Split::of).orElseGet(Split::unavailable);
}

@Override
public Split getNext(int subtaskId) {
return getNext();
}

private Optional<ArcticSplit> getNextSplit() {
ArcticSplit arcticSplit = null;
try {
arcticSplit = splitQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("Interrupted when polling splits from the split queue", e);
}
if (arcticSplit == null) {
LOG.debug("Couldn't retrieve arctic source split from the queue, as the queue is empty.");
return Optional.empty();
} else {
LOG.info("Assigning the arctic split, task index is {}, total number of splits is {}, arctic split is {}.",
arcticSplit.taskIndex(), totalSplitNum, arcticSplit);
return Optional.of(arcticSplit);
}
}

@Override
public void onDiscoveredSplits(Collection<ArcticSplit> splits) {
splits.forEach(this::putArcticIntoQueue);
totalSplitNum += splits.size();
// only complete pending future if new splits are discovered
completeAvailableFuturesIfNeeded();
}

@Override
public void onUnassignedSplits(Collection<ArcticSplit> splits) {
onDiscoveredSplits(splits);
}

void putArcticIntoQueue(final ArcticSplit split) {
splitQueue.put(split);
}

@Override
public Collection<ArcticSplitState> state() {
return splitQueue.stream().map(ArcticSplitState::new).collect(Collectors.toList());
}

@Override
public synchronized CompletableFuture<Void> isAvailable() {
if (availableFuture == null) {
availableFuture = new CompletableFuture<>();
}
return availableFuture;
}

public boolean isEmpty() {
return splitQueue.isEmpty();
}

@Override
public void close() throws IOException {
splitQueue.clear();
}

private synchronized void completeAvailableFuturesIfNeeded() {
if (availableFuture != null && !isEmpty()) {
availableFuture.complete(null);
}
availableFuture = null;
}
}
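
To illustrate how the new assigner's queue-plus-availability-future handshake behaves, here is a standalone sketch of the same pattern; `DemoSplit` and every other name below are stand-ins for illustration, not code from the commit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.PriorityBlockingQueue;

public class AssignerHandshakeSketch {
  // Stand-in for ArcticSplit; the real splits are also kept in a priority queue.
  static class DemoSplit implements Comparable<DemoSplit> {
    final int taskIndex;
    DemoSplit(int taskIndex) { this.taskIndex = taskIndex; }
    @Override public int compareTo(DemoSplit other) { return Integer.compare(taskIndex, other.taskIndex); }
    @Override public String toString() { return "DemoSplit{" + taskIndex + "}"; }
  }

  private final PriorityBlockingQueue<DemoSplit> splitQueue = new PriorityBlockingQueue<>();
  private CompletableFuture<Void> availableFuture;

  // Like StaticSplitAssigner#isAvailable: hand the caller a future to wait on.
  synchronized CompletableFuture<Void> isAvailable() {
    if (availableFuture == null) {
      availableFuture = new CompletableFuture<>();
    }
    return availableFuture;
  }

  // Like onDiscoveredSplits: enqueue, then complete any pending future.
  synchronized void onDiscovered(DemoSplit split) {
    splitQueue.put(split);
    if (availableFuture != null) {
      availableFuture.complete(null);
    }
    availableFuture = null;
  }

  public static void main(String[] args) {
    AssignerHandshakeSketch assigner = new AssignerHandshakeSketch();
    CompletableFuture<Void> ready = assigner.isAvailable();
    assigner.onDiscovered(new DemoSplit(0));
    ready.join(); // completes once a split has been enqueued
    System.out.println("next: " + assigner.splitQueue.poll());
  }
}
```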
@@ -61,6 +61,9 @@ public ContinuousSplitPlannerImpl(ArcticTableLoader loader) {

@Override
public void close() throws IOException {
+if (loader != null) {
+loader.close();
+}
}

@Override
@@ -109,6 +112,9 @@ protected ContinuousEnumerationResult discoverIncrementalSplits(

protected ContinuousEnumerationResult discoverInitialSplits(List<Expression> filters) {
Snapshot changeSnapshot = table.changeTable().currentSnapshot();
+// TODO: ShuffleSplitAssigner doesn't support MergeOnReadSplit right now,
+// because it doesn't implement the dataTreeNode() method;
+// to be fixed by AMORO-1950 in the future.
List<ArcticSplit> arcticSplits = FlinkSplitPlanner.planFullTable(table, filters, splitCount);

long changeStartSnapshotId = changeSnapshot != null ? changeSnapshot.snapshotId() : EARLIEST_SNAPSHOT_ID;