From f6337f6df10bac49255d5ae28a0411c16541ff7f Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Mon, 14 Aug 2023 20:09:49 +0800 Subject: [PATCH] [Feature] Support to trigger flush according to the number of rows Signed-off-by: PengFei Li --- .../properties/StreamLoadTableProperties.java | 12 +++++ .../stream/v2/FlushAndCommitStrategy.java | 51 +++++++++++++++---- .../data/load/stream/v2/FlushReason.java | 35 +++++++++++++ .../load/stream/v2/StreamLoadManagerV2.java | 21 ++++---- .../stream/v2/TransactionTableRegion.java | 30 ++++++++--- 5 files changed, 122 insertions(+), 27 deletions(-) create mode 100644 starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java index 2b8194ca..4c48f8e8 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java @@ -37,6 +37,7 @@ public class StreamLoadTableProperties implements Serializable { private final boolean enableUpsertDelete; private final long chunkLimit; + private final int maxBufferRows; private StreamLoadTableProperties(Builder builder) { this.database = builder.database; @@ -56,6 +57,7 @@ private StreamLoadTableProperties(Builder builder) { } else { chunkLimit = Math.min(10737418240L, builder.chunkLimit); } + this.maxBufferRows = builder.maxBufferRows; this.properties = builder.properties; } @@ -83,6 +85,10 @@ public Long getChunkLimit() { return chunkLimit; } + public int getMaxBufferRows() { + return maxBufferRows; + } + public Map getProperties() { return properties; } @@ -100,6 +106,7 @@ public static class Builder { private boolean enableUpsertDelete; private StreamLoadDataFormat dataFormat; private long chunkLimit; + private int maxBufferRows = Integer.MAX_VALUE; private final Map properties = new HashMap<>(); @@ -142,6 +149,11 @@ public Builder chunkLimit(long chunkLimit) { return this; } + public Builder maxBufferRows(int maxBufferRows) { + this.maxBufferRows = maxBufferRows; + return this; + } + public Builder addProperties(Map properties) { this.properties.putAll(properties); return this; diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushAndCommitStrategy.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushAndCommitStrategy.java index faf934e2..6b151195 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushAndCommitStrategy.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushAndCommitStrategy.java @@ -43,7 +43,8 @@ public class FlushAndCommitStrategy implements StreamLoadStrategy { private final boolean enableAutoCommit; private final AtomicLong numAgeTriggerFlush = new AtomicLong(0); - private final AtomicLong numForceTriggerFlush = new AtomicLong(0); + private final AtomicLong numCacheTriggerFlush = new AtomicLong(0); + private final AtomicLong numTableTriggerFlush = new AtomicLong(0); public FlushAndCommitStrategy(StreamLoadProperties properties, boolean enableAutoCommit) { this.expectDelayTime = properties.getExpectDelayTime(); @@ -60,24 +61,34 @@ public List select(Iterable regions) { throw new UnsupportedOperationException(); } - public List selectFlushRegions(Queue regions, long currentCacheBytes) { - List flushRegions = new ArrayList<>(); - for (TableRegion region : regions) { + public List selectFlushRegions(Queue regions, long currentCacheBytes) { + List flushRegions = new ArrayList<>(); + for (TransactionTableRegion region : regions) { if (shouldCommit(region)) { numAgeTriggerFlush.getAndIncrement(); - flushRegions.add(region); + flushRegions.add(new SelectFlushResult(FlushReason.COMMIT, region)); LOG.debug("Choose region {} to flush because the region should commit, age: {}, " + "threshold: {}, scanFreq: {}, expectDelayTime: {}", region.getUniqueKey(), region.getAge(), ageThreshold, scanFrequency, expectDelayTime); + } else { + FlushReason reason = region.shouldFlush(); + if (reason != FlushReason.NONE) { + numTableTriggerFlush.getAndIncrement(); + flushRegions.add(new SelectFlushResult(reason, region)); + LOG.debug("Choose region {} to flush because the region itself decide to flush, age: {}, " + + "threshold: {}, scanFreq: {}, expectDelayTime: {}, reason: {}", region.getUniqueKey(), + region.getAge(), ageThreshold, scanFrequency, expectDelayTime, reason); + } } } // simply choose the region with maximum bytes if (flushRegions.isEmpty() && currentCacheBytes >= maxCacheBytes) { - regions.stream().max(Comparator.comparingLong(TableRegion::getCacheBytes)).ifPresent(flushRegions::add); - if (!flushRegions.isEmpty()) { - numForceTriggerFlush.getAndIncrement(); - TableRegion region = flushRegions.get(0); + TransactionTableRegion region = regions.stream() + .max(Comparator.comparingLong(TableRegion::getCacheBytes)).orElse(null); + if (region != null) { + numCacheTriggerFlush.getAndIncrement(); + flushRegions.add(new SelectFlushResult(FlushReason.CACHE_FULL, region)); LOG.debug("Choose region {} to flush because it's force flush, age: {}, " + "threshold: {}, scanFreq: {}, expectDelayTime: {}", region.getUniqueKey(), region.getAge(), ageThreshold, scanFrequency, expectDelayTime); @@ -100,7 +111,27 @@ public String toString() { ", maxCacheBytes=" + maxCacheBytes + ", enableAutoCommit=" + enableAutoCommit + ", numAgeTriggerFlush=" + numAgeTriggerFlush + - ", numForceTriggerFlush=" + numForceTriggerFlush + + ", numCacheTriggerFlush=" + numCacheTriggerFlush + + ", numTableTriggerFlush=" + numTableTriggerFlush + '}'; } + + public static class SelectFlushResult { + + private final FlushReason reason; + private TransactionTableRegion region; + + public SelectFlushResult(FlushReason reason, TransactionTableRegion region) { + this.reason = reason; + this.region = region; + } + + public FlushReason getReason() { + return reason; + } + + public TransactionTableRegion getRegion() { + return region; + } + } } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java new file mode 100644 index 00000000..96d2a6d0 --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.v2; + +/** + * Reason to trigger flush for a table. + */ +public enum FlushReason { + // No need to flush + NONE, + // Should commit the data + COMMIT, + // Cache is full, and need flush on or more tables + CACHE_FULL, + // The number of buffered rows reaches the limit + BUFFER_ROWS_REACH_LIMIT +} diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java index edb43dbc..43094330 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java @@ -105,7 +105,7 @@ enum State { private final AtomicReference state = new AtomicReference<>(State.INACTIVE); private volatile Throwable e; - private final Queue flushQ = new LinkedList<>(); + private final Queue flushQ = new LinkedList<>(); /** * Whether write() has triggered a flush after currentCacheBytes > maxCacheBytes. @@ -167,8 +167,8 @@ public void init() { } if (savepoint) { - for (TableRegion region : flushQ) { - boolean flush = region.flush(); + for (TransactionTableRegion region : flushQ) { + boolean flush = region.flush(FlushReason.COMMIT); LOG.debug("Trigger flush table region {} because of savepoint, region cache bytes: {}, flush: {}", region.getUniqueKey(), region.getCacheBytes(), flush); } @@ -176,11 +176,11 @@ public void init() { // should ensure all data is committed for auto-commit mode if (enableAutoCommit) { int committedRegions = 0; - for (TableRegion region : flushQ) { + for (TransactionTableRegion region : flushQ) { // savepoint makes sure no more data is written, so these conditions // can guarantee commit after all data has been written to StarRocks if (region.getCacheBytes() == 0 && !region.isFlushing()) { - boolean success = ((TransactionTableRegion) region).commit(); + boolean success = region.commit(); if (success) { committedRegions += 1; region.resetAge(); @@ -196,10 +196,10 @@ public void init() { } LockSupport.unpark(current); } else { - for (TableRegion region : flushQ) { + for (TransactionTableRegion region : flushQ) { region.getAndIncrementAge(); if (flushAndCommitStrategy.shouldCommit(region)) { - boolean success = ((TransactionTableRegion) region).commit(); + boolean success = region.commit(); if (success) { region.resetAge(); } @@ -207,8 +207,9 @@ public void init() { } } - for (TableRegion region : flushAndCommitStrategy.selectFlushRegions(flushQ, currentCacheBytes.get())) { - boolean flush = region.flush(); + for (FlushAndCommitStrategy.SelectFlushResult result : flushAndCommitStrategy.selectFlushRegions(flushQ, currentCacheBytes.get())) { + TransactionTableRegion region = result.getRegion(); + boolean flush = region.flush(result.getReason()); LOG.debug("Trigger flush table region {} because of selection, region cache bytes: {}," + " flush: {}", region.getUniqueKey(), region.getCacheBytes(), flush); } @@ -425,7 +426,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t region = new TransactionTableRegion(uniqueKey, database, table, this, tableProperties, streamLoader, maxRetries, retryIntervalInMs); regions.put(uniqueKey, region); - flushQ.offer(region); + flushQ.offer((TransactionTableRegion) region); } } } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index 994777d7..0e5e73cd 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -20,9 +20,9 @@ import com.starrocks.data.load.stream.Chunk; import com.starrocks.data.load.stream.StreamLoadManager; +import com.starrocks.data.load.stream.StreamLoader; import com.starrocks.data.load.stream.StreamLoadResponse; import com.starrocks.data.load.stream.StreamLoadSnapshot; -import com.starrocks.data.load.stream.StreamLoader; import com.starrocks.data.load.stream.TableRegion; import com.starrocks.data.load.stream.exception.StreamLoadFailException; import com.starrocks.data.load.stream.http.StreamLoadEntityMeta; @@ -55,6 +55,7 @@ enum State { private final StreamLoadTableProperties properties; private final AtomicLong age = new AtomicLong(0L); private final AtomicLong cacheBytes = new AtomicLong(); + private final AtomicLong cacheRows = new AtomicLong(); private final AtomicReference state; private final AtomicBoolean ctl = new AtomicBoolean(false); private volatile Chunk activeChunk; @@ -160,7 +161,7 @@ public int write(byte[] row) { } private void switchChunk() { - if (activeChunk == null) { + if (activeChunk == null || activeChunk.numRows() == 0) { return; } inactiveChunks.add(activeChunk); @@ -168,12 +169,14 @@ private void switchChunk() { } protected int write0(byte[] row) { - if (activeChunk.estimateChunkSize(row) > properties.getChunkLimit()) { + if (activeChunk.estimateChunkSize(row) > properties.getChunkLimit() + || activeChunk.numRows() >= properties.getMaxBufferRows()) { switchChunk(); } activeChunk.addRow(row); cacheBytes.addAndGet(row.length); + cacheRows.incrementAndGet(); return row.length; } @@ -182,13 +185,22 @@ public boolean isFlushing() { return state.get() == State.FLUSHING; } - @Override - public boolean flush() { + // Whether to should flush + public FlushReason shouldFlush() { + if (state.get() != State.ACTIVE) { + return FlushReason.NONE; + } + return cacheRows.get() >= properties.getMaxBufferRows() ? FlushReason.BUFFER_ROWS_REACH_LIMIT : FlushReason.NONE; + } + + public boolean flush(FlushReason reason) { if (state.compareAndSet(State.ACTIVE, State.FLUSHING)) { for (;;) { if (ctl.compareAndSet(false, true)) { - LOG.info("Flush uniqueKey : {}, label : {}, bytes : {}", uniqueKey, label, cacheBytes.get()); - if (activeChunk.numRows() > 0) { + LOG.info("Flush uniqueKey : {}, label : {}, bytes : {}, rows: {}, reason: {}", + uniqueKey, label, cacheBytes.get(), cacheRows.get(), reason); + if (reason != FlushReason.BUFFER_ROWS_REACH_LIMIT || + activeChunk.numRows() >= properties.getMaxBufferRows()) { switchChunk(); } ctl.set(false); @@ -264,6 +276,7 @@ public void fail(Throwable e) { public void complete(StreamLoadResponse response) { Chunk chunk = inactiveChunks.remove(); cacheBytes.addAndGet(-chunk.rowBytes()); + cacheRows.addAndGet(-chunk.numRows()); response.setFlushBytes(chunk.rowBytes()); response.setFlushRows(chunk.numRows()); manager.callback(response); @@ -350,4 +363,7 @@ public boolean cancel() { public boolean isReadable() { throw new UnsupportedOperationException(); } + + @Override + public boolean flush() { throw new UnsupportedOperationException(); } }