From 1eb5e3a444eb01c23ca4f9108d6d583fad9c8f7e Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Fri, 26 Apr 2024 13:58:19 +0800 Subject: [PATCH] [Enhancement] Transaction stream load supports to set warehouse header Signed-off-by: PengFei Li --- .../flink/examples/datastream/WriteMultipleTables.java | 4 ++-- .../data/load/stream/TransactionStreamLoader.java | 4 ++++ .../data/load/stream/properties/StreamLoadProperties.java | 8 ++++---- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java b/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java index d9322a56..0aa52844 100644 --- a/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java +++ b/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java @@ -71,8 +71,8 @@ public static void main(String[] args) throws Exception { // // 2. replace the connector options with your cluster configurations MultipleParameterTool params = MultipleParameterTool.fromArgs(args); - String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:11903"); - String loadUrl = params.get("loadUrl", "127.0.0.1:11901"); + String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:9030"); + String loadUrl = params.get("loadUrl", "127.0.0.1:8030"); String userName = params.get("userName", "root"); String password = params.get("password", ""); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java index 9731249f..c927f715 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java @@ -72,6 +72,10 @@ protected void initTxHeaders(StreamLoadProperties properties) { } else { beginHeaders.put("timeout", timeout); } + String warehouse = properties.getHeaders().get("warehouse"); + if (warehouse != null) { + beginHeaders.put("warehouse", warehouse); + } this.beginTxnHeader = beginHeaders.entrySet().stream() .map(entry -> new BasicHeader(entry.getKey(), entry.getValue())) .toArray(Header[]::new); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java index 118a168c..e09acd93 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java @@ -356,17 +356,17 @@ public Builder expectDelayTime(long expectDelayTime) { } public Builder connectTimeout(int connectTimeout) { - if (connectTimeout < 100 || connectTimeout > 60000) { - throw new IllegalArgumentException("connectTimeout `" + connectTimeout + "ms` set failed, must range in [100, 60000]"); + if (connectTimeout < 100) { + throw new IllegalArgumentException("connectTimeout `" + connectTimeout + "ms` set failed, must be larger than 100ms"); } this.connectTimeout = connectTimeout; return this; } public Builder waitForContinueTimeoutMs(int waitForContinueTimeoutMs) { - if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE || waitForContinueTimeoutMs > 60000) { + if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE) { throw new IllegalArgumentException("waitForContinueTimeoutMs `" + waitForContinueTimeoutMs + - "ms` set failed, must be in range in [3000, 60000]"); + "ms` set failed, must be be larger than 3000ms"); } this.waitForContinueTimeoutMs = waitForContinueTimeoutMs; return this;