Skip to content

Commit

Permalink
[BugFix] Fix SDK write failure for non-default tables
Browse files Browse the repository at this point in the history
Signed-off-by: Zaorang Yang <[email protected]>
  • Loading branch information
zaorangyang committed Oct 26, 2023
1 parent c509c05 commit 3ff3c3a
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t
synchronized (regions) {
region = regions.get(uniqueKey);
if (region == null) {
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey);
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey, database, table);
LabelGenerator labelGenerator = labelGeneratorFactory.create(database, table);
region = new BatchTableRegion(uniqueKey, database, table, this, tableProperties,
streamLoader, labelGenerator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ public Future<StreamLoadResponse> send(TableRegion region) {
log.warn("Stream load not start");
}
if (begin(region)) {
StreamLoadTableProperties tableProperties = properties.getTableProperties(region.getUniqueKey());
return executorService.submit(() -> send(tableProperties, region));
return executorService.submit(() -> sendToSR(region));
} else {
region.fail(new StreamLoadFailException("Transaction start failed, db : " + region.getDatabase()));
}
Expand All @@ -168,8 +167,7 @@ public Future<StreamLoadResponse> send(TableRegion region, int delayMs) {
log.warn("Stream load not start");
}
if (begin(region)) {
StreamLoadTableProperties tableProperties = properties.getTableProperties(region.getUniqueKey());
return executorService.schedule(() -> send(tableProperties, region), delayMs, TimeUnit.MILLISECONDS);
return executorService.schedule(() -> sendToSR(region), delayMs, TimeUnit.MILLISECONDS);
} else {
region.fail(new StreamLoadFailException("Transaction start failed, db : " + region.getDatabase()));
}
Expand Down Expand Up @@ -278,7 +276,7 @@ protected void initDefaultHeaders(StreamLoadProperties properties) {
.toArray(Header[]::new);
}

protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, TableRegion region) {
protected StreamLoadResponse sendToSR(TableRegion region) {
try {
String host = getAvailableHost();
String sendUrl = getSendUrl(host, region.getDatabase(), region.getTable());
Expand All @@ -289,7 +287,7 @@ protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, Tab
httpPut.setEntity(region.getHttpEntity());

httpPut.setHeaders(defaultHeaders);

StreamLoadTableProperties tableProperties = region.getProperties();
for (Map.Entry<String, String> entry : tableProperties.getProperties().entrySet()) {
httpPut.removeHeaders(entry.getKey());
httpPut.addHeader(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ public StreamLoadTableProperties getTableProperties(String uniqueKey) {
return tablePropertiesMap.getOrDefault(uniqueKey, defaultTableProperties);
}

public StreamLoadTableProperties getTableProperties(String uniqueKey, String database, String table) {
StreamLoadTableProperties tableProperties = getTableProperties(uniqueKey);
if (!tableProperties.getDatabase().equals(database) || !tableProperties.getDatabase().equals(table)) {
StreamLoadTableProperties.Builder tablePropertiesBuilder = StreamLoadTableProperties.builder();
tablePropertiesBuilder = tablePropertiesBuilder.copyFrom(tableProperties).database(database).table(table);
return tablePropertiesBuilder.build();
} else {
return tableProperties;
}
}

public Map<String, StreamLoadTableProperties> getTablePropertiesMap() {
return tablePropertiesMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class StreamLoadTableProperties implements Serializable {
private final boolean enableUpsertDelete;
private final long chunkLimit;
private final int maxBufferRows;
private final String columns;

private StreamLoadTableProperties(Builder builder) {
this.database = builder.database;
Expand All @@ -59,8 +60,11 @@ private StreamLoadTableProperties(Builder builder) {
}
this.maxBufferRows = builder.maxBufferRows;
this.properties = builder.properties;
this.columns = builder.columns;
}

public String getColumns() {return columns; }

public String getUniqueKey() {
return uniqueKey;
}
Expand Down Expand Up @@ -114,6 +118,20 @@ private Builder() {

}

// This function does not copy the uniqueKey and properties attributes because the uniqueKey
// is generated in the StreamLoadTableProperties constructor, and the properties are automatically
// populated during the build process.
public Builder copyFrom(StreamLoadTableProperties streamLoadTableProperties) {
database(streamLoadTableProperties.getDatabase());
table(streamLoadTableProperties.getTable());
columns(streamLoadTableProperties.getColumns());
enableUpsertDelete(streamLoadTableProperties.isEnableUpsertDelete());
streamLoadDataFormat(streamLoadTableProperties.getDataFormat());
chunkLimit(streamLoadTableProperties.getChunkLimit());
maxBufferRows(streamLoadTableProperties.getMaxBufferRows());
return this;
}

public Builder uniqueKey(String uniqueKey) {
this.uniqueKey = uniqueKey;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t
synchronized (regions) {
region = regions.get(uniqueKey);
if (region == null) {
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey);
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey, database, table);
LabelGenerator labelGenerator = labelGeneratorFactory.create(database, table);
region = new TransactionTableRegion(uniqueKey, database, table, this,
tableProperties, streamLoader, labelGenerator, maxRetries, retryIntervalInMs);
Expand Down

0 comments on commit 3ff3c3a

Please sign in to comment.