[Feature] Support triggering flush according to the number of rows
Signed-off-by: PengFei Li <[email protected]>
banmoy committed Aug 14, 2023
1 parent 3a33bbf commit f6337f6
Showing 5 changed files with 122 additions and 27 deletions.
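
The new row-count threshold is configured per table through StreamLoadTableProperties.Builder#maxBufferRows, which defaults to Integer.MAX_VALUE and therefore leaves the trigger effectively disabled. Below is a minimal usage sketch under stated assumptions: the import path, the builder() factory, and the database()/table() setter names are not shown in this diff, only maxBufferRows(int) is added by this commit, and the values are hypothetical.

import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; // package assumed, not shown in this diff

// Sketch only: builder(), database(), and table() are assumed setter names.
StreamLoadTableProperties tableProperties = StreamLoadTableProperties.builder()
        .database("example_db")      // hypothetical database
        .table("example_table")      // hypothetical table
        .maxBufferRows(500_000)      // new: flush this table once 500k rows are buffered
        .build();

With the default left in place, flushes continue to be driven by the existing cache-size and age conditions; a finite value lets TransactionTableRegion.shouldFlush() report BUFFER_ROWS_REACH_LIMIT once cacheRows reaches the limit, and write0() also rolls over to a new chunk when the active chunk's row count hits the threshold.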
@@ -37,6 +37,7 @@ public class StreamLoadTableProperties implements Serializable {

private final boolean enableUpsertDelete;
private final long chunkLimit;
private final int maxBufferRows;

private StreamLoadTableProperties(Builder builder) {
this.database = builder.database;
@@ -56,6 +57,7 @@ private StreamLoadTableProperties(Builder builder) {
} else {
chunkLimit = Math.min(10737418240L, builder.chunkLimit);
}
this.maxBufferRows = builder.maxBufferRows;
this.properties = builder.properties;
}

@@ -83,6 +85,10 @@ public Long getChunkLimit() {
return chunkLimit;
}

public int getMaxBufferRows() {
return maxBufferRows;
}

public Map<String, String> getProperties() {
return properties;
}
@@ -100,6 +106,7 @@ public static class Builder {
private boolean enableUpsertDelete;
private StreamLoadDataFormat dataFormat;
private long chunkLimit;
private int maxBufferRows = Integer.MAX_VALUE;

private final Map<String, String> properties = new HashMap<>();

@@ -142,6 +149,11 @@ public Builder chunkLimit(long chunkLimit) {
return this;
}

public Builder maxBufferRows(int maxBufferRows) {
this.maxBufferRows = maxBufferRows;
return this;
}

public Builder addProperties(Map<String, String> properties) {
this.properties.putAll(properties);
return this;
@@ -43,7 +43,8 @@ public class FlushAndCommitStrategy implements StreamLoadStrategy {
private final boolean enableAutoCommit;

private final AtomicLong numAgeTriggerFlush = new AtomicLong(0);
private final AtomicLong numForceTriggerFlush = new AtomicLong(0);
private final AtomicLong numCacheTriggerFlush = new AtomicLong(0);
private final AtomicLong numTableTriggerFlush = new AtomicLong(0);

public FlushAndCommitStrategy(StreamLoadProperties properties, boolean enableAutoCommit) {
this.expectDelayTime = properties.getExpectDelayTime();
@@ -60,24 +61,34 @@ public List<TableRegion> select(Iterable<TableRegion> regions) {
throw new UnsupportedOperationException();
}

public List<TableRegion> selectFlushRegions(Queue<TableRegion> regions, long currentCacheBytes) {
List<TableRegion> flushRegions = new ArrayList<>();
for (TableRegion region : regions) {
public List<SelectFlushResult> selectFlushRegions(Queue<TransactionTableRegion> regions, long currentCacheBytes) {
List<SelectFlushResult> flushRegions = new ArrayList<>();
for (TransactionTableRegion region : regions) {
if (shouldCommit(region)) {
numAgeTriggerFlush.getAndIncrement();
flushRegions.add(region);
flushRegions.add(new SelectFlushResult(FlushReason.COMMIT, region));
LOG.debug("Choose region {} to flush because the region should commit, age: {}, " +
"threshold: {}, scanFreq: {}, expectDelayTime: {}", region.getUniqueKey(),
region.getAge(), ageThreshold, scanFrequency, expectDelayTime);
} else {
FlushReason reason = region.shouldFlush();
if (reason != FlushReason.NONE) {
numTableTriggerFlush.getAndIncrement();
flushRegions.add(new SelectFlushResult(reason, region));
LOG.debug("Choose region {} to flush because the region itself decide to flush, age: {}, " +
"threshold: {}, scanFreq: {}, expectDelayTime: {}, reason: {}", region.getUniqueKey(),
region.getAge(), ageThreshold, scanFrequency, expectDelayTime, reason);
}
}
}

// simply choose the region with maximum bytes
if (flushRegions.isEmpty() && currentCacheBytes >= maxCacheBytes) {
regions.stream().max(Comparator.comparingLong(TableRegion::getCacheBytes)).ifPresent(flushRegions::add);
if (!flushRegions.isEmpty()) {
numForceTriggerFlush.getAndIncrement();
TableRegion region = flushRegions.get(0);
TransactionTableRegion region = regions.stream()
.max(Comparator.comparingLong(TableRegion::getCacheBytes)).orElse(null);
if (region != null) {
numCacheTriggerFlush.getAndIncrement();
flushRegions.add(new SelectFlushResult(FlushReason.CACHE_FULL, region));
LOG.debug("Choose region {} to flush because it's force flush, age: {}, " +
"threshold: {}, scanFreq: {}, expectDelayTime: {}", region.getUniqueKey(),
region.getAge(), ageThreshold, scanFrequency, expectDelayTime);
@@ -100,7 +111,27 @@ public String toString() {
", maxCacheBytes=" + maxCacheBytes +
", enableAutoCommit=" + enableAutoCommit +
", numAgeTriggerFlush=" + numAgeTriggerFlush +
", numForceTriggerFlush=" + numForceTriggerFlush +
", numCacheTriggerFlush=" + numCacheTriggerFlush +
", numTableTriggerFlush=" + numTableTriggerFlush +
'}';
}

public static class SelectFlushResult {

private final FlushReason reason;
private TransactionTableRegion region;

public SelectFlushResult(FlushReason reason, TransactionTableRegion region) {
this.reason = reason;
this.region = region;
}

public FlushReason getReason() {
return reason;
}

public TransactionTableRegion getRegion() {
return region;
}
}
}
@@ -0,0 +1,35 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream.v2;

/**
* Reason to trigger flush for a table.
*/
public enum FlushReason {
// No need to flush
NONE,
// Should commit the data
COMMIT,
// Cache is full, and need to flush one or more tables
CACHE_FULL,
// The number of buffered rows reaches the limit
BUFFER_ROWS_REACH_LIMIT
}
@@ -105,7 +105,7 @@ enum State {
private final AtomicReference<State> state = new AtomicReference<>(State.INACTIVE);
private volatile Throwable e;

private final Queue<TableRegion> flushQ = new LinkedList<>();
private final Queue<TransactionTableRegion> flushQ = new LinkedList<>();

/**
* Whether write() has triggered a flush after currentCacheBytes > maxCacheBytes.
@@ -167,20 +167,20 @@ public void init() {
}

if (savepoint) {
for (TableRegion region : flushQ) {
boolean flush = region.flush();
for (TransactionTableRegion region : flushQ) {
boolean flush = region.flush(FlushReason.COMMIT);
LOG.debug("Trigger flush table region {} because of savepoint, region cache bytes: {}, flush: {}",
region.getUniqueKey(), region.getCacheBytes(), flush);
}

// should ensure all data is committed for auto-commit mode
if (enableAutoCommit) {
int committedRegions = 0;
for (TableRegion region : flushQ) {
for (TransactionTableRegion region : flushQ) {
// savepoint makes sure no more data is written, so these conditions
// can guarantee commit after all data has been written to StarRocks
if (region.getCacheBytes() == 0 && !region.isFlushing()) {
boolean success = ((TransactionTableRegion) region).commit();
boolean success = region.commit();
if (success) {
committedRegions += 1;
region.resetAge();
@@ -196,19 +196,20 @@ public void init() {
}
LockSupport.unpark(current);
} else {
for (TableRegion region : flushQ) {
for (TransactionTableRegion region : flushQ) {
region.getAndIncrementAge();
if (flushAndCommitStrategy.shouldCommit(region)) {
boolean success = ((TransactionTableRegion) region).commit();
boolean success = region.commit();
if (success) {
region.resetAge();
}
LOG.debug("Commit region {} for normal, success: {}", region.getUniqueKey(), success);
}
}

for (TableRegion region : flushAndCommitStrategy.selectFlushRegions(flushQ, currentCacheBytes.get())) {
boolean flush = region.flush();
for (FlushAndCommitStrategy.SelectFlushResult result : flushAndCommitStrategy.selectFlushRegions(flushQ, currentCacheBytes.get())) {
TransactionTableRegion region = result.getRegion();
boolean flush = region.flush(result.getReason());
LOG.debug("Trigger flush table region {} because of selection, region cache bytes: {}," +
" flush: {}", region.getUniqueKey(), region.getCacheBytes(), flush);
}
@@ -425,7 +426,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t
region = new TransactionTableRegion(uniqueKey, database, table, this,
tableProperties, streamLoader, maxRetries, retryIntervalInMs);
regions.put(uniqueKey, region);
flushQ.offer(region);
flushQ.offer((TransactionTableRegion) region);
}
}
}
@@ -20,9 +20,9 @@

import com.starrocks.data.load.stream.Chunk;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
@@ -55,6 +55,7 @@ enum State {
private final StreamLoadTableProperties properties;
private final AtomicLong age = new AtomicLong(0L);
private final AtomicLong cacheBytes = new AtomicLong();
private final AtomicLong cacheRows = new AtomicLong();
private final AtomicReference<State> state;
private final AtomicBoolean ctl = new AtomicBoolean(false);
private volatile Chunk activeChunk;
@@ -160,20 +161,22 @@ public int write(byte[] row) {
}

private void switchChunk() {
if (activeChunk == null) {
if (activeChunk == null || activeChunk.numRows() == 0) {
return;
}
inactiveChunks.add(activeChunk);
activeChunk = new Chunk(properties.getDataFormat());
}

protected int write0(byte[] row) {
if (activeChunk.estimateChunkSize(row) > properties.getChunkLimit()) {
if (activeChunk.estimateChunkSize(row) > properties.getChunkLimit()
|| activeChunk.numRows() >= properties.getMaxBufferRows()) {
switchChunk();
}

activeChunk.addRow(row);
cacheBytes.addAndGet(row.length);
cacheRows.incrementAndGet();
return row.length;
}

@@ -182,13 +185,22 @@ public boolean isFlushing() {
return state.get() == State.FLUSHING;
}

@Override
public boolean flush() {
// Whether the region should flush
public FlushReason shouldFlush() {
if (state.get() != State.ACTIVE) {
return FlushReason.NONE;
}
return cacheRows.get() >= properties.getMaxBufferRows() ? FlushReason.BUFFER_ROWS_REACH_LIMIT : FlushReason.NONE;
}

public boolean flush(FlushReason reason) {
if (state.compareAndSet(State.ACTIVE, State.FLUSHING)) {
for (;;) {
if (ctl.compareAndSet(false, true)) {
LOG.info("Flush uniqueKey : {}, label : {}, bytes : {}", uniqueKey, label, cacheBytes.get());
if (activeChunk.numRows() > 0) {
LOG.info("Flush uniqueKey : {}, label : {}, bytes : {}, rows: {}, reason: {}",
uniqueKey, label, cacheBytes.get(), cacheRows.get(), reason);
if (reason != FlushReason.BUFFER_ROWS_REACH_LIMIT ||
activeChunk.numRows() >= properties.getMaxBufferRows()) {
switchChunk();
}
ctl.set(false);
@@ -264,6 +276,7 @@ public void fail(Throwable e) {
public void complete(StreamLoadResponse response) {
Chunk chunk = inactiveChunks.remove();
cacheBytes.addAndGet(-chunk.rowBytes());
cacheRows.addAndGet(-chunk.numRows());
response.setFlushBytes(chunk.rowBytes());
response.setFlushRows(chunk.numRows());
manager.callback(response);
@@ -350,4 +363,7 @@ public boolean cancel() {
public boolean isReadable() {
throw new UnsupportedOperationException();
}

@Override
public boolean flush() { throw new UnsupportedOperationException(); }
}
