Skip to content

Commit

Permalink
[Fix][Doris] Fix catalog not closed (#8415)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Jan 2, 2025
1 parent 88c92ae commit 2d1db66
Showing 1 changed file with 32 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,39 +83,41 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
DorisSourceConfig dorisSourceConfig = DorisSourceConfig.of(context.getOptions());
List<DorisTableConfig> dorisTableConfigList = dorisSourceConfig.getTableConfigList();
Map<TablePath, DorisSourceTable> dorisSourceTables = new HashMap<>();
for (DorisTableConfig dorisTableConfig : dorisTableConfigList) {
CatalogTable table;
DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory();
DorisCatalog catalog =
(DorisCatalog) dorisCatalogFactory.createCatalog("doris", context.getOptions());

DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory();
try (DorisCatalog catalog =
(DorisCatalog) dorisCatalogFactory.createCatalog("doris", context.getOptions())) {
catalog.open();
TablePath tablePath = TablePath.of(dorisTableConfig.getTableIdentifier());
String readFields = dorisTableConfig.getReadField();
try {
List<String> readFiledList = null;
if (StringUtils.isNotBlank(readFields)) {
readFiledList =
Arrays.stream(readFields.split(","))
.map(String::trim)
.collect(Collectors.toList());
}
for (DorisTableConfig dorisTableConfig : dorisTableConfigList) {
CatalogTable table;
TablePath tablePath = TablePath.of(dorisTableConfig.getTableIdentifier());
String readFields = dorisTableConfig.getReadField();
try {
List<String> readFiledList = null;
if (StringUtils.isNotBlank(readFields)) {
readFiledList =
Arrays.stream(readFields.split(","))
.map(String::trim)
.collect(Collectors.toList());
}

table = catalog.getTable(tablePath, readFiledList);
} catch (Exception e) {
log.error("create source error");
throw e;
table = catalog.getTable(tablePath, readFiledList);
} catch (Exception e) {
log.error("create source error");
throw e;
}
dorisSourceTables.put(
tablePath,
DorisSourceTable.builder()
.catalogTable(table)
.tablePath(tablePath)
.readField(readFields)
.filterQuery(dorisTableConfig.getFilterQuery())
.batchSize(dorisTableConfig.getBatchSize())
.tabletSize(dorisTableConfig.getTabletSize())
.execMemLimit(dorisTableConfig.getExecMemLimit())
.build());
}
dorisSourceTables.put(
tablePath,
DorisSourceTable.builder()
.catalogTable(table)
.tablePath(tablePath)
.readField(readFields)
.filterQuery(dorisTableConfig.getFilterQuery())
.batchSize(dorisTableConfig.getBatchSize())
.tabletSize(dorisTableConfig.getTabletSize())
.execMemLimit(dorisTableConfig.getExecMemLimit())
.build());
}
return () ->
(SeaTunnelSource<T, SplitT, StateT>)
Expand Down

0 comments on commit 2d1db66

Please sign in to comment.