From eb5f205719981f40eb66b44547a06ffa2da804d1 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Mon, 29 Apr 2024 14:38:31 +0800 Subject: [PATCH] [Enhancement] Support to set warehouse when using transaction stream load (#361) Signed-off-by: PengFei Li --- .../datastream/WriteMultipleTables.java | 4 ++-- .../load/stream/TransactionStreamLoader.java | 4 ++++ .../properties/StreamLoadProperties.java | 8 +++---- .../data/load/stream/v2/FlushReason.java | 7 ++++-- .../load/stream/v2/StreamLoadManagerV2.java | 17 +++++++------- .../stream/v2/TransactionTableRegion.java | 23 +++++++++++++------ 6 files changed, 40 insertions(+), 23 deletions(-) diff --git a/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java b/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java index d9322a56..0aa52844 100644 --- a/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java +++ b/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java @@ -71,8 +71,8 @@ public static void main(String[] args) throws Exception { // // 2. 
replace the connector options with your cluster configurations MultipleParameterTool params = MultipleParameterTool.fromArgs(args); - String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:11903"); - String loadUrl = params.get("loadUrl", "127.0.0.1:11901"); + String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:9030"); + String loadUrl = params.get("loadUrl", "127.0.0.1:8030"); String userName = params.get("userName", "root"); String password = params.get("password", ""); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java index 9731249f..c927f715 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java @@ -72,6 +72,10 @@ protected void initTxHeaders(StreamLoadProperties properties) { } else { beginHeaders.put("timeout", timeout); } + String warehouse = properties.getHeaders().get("warehouse"); + if (warehouse != null) { + beginHeaders.put("warehouse", warehouse); + } this.beginTxnHeader = beginHeaders.entrySet().stream() .map(entry -> new BasicHeader(entry.getKey(), entry.getValue())) .toArray(Header[]::new); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java index 118a168c..e09acd93 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java @@ -356,17 +356,17 @@ public Builder expectDelayTime(long expectDelayTime) { } public Builder connectTimeout(int connectTimeout) { - if 
(connectTimeout < 100 || connectTimeout > 60000) { - throw new IllegalArgumentException("connectTimeout `" + connectTimeout + "ms` set failed, must range in [100, 60000]"); + if (connectTimeout < 100) { + throw new IllegalArgumentException("connectTimeout `" + connectTimeout + "ms` set failed, must be larger than 100ms"); } this.connectTimeout = connectTimeout; return this; } public Builder waitForContinueTimeoutMs(int waitForContinueTimeoutMs) { - if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE || waitForContinueTimeoutMs > 60000) { + if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE) { throw new IllegalArgumentException("waitForContinueTimeoutMs `" + waitForContinueTimeoutMs + - "ms` set failed, must be in range in [3000, 60000]"); + "ms` set failed, must be larger than 3000ms"); } this.waitForContinueTimeoutMs = waitForContinueTimeoutMs; return this; } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java index 96d2a6d0..100bef2d 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java @@ -26,10 +26,13 @@ public enum FlushReason { // No need to flush NONE, - // Should commit the data + // Trigger the commit condition, such as flush interval, + // and should flush first COMMIT, // Cache is full, and need flush on or more tables CACHE_FULL, // The number of buffered rows reaches the limit - BUFFER_ROWS_REACH_LIMIT + BUFFER_ROWS_REACH_LIMIT, + // Force flush, such as StreamLoadManagerV2.flush + FORCE } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java index 2dd2f982..cc2c13e4 100644 ---
a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java @@ -176,7 +176,7 @@ public void init() { if (savepoint) { for (TransactionTableRegion region : flushQ) { - boolean flush = region.flush(FlushReason.COMMIT); + boolean flush = region.flush(FlushReason.FORCE); LOG.debug("Trigger flush table region {} because of savepoint, region cache bytes: {}, flush: {}", region.getUniqueKey(), region.getCacheBytes(), flush); } @@ -187,19 +187,20 @@ public void init() { for (TransactionTableRegion region : flushQ) { // savepoint makes sure no more data is written, so these conditions // can guarantee commit after all data has been written to StarRocks - if (region.getCacheBytes() == 0 && !region.isFlushing()) { - boolean success = region.commit(); - if (success) { - committedRegions += 1; - region.resetAge(); - } - LOG.debug("Commit region {} for savepoint, success: {}", region.getUniqueKey(), success); + boolean success = region.commit(); + if (success && region.getCacheBytes() == 0) { + committedRegions += 1; + region.resetAge(); } + LOG.debug("Commit region {} for savepoint, success: {}", region.getUniqueKey(), success); } if (committedRegions == flushQ.size()) { allRegionsCommitted = true; LOG.info("All regions committed for savepoint, number of regions: {}", committedRegions); + } else { + LOG.debug("Some regions not committed for savepoint, expected num: {}, actual num: {}", + flushQ.size(), committedRegions); } } LockSupport.unpark(current); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index 26bfb457..c1f410c4 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ 
b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -219,11 +219,11 @@ public FlushReason shouldFlush() { } public boolean flush(FlushReason reason) { + LOG.debug("Try to flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}", + database, table, label, cacheBytes, cacheRows, reason); if (state.compareAndSet(State.ACTIVE, State.FLUSHING)) { for (;;) { if (ctl.compareAndSet(false, true)) { - LOG.info("Flush uniqueKey : {}, label : {}, bytes : {}, rows: {}, reason: {}", - uniqueKey, label, cacheBytes.get(), cacheRows.get(), reason); if (reason != FlushReason.BUFFER_ROWS_REACH_LIMIT || activeChunk.numRows() >= properties.getMaxBufferRows()) { switchChunk(); @@ -233,6 +233,8 @@ public boolean flush(FlushReason reason) { } } if (!inactiveChunks.isEmpty()) { + LOG.info("Flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}", + database, table, label, cacheBytes.get(), cacheRows.get(), reason); streamLoad(0); return true; } else { @@ -251,6 +253,7 @@ public boolean flush(FlushReason reason) { // indicates the commit should not be triggered, such as it is FLUSHING, // or it's still doing commit asynchronously public boolean commit() { + LOG.debug("Try to commit, db: {}, table: {}, label: {}", database, table, label); boolean commitTriggered = false; if (!state.compareAndSet(State.ACTIVE, State.COMMITTING)) { if (state.get() != State.COMMITTING) { @@ -263,6 +266,7 @@ public boolean commit() { // label will be set to null after commit executes successfully if (label == null) { state.compareAndSet(State.COMMITTING, State.ACTIVE); + LOG.debug("Success to commit, db: {}, table: {}", database, table); return true; } else { // wait for the commit to finish @@ -271,10 +275,14 @@ public boolean commit() { } if (label == null) { - // if the data has never been flushed (label == null), the commit should fail so that StreamLoadManagerV2#init - // will schedule to flush the data first, 
and then trigger commit again + // if the data has never been flushed (label == null), the commit should fail + // so that StreamLoadManagerV2#init will schedule to flush the data first, and + // then trigger commit again boolean commitSuccess = cacheBytes.get() == 0; state.compareAndSet(State.COMMITTING, State.ACTIVE); + if (commitSuccess) { + LOG.debug("Success to commit, db: {}, table: {}", database, table); + } return commitSuccess; } @@ -345,14 +353,15 @@ public void complete(StreamLoadResponse response) { numRetries = 0; firstException = null; - LOG.info("Stream load flushed, db: {}, table: {}, label : {}", database, table, label); if (!inactiveChunks.isEmpty()) { - LOG.info("Stream load continue, db: {}, table: {}, label : {}", database, table, label); + LOG.info("Stream load continue, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}", + database, table, label, cacheBytes, cacheRows); streamLoad(0); return; } if (state.compareAndSet(State.FLUSHING, State.ACTIVE)) { - LOG.info("Stream load completed, db: {}, table: {}, label : {}", database, table, label); + LOG.info("Stream load completed, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}", + database, table, label, cacheBytes, cacheRows); } }