Skip to content

Commit

Permalink
[hotfix-#1911][starrocks] fixed stream load failed do not throw excep…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
libailin authored and yanghuaiGit committed Aug 9, 2024
1 parent e24b617 commit 284c84f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
35 changes: 35 additions & 0 deletions chunjun-connectors/chunjun-connector-gbase8s/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>log4j:log4j</exclude>
<exclude>ch.qos.logback:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.bson</pattern>
<shadedPattern>com.dtstack.chunjun.connector.gbase8s.shaded.org.bson</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
Expand Down Expand Up @@ -199,6 +200,15 @@ public String handleErrMessage(StarRocksStreamLoadFailedException e) {
message, JSON.toJSONString(failedResponse));
}

@Override
public synchronized void close() throws IOException {
super.close();
// 解决当异步执行streamLoad时,flushException不为空,则认为整个任务应该抛出异常
if (streamLoadManager != null && streamLoadManager.getFlushException() != null) {
throw new RuntimeException(streamLoadManager.getFlushException());
}
}

@Override
protected void closeInternal() {
if (streamLoadManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,4 +332,8 @@ public boolean tableHasPartition() {
return starrocksQueryVisitor.hasPartitions(
starRocksConfig.getDatabase(), starRocksConfig.getTable());
}

public Throwable getFlushException() {
return flushException;
}
}

0 comments on commit 284c84f

Please sign in to comment.