[Enhancement] Support to set warehouse when using transaction stream load (#361)

Signed-off-by: PengFei Li <[email protected]>
banmoy authored Apr 29, 2024
1 parent 5c2a334 commit eb5f205
Showing 6 changed files with 40 additions and 23 deletions.
File 1 of 6
@@ -71,8 +71,8 @@ public static void main(String[] args) throws Exception {
         //
         // 2. replace the connector options with your cluster configurations
         MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-        String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:11903");
-        String loadUrl = params.get("loadUrl", "127.0.0.1:11901");
+        String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:9030");
+        String loadUrl = params.get("loadUrl", "127.0.0.1:8030");
         String userName = params.get("userName", "root");
         String password = params.get("password", "");

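(For context: 9030 is the default StarRocks FE MySQL/query port and 8030 the default FE HTTP port used for stream load, so the example defaults now point at a standard deployment.)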
File 2 of 6
@@ -72,6 +72,10 @@ protected void initTxHeaders(StreamLoadProperties properties) {
         } else {
             beginHeaders.put("timeout", timeout);
         }
+        String warehouse = properties.getHeaders().get("warehouse");
+        if (warehouse != null) {
+            beginHeaders.put("warehouse", warehouse);
+        }
         this.beginTxnHeader = beginHeaders.entrySet().stream()
                 .map(entry -> new BasicHeader(entry.getKey(), entry.getValue()))
                 .toArray(Header[]::new);
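A minimal, self-contained sketch of the propagation this hunk adds, with illustrative class and method names (WarehouseHeaderSketch, buildBeginTxnHeaders, and the "my_warehouse" value are not part of the connector): a "warehouse" entry found in the configured stream load headers is copied into the headers sent with the begin-transaction request.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.http.Header;
    import org.apache.http.message.BasicHeader;

    public class WarehouseHeaderSketch {

        // Mirrors the logic above: build begin-transaction headers from the
        // user-configured stream load headers, forwarding "warehouse" when present.
        static Header[] buildBeginTxnHeaders(Map<String, String> configuredHeaders, String timeout) {
            Map<String, String> beginHeaders = new HashMap<>();
            beginHeaders.put("timeout", timeout);
            String warehouse = configuredHeaders.get("warehouse");
            if (warehouse != null) {
                beginHeaders.put("warehouse", warehouse);
            }
            return beginHeaders.entrySet().stream()
                    .map(entry -> new BasicHeader(entry.getKey(), entry.getValue()))
                    .toArray(Header[]::new);
        }

        public static void main(String[] args) {
            Map<String, String> configured = new HashMap<>();
            configured.put("warehouse", "my_warehouse"); // hypothetical warehouse name
            for (Header h : buildBeginTxnHeaders(configured, "600")) {
                System.out.println(h.getName() + ": " + h.getValue());
            }
        }
    }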
File 3 of 6
@@ -356,17 +356,17 @@ public Builder expectDelayTime(long expectDelayTime) {
         }

         public Builder connectTimeout(int connectTimeout) {
-            if (connectTimeout < 100 || connectTimeout > 60000) {
-                throw new IllegalArgumentException("connectTimeout `" + connectTimeout + "ms` set failed, must range in [100, 60000]");
+            if (connectTimeout < 100) {
+                throw new IllegalArgumentException("connectTimeout `" + connectTimeout + "ms` set failed, must be larger than 100ms");
             }
             this.connectTimeout = connectTimeout;
             return this;
         }

         public Builder waitForContinueTimeoutMs(int waitForContinueTimeoutMs) {
-            if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE || waitForContinueTimeoutMs > 60000) {
+            if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE) {
                 throw new IllegalArgumentException("waitForContinueTimeoutMs `" + waitForContinueTimeoutMs +
-                        "ms` set failed, must be in range in [3000, 60000]");
+                        "ms` set failed, must be larger than 3000ms");
             }
             this.waitForContinueTimeoutMs = waitForContinueTimeoutMs;
             return this;
File 4 of 6
@@ -26,10 +26,13 @@
 public enum FlushReason {
     // No need to flush
     NONE,
-    // Should commit the data
+    // Trigger the commit condition, such as flush interval,
+    // and should flush first
     COMMIT,
     // Cache is full, and need flush on or more tables
     CACHE_FULL,
     // The number of buffered rows reaches the limit
-    BUFFER_ROWS_REACH_LIMIT
+    BUFFER_ROWS_REACH_LIMIT,
+    // Force flush, such as StreamLoadManagerV2.flush
+    FORCE
 }
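The comments above describe when each reason applies. A hedged sketch of how a caller might choose among them (the fields, thresholds, and the forced flag below are illustrative, not the SDK's actual members):

    // Illustrative decision logic only; the SDK's real shouldFlush() may differ.
    enum FlushReason { NONE, COMMIT, CACHE_FULL, BUFFER_ROWS_REACH_LIMIT, FORCE }

    class FlushDecisionSketch {
        long cacheBytes;            // bytes currently buffered
        long bufferedRows;          // rows in the active chunk
        long maxCacheBytes;         // cache capacity
        long maxBufferRows;         // per-chunk row limit
        boolean commitConditionMet; // e.g. the flush/commit interval elapsed

        FlushReason reasonToFlush(boolean forced) {
            if (forced) {                 // explicit flush or savepoint, see StreamLoadManagerV2
                return FlushReason.FORCE;
            }
            if (commitConditionMet) {     // commit condition triggered, flush first
                return FlushReason.COMMIT;
            }
            if (cacheBytes >= maxCacheBytes) {
                return FlushReason.CACHE_FULL;
            }
            if (bufferedRows >= maxBufferRows) {
                return FlushReason.BUFFER_ROWS_REACH_LIMIT;
            }
            return FlushReason.NONE;
        }
    }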
File 5 of 6
@@ -176,7 +176,7 @@ public void init() {

         if (savepoint) {
             for (TransactionTableRegion region : flushQ) {
-                boolean flush = region.flush(FlushReason.COMMIT);
+                boolean flush = region.flush(FlushReason.FORCE);
                 LOG.debug("Trigger flush table region {} because of savepoint, region cache bytes: {}, flush: {}",
                         region.getUniqueKey(), region.getCacheBytes(), flush);
             }
@@ -187,19 +187,20 @@ public void init() {
             for (TransactionTableRegion region : flushQ) {
                 // savepoint makes sure no more data is written, so these conditions
                 // can guarantee commit after all data has been written to StarRocks
-                if (region.getCacheBytes() == 0 && !region.isFlushing()) {
-                    boolean success = region.commit();
-                    if (success) {
-                        committedRegions += 1;
-                        region.resetAge();
-                    }
-                    LOG.debug("Commit region {} for savepoint, success: {}", region.getUniqueKey(), success);
+                boolean success = region.commit();
+                if (success && region.getCacheBytes() == 0) {
+                    committedRegions += 1;
+                    region.resetAge();
                 }
+                LOG.debug("Commit region {} for savepoint, success: {}", region.getUniqueKey(), success);
             }

             if (committedRegions == flushQ.size()) {
                 allRegionsCommitted = true;
                 LOG.info("All regions committed for savepoint, number of regions: {}", committedRegions);
+            } else {
+                LOG.debug("Some regions not committed for savepoint, expected num: {}, actual num: {}",
+                        flushQ.size(), committedRegions);
             }
         }
         LockSupport.unpark(current);
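A simplified sketch of the savepoint flow after this change (Region here is a stand-in for TransactionTableRegion, and the loop structure is condensed from the manager thread): every region is first flushed with FORCE, then commit() is attempted, and a region only counts as committed once its cache is empty, so the cycle repeats until all buffered data has reached StarRocks.

    import java.util.List;

    class SavepointLoopSketch {

        // Stand-in for TransactionTableRegion; not the connector's actual interface.
        interface Region {
            void flush();           // stands in for flush(FlushReason.FORCE)
            boolean commit();       // returns false while a flush is still pending
            long getCacheBytes();
        }

        // Repeat flush + commit until every region has committed all of its data.
        static void savepoint(List<Region> regions) {
            boolean allCommitted = false;
            while (!allCommitted) {
                for (Region region : regions) {
                    region.flush();                               // force out buffered rows
                }
                int committed = 0;
                for (Region region : regions) {
                    if (region.commit() && region.getCacheBytes() == 0) {
                        committed++;                              // only count fully drained regions
                    }
                }
                allCommitted = committed == regions.size();
            }
        }
    }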
File 6 of 6
@@ -219,11 +219,11 @@ public FlushReason shouldFlush() {
     }

     public boolean flush(FlushReason reason) {
+        LOG.debug("Try to flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}",
+                database, table, label, cacheBytes, cacheRows, reason);
         if (state.compareAndSet(State.ACTIVE, State.FLUSHING)) {
             for (;;) {
                 if (ctl.compareAndSet(false, true)) {
-                    LOG.info("Flush uniqueKey : {}, label : {}, bytes : {}, rows: {}, reason: {}",
-                            uniqueKey, label, cacheBytes.get(), cacheRows.get(), reason);
                     if (reason != FlushReason.BUFFER_ROWS_REACH_LIMIT ||
                             activeChunk.numRows() >= properties.getMaxBufferRows()) {
                         switchChunk();
@@ -233,6 +233,8 @@ public boolean flush(FlushReason reason) {
                     }
                 }
                 if (!inactiveChunks.isEmpty()) {
+                    LOG.info("Flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}",
+                            database, table, label, cacheBytes.get(), cacheRows.get(), reason);
                     streamLoad(0);
                     return true;
                 } else {
@@ -251,6 +253,7 @@ public boolean commit() {
     // indicates the commit should not be triggered, such as it is FLUSHING,
     // or it's still doing commit asynchronously
     public boolean commit() {
+        LOG.debug("Try to commit, db: {}, table: {}, label: {}", database, table, label);
         boolean commitTriggered = false;
         if (!state.compareAndSet(State.ACTIVE, State.COMMITTING)) {
             if (state.get() != State.COMMITTING) {
@@ -263,6 +266,7 @@
             // label will be set to null after commit executes successfully
             if (label == null) {
                 state.compareAndSet(State.COMMITTING, State.ACTIVE);
+                LOG.debug("Success to commit, db: {}, table: {}", database, table);
                 return true;
             } else {
                 // wait for the commit to finish
@@ -271,10 +275,14 @@
         }

         if (label == null) {
-            // if the data has never been flushed (label == null), the commit should fail so that StreamLoadManagerV2#init
-            // will schedule to flush the data first, and then trigger commit again
+            // if the data has never been flushed (label == null), the commit should fail
+            // so that StreamLoadManagerV2#init will schedule to flush the data first, and
+            // then trigger commit again
             boolean commitSuccess = cacheBytes.get() == 0;
             state.compareAndSet(State.COMMITTING, State.ACTIVE);
+            if (commitSuccess) {
+                LOG.debug("Success to commit, db: {}, table: {}", database, table);
+            }
             return commitSuccess;
         }

@@ -345,14 +353,15 @@ public void complete(StreamLoadResponse response) {
         numRetries = 0;
         firstException = null;

-        LOG.info("Stream load flushed, db: {}, table: {}, label : {}", database, table, label);
         if (!inactiveChunks.isEmpty()) {
-            LOG.info("Stream load continue, db: {}, table: {}, label : {}", database, table, label);
+            LOG.info("Stream load continue, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}",
+                    database, table, label, cacheBytes, cacheRows);
             streamLoad(0);
             return;
         }
         if (state.compareAndSet(State.FLUSHING, State.ACTIVE)) {
-            LOG.info("Stream load completed, db: {}, table: {}, label : {}", database, table, label);
+            LOG.info("Stream load completed, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}",
+                    database, table, label, cacheBytes, cacheRows);
         }
     }

