From 3ff3c3ac92f7d636e6783d2b1709839133e0c14f Mon Sep 17 00:00:00 2001 From: Zaorang Yang Date: Thu, 26 Oct 2023 17:06:39 +0800 Subject: [PATCH] [BugFix] Fix SDK write failure for non-default tables Signed-off-by: Zaorang Yang --- .../load/stream/DefaultStreamLoadManager.java | 2 +- .../data/load/stream/DefaultStreamLoader.java | 10 ++++------ .../properties/StreamLoadProperties.java | 11 +++++++++++ .../properties/StreamLoadTableProperties.java | 18 ++++++++++++++++++ .../load/stream/v2/StreamLoadManagerV2.java | 2 +- 5 files changed, 35 insertions(+), 8 deletions(-) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoadManager.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoadManager.java index 1eda052c..9228d4d8 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoadManager.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoadManager.java @@ -414,7 +414,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t synchronized (regions) { region = regions.get(uniqueKey); if (region == null) { - StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey); + StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey, database, table); LabelGenerator labelGenerator = labelGeneratorFactory.create(database, table); region = new BatchTableRegion(uniqueKey, database, table, this, tableProperties, streamLoader, labelGenerator); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java index f0d39671..e6898c8e 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java @@ -153,8 +153,7 @@ public Future send(TableRegion region) { log.warn("Stream load not start"); } if (begin(region)) { - StreamLoadTableProperties tableProperties = properties.getTableProperties(region.getUniqueKey()); - return executorService.submit(() -> send(tableProperties, region)); + return executorService.submit(() -> sendToSR(region)); } else { region.fail(new StreamLoadFailException("Transaction start failed, db : " + region.getDatabase())); } @@ -168,8 +167,7 @@ public Future send(TableRegion region, int delayMs) { log.warn("Stream load not start"); } if (begin(region)) { - StreamLoadTableProperties tableProperties = properties.getTableProperties(region.getUniqueKey()); - return executorService.schedule(() -> send(tableProperties, region), delayMs, TimeUnit.MILLISECONDS); + return executorService.schedule(() -> sendToSR(region), delayMs, TimeUnit.MILLISECONDS); } else { region.fail(new StreamLoadFailException("Transaction start failed, db : " + region.getDatabase())); } @@ -278,7 +276,7 @@ protected void initDefaultHeaders(StreamLoadProperties properties) { .toArray(Header[]::new); } - protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, TableRegion region) { + protected StreamLoadResponse sendToSR(TableRegion region) { try { String host = getAvailableHost(); String sendUrl = getSendUrl(host, region.getDatabase(), region.getTable()); @@ -289,7 +287,7 @@ protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, Tab httpPut.setEntity(region.getHttpEntity()); httpPut.setHeaders(defaultHeaders); - + StreamLoadTableProperties tableProperties = region.getProperties(); for (Map.Entry entry : tableProperties.getProperties().entrySet()) { httpPut.removeHeaders(entry.getKey()); httpPut.addHeader(entry.getKey(), entry.getValue()); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java index 51ea478d..ccf98ffe 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java @@ -174,6 +174,17 @@ public StreamLoadTableProperties getTableProperties(String uniqueKey) { return tablePropertiesMap.getOrDefault(uniqueKey, defaultTableProperties); } + public StreamLoadTableProperties getTableProperties(String uniqueKey, String database, String table) { + StreamLoadTableProperties tableProperties = getTableProperties(uniqueKey); + if (!tableProperties.getDatabase().equals(database) || !tableProperties.getDatabase().equals(table)) { + StreamLoadTableProperties.Builder tablePropertiesBuilder = StreamLoadTableProperties.builder(); + tablePropertiesBuilder = tablePropertiesBuilder.copyFrom(tableProperties).database(database).table(table); + return tablePropertiesBuilder.build(); + } else { + return tableProperties; + } + } + public Map getTablePropertiesMap() { return tablePropertiesMap; } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java index 4c48f8e8..7d9fa320 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java @@ -38,6 +38,7 @@ public class StreamLoadTableProperties implements Serializable { private final boolean enableUpsertDelete; private final long chunkLimit; private final int maxBufferRows; + private final String columns; private StreamLoadTableProperties(Builder builder) { this.database = builder.database; @@ -59,8 +60,11 @@ private StreamLoadTableProperties(Builder builder) { } this.maxBufferRows = builder.maxBufferRows; this.properties = builder.properties; + this.columns = builder.columns; } + public String getColumns() {return columns; } + public String getUniqueKey() { return uniqueKey; } @@ -114,6 +118,20 @@ private Builder() { } + // This function does not copy the uniqueKey and properties attributes because the uniqueKey + // is generated in the StreamLoadTableProperties constructor, and the properties are automatically + // populated during the build process. + public Builder copyFrom(StreamLoadTableProperties streamLoadTableProperties) { + database(streamLoadTableProperties.getDatabase()); + table(streamLoadTableProperties.getTable()); + columns(streamLoadTableProperties.getColumns()); + enableUpsertDelete(streamLoadTableProperties.isEnableUpsertDelete()); + streamLoadDataFormat(streamLoadTableProperties.getDataFormat()); + chunkLimit(streamLoadTableProperties.getChunkLimit()); + maxBufferRows(streamLoadTableProperties.getMaxBufferRows()); + return this; + } + public Builder uniqueKey(String uniqueKey) { this.uniqueKey = uniqueKey; return this; diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java index 39425bf6..2dd2f982 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java @@ -440,7 +440,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t synchronized (regions) { region = regions.get(uniqueKey); if (region == null) { - StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey); + StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey, database, table); LabelGenerator labelGenerator = labelGeneratorFactory.create(database, table); region = new TransactionTableRegion(uniqueKey, database, table, this, tableProperties, streamLoader, labelGenerator, maxRetries, retryIntervalInMs);