diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java index 506a7c97dc8..05f3e408ed8 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java @@ -83,39 +83,41 @@ TableSource createSource(TableSourceFactoryContext context) { DorisSourceConfig dorisSourceConfig = DorisSourceConfig.of(context.getOptions()); List dorisTableConfigList = dorisSourceConfig.getTableConfigList(); Map dorisSourceTables = new HashMap<>(); - for (DorisTableConfig dorisTableConfig : dorisTableConfigList) { - CatalogTable table; - DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory(); - DorisCatalog catalog = - (DorisCatalog) dorisCatalogFactory.createCatalog("doris", context.getOptions()); + + DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory(); + try (DorisCatalog catalog = + (DorisCatalog) dorisCatalogFactory.createCatalog("doris", context.getOptions())) { catalog.open(); - TablePath tablePath = TablePath.of(dorisTableConfig.getTableIdentifier()); - String readFields = dorisTableConfig.getReadField(); - try { - List readFiledList = null; - if (StringUtils.isNotBlank(readFields)) { - readFiledList = - Arrays.stream(readFields.split(",")) - .map(String::trim) - .collect(Collectors.toList()); - } + for (DorisTableConfig dorisTableConfig : dorisTableConfigList) { + CatalogTable table; + TablePath tablePath = TablePath.of(dorisTableConfig.getTableIdentifier()); + String readFields = dorisTableConfig.getReadField(); + try { + List readFiledList = null; + if (StringUtils.isNotBlank(readFields)) { + readFiledList = + Arrays.stream(readFields.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + } - table = catalog.getTable(tablePath, readFiledList); - } catch (Exception e) { - log.error("create source error"); - throw e; + table = catalog.getTable(tablePath, readFiledList); + } catch (Exception e) { + log.error("create source error"); + throw e; + } + dorisSourceTables.put( + tablePath, + DorisSourceTable.builder() + .catalogTable(table) + .tablePath(tablePath) + .readField(readFields) + .filterQuery(dorisTableConfig.getFilterQuery()) + .batchSize(dorisTableConfig.getBatchSize()) + .tabletSize(dorisTableConfig.getTabletSize()) + .execMemLimit(dorisTableConfig.getExecMemLimit()) + .build()); } - dorisSourceTables.put( - tablePath, - DorisSourceTable.builder() - .catalogTable(table) - .tablePath(tablePath) - .readField(readFields) - .filterQuery(dorisTableConfig.getFilterQuery()) - .batchSize(dorisTableConfig.getBatchSize()) - .tabletSize(dorisTableConfig.getTabletSize()) - .execMemLimit(dorisTableConfig.getExecMemLimit()) - .build()); } return () -> (SeaTunnelSource)