diff --git a/pom.xml b/pom.xml index 86af84cce5..92b55b325e 100644 --- a/pom.xml +++ b/pom.xml @@ -113,13 +113,13 @@ 1.2.19 1.5.4 - 3.3.4 + 3.3.6 4.1.94.Final 1.2.1 2.1.6 - 1.1.6.bp4 + 1.2.0 2.11.0 diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ConnectType.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ConnectType.java index 5c336e4345..6d2572833b 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ConnectType.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ConnectType.java @@ -35,6 +35,10 @@ public enum ConnectType { // reserved for future version ODP_SHARDING_OB_ORACLE(DialectType.OB_ORACLE), ORACLE(DialectType.ORACLE), + OSS(DialectType.FILE_SYSTEM), + OBS(DialectType.FILE_SYSTEM), + COS(DialectType.FILE_SYSTEM), + S3A(DialectType.FILE_SYSTEM), UNKNOWN(DialectType.UNKNOWN), ; diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/DialectType.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/DialectType.java index 97b33431f8..b83c369e90 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/DialectType.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/DialectType.java @@ -29,6 +29,7 @@ public enum DialectType { ODP_SHARDING_OB_MYSQL, DORIS, POSTGRESQL, + FILE_SYSTEM, UNKNOWN, ; diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ErrorCodes.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ErrorCodes.java index ecd738b5b5..08940280af 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ErrorCodes.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ErrorCodes.java @@ -170,6 +170,7 @@ public enum ErrorCodes implements ErrorCode { // Schedule AlterScheduleExists, InvalidCronExpression, + ScheduleIntervalTooShort, UpdateNotAllowed, PauseNotAllowed, DeleteNotAllowed, @@ -315,7 +316,13 @@ public enum ErrorCodes implements ErrorCode { * workspace */ WorkspaceDatabaseUserTypeMustBeAdmin, - ; + /** + * oss + */ + BucketNotExist, + InvalidAccessKeyId, + SignatureDoesNotMatch, + UnsupportedSyncTableStructure; @Override public String code() { diff --git a/server/odc-core/src/main/resources/i18n/ErrorMessages.properties b/server/odc-core/src/main/resources/i18n/ErrorMessages.properties index b2479eebe8..3b4c0109b5 100644 --- a/server/odc-core/src/main/resources/i18n/ErrorMessages.properties +++ b/server/odc-core/src/main/resources/i18n/ErrorMessages.properties @@ -201,6 +201,11 @@ com.oceanbase.odc.ErrorCodes.DatabaseAccessDenied=Database access is denied beca com.oceanbase.odc.ErrorCodes.ObQueryProfileNotSupported=Query profile is only available for OceanBase Database with versions equal to or higher than {0}. com.oceanbase.odc.ErrorCodes.WorksheetEditVersionConflict=Someone has just modified this file. Please refresh the page to get the latest version and continue editing. com.oceanbase.odc.ErrorCodes.WorkspaceDatabaseUserTypeMustBeAdmin=The database user type used in the workspace must be a super account. +com.oceanbase.odc.ErrorCodes.BucketNotExist=Bucket does not exist +com.oceanbase.odc.ErrorCodes.InvalidAccessKeyId=Invalid access key id +com.oceanbase.odc.ErrorCodes.SignatureDoesNotMatch=Invalid access key secret +com.oceanbase.odc.ErrorCodes.UnsupportedSyncTableStructure=Sync table structure is not supported for {0} to {1} +com.oceanbase.odc.ErrorCodes.ScheduleIntervalTooShort=The execution interval is configured too short, please reconfigure. The minimum interval is: {0} seconds. com.oceanbase.odc.ErrorCodes.UpdateNotAllowed=Editing is not allowed in the current state. Please try again after disabling. com.oceanbase.odc.ErrorCodes.PauseNotAllowed=Disabling is not allowed in the current state, please check if there are any records in execution. com.oceanbase.odc.ErrorCodes.DeleteNotAllowed=Deletion is not allowed in the current state. diff --git a/server/odc-core/src/main/resources/i18n/ErrorMessages_zh_CN.properties b/server/odc-core/src/main/resources/i18n/ErrorMessages_zh_CN.properties index e910200748..ec15c6c22e 100644 --- a/server/odc-core/src/main/resources/i18n/ErrorMessages_zh_CN.properties +++ b/server/odc-core/src/main/resources/i18n/ErrorMessages_zh_CN.properties @@ -204,3 +204,9 @@ com.oceanbase.odc.ErrorCodes.UpdateNotAllowed=当前状态下不允许编辑, com.oceanbase.odc.ErrorCodes.PauseNotAllowed=当前状态下不允许禁用,请检查是否存在执行中的记录 com.oceanbase.odc.ErrorCodes.DeleteNotAllowed=当前状态下不允许删除 + +com.oceanbase.odc.ErrorCodes.BucketNotExist=桶不存在 +com.oceanbase.odc.ErrorCodes.InvalidAccessKeyId=无效的 AccessKeyId +com.oceanbase.odc.ErrorCodes.SignatureDoesNotMatch=无效的 AccessKeySecret +com.oceanbase.odc.ErrorCodes.UnsupportedSyncTableStructure=结构同步暂不支持 {0} 到 {1} +com.oceanbase.odc.ErrorCodes.ScheduleIntervalTooShort=执行间隔配置过短,请重新配置。最小间隔为:{0} 秒 \ No newline at end of file diff --git a/server/odc-core/src/main/resources/i18n/ErrorMessages_zh_TW.properties b/server/odc-core/src/main/resources/i18n/ErrorMessages_zh_TW.properties index 6b05e76c53..d587dfe771 100644 --- a/server/odc-core/src/main/resources/i18n/ErrorMessages_zh_TW.properties +++ b/server/odc-core/src/main/resources/i18n/ErrorMessages_zh_TW.properties @@ -205,3 +205,6 @@ com.oceanbase.odc.ErrorCodes.PauseNotAllowed=當前狀態下不允許禁用, com.oceanbase.odc.ErrorCodes.DeleteNotAllowed=在目前狀態下不允許删除 + +com.oceanbase.odc.ErrorCodes.UnsupportedSyncTableStructure=結構同步暫不支持 {0} 到 {1} +com.oceanbase.odc.ErrorCodes.ScheduleIntervalTooShort=執行間隔設定過短,請重新設定。最小間隔應為:{0} 秒 \ No newline at end of file diff --git a/server/odc-server/src/main/resources/data.sql b/server/odc-server/src/main/resources/data.sql index 3e9ce4e440..b64d204ef5 100644 --- a/server/odc-server/src/main/resources/data.sql +++ b/server/odc-server/src/main/resources/data.sql @@ -859,4 +859,10 @@ INSERT INTO config_system_configuration(`key`, `value`, `description`) VALUES('o INSERT INTO `config_system_configuration` (`key`, `value`, `application`, `profile`, `label`, `description`) VALUES ('odc.session.kill-query-or-session.max-supported-ob-version', '4.2.5', 'odc', 'default', 'master', 'Max OBVersion kill session or kill query supported, only take effect when value greater than 0') -ON DUPLICATE KEY UPDATE `id`=`id`; \ No newline at end of file +ON DUPLICATE KEY UPDATE `id`=`id`; +INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES('odc.task.dlm.session-limiting.enabled', 'true', +'Explosion-proof current limiting switch of mysql/oracle' ) +ON DUPLICATE KEY UPDATE `id` = `id`; +INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES('odc.task.dlm.session-limiting-ratio', '25', +'The ratio of oracle/mysql active sessions to the maximum number of connections allowed' ) +ON DUPLICATE KEY UPDATE `id` = `id`; \ No newline at end of file diff --git a/server/odc-server/src/main/resources/log4j2-task.xml b/server/odc-server/src/main/resources/log4j2-task.xml index d99b722e56..7c9e6965e1 100644 --- a/server/odc-server/src/main/resources/log4j2-task.xml +++ b/server/odc-server/src/main/resources/log4j2-task.xml @@ -64,6 +64,11 @@ + + + + + @@ -87,6 +92,11 @@ + + + + + @@ -117,6 +127,9 @@ + + + diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionEventPublisher.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionEventPublisher.java new file mode 100644 index 0000000000..4920c18554 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionEventPublisher.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.connection; + +import javax.annotation.PostConstruct; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.oceanbase.odc.common.event.AbstractEvent; +import com.oceanbase.odc.common.event.LocalEventPublisher; +import com.oceanbase.odc.service.connection.listener.UpdateDatasourceListener; + +import lombok.NonNull; + +/** + * @Author:tinker + * @Date: 2024/12/30 10:57 + * @Descripition: + */ + +@Component +public class ConnectionEventPublisher { + @Autowired + private LocalEventPublisher localEventPublisher; + + @Autowired + private UpdateDatasourceListener updateDatasourceListener; + + @PostConstruct + public void init() { + localEventPublisher.addEventListener(updateDatasourceListener); + } + + public void publishEvent(@NonNull T event) { + localEventPublisher.publishEvent(event); + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionService.java index 4900d717c5..6f3137542b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionService.java @@ -105,6 +105,7 @@ import com.oceanbase.odc.service.connection.ConnectionStatusManager.CheckState; import com.oceanbase.odc.service.connection.database.DatabaseService; import com.oceanbase.odc.service.connection.database.DatabaseSyncManager; +import com.oceanbase.odc.service.connection.event.UpsertDatasourceEvent; import com.oceanbase.odc.service.connection.model.ConnectProperties; import com.oceanbase.odc.service.connection.model.ConnectionConfig; import com.oceanbase.odc.service.connection.model.OBTenantEndpoint; @@ -221,6 +222,9 @@ public class ConnectionService { @Autowired private TransactionTemplate txTemplate; + @Autowired + private ConnectionEventPublisher connectionEventPublisher; + private final ConnectionMapper mapper = ConnectionMapper.INSTANCE; public static final String DEFAULT_MIN_PRIVILEGE = "read"; @@ -249,6 +253,7 @@ public ConnectionConfig create(@NotNull @Valid ConnectionConfig connection, @Not } }); databaseSyncManager.submitSyncDataSourceAndDBSchemaTask(saved); + connectionEventPublisher.publishEvent(new UpsertDatasourceEvent(saved)); return saved; } @@ -393,8 +398,11 @@ public ConnectionConfig getWithoutPermissionCheck(@NotNull Long id) { @SkipAuthorize("odc internal usage") public List listByOrganizationId(@NonNull Long organizationId) { - return entitiesToModels(repository.findByOrganizationIdOrderByNameAsc(organizationId), organizationId, true, + List connectionConfigs = entitiesToModels( + repository.findByOrganizationIdOrderByNameAsc(organizationId), organizationId, true, true); + fullFillAttributes(connectionConfigs); + return connectionConfigs; } @SkipAuthorize("odc internal usage") @@ -685,6 +693,7 @@ private ConnectionConfig updateConnectionConfig(Long id, ConnectionConfig connec } }); databaseSyncManager.submitSyncDataSourceAndDBSchemaTask(config); + connectionEventPublisher.publishEvent(new UpsertDatasourceEvent(config)); return config; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionTesting.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionTesting.java index 666772cf20..ea7a4b31d5 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionTesting.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionTesting.java @@ -74,6 +74,8 @@ public class ConnectionTesting { private ConnectionSSLAdaptor connectionSSLAdaptor; @Autowired private CloudMetadataClient cloudMetadataClient; + @Autowired + private FileSystemConnectionTester fileSystemConnectionTesting; @Value("${odc.sdk.test-connect.query-timeout-seconds:2}") private int queryTimeoutSeconds = 2; @@ -102,6 +104,9 @@ public ConnectionTestResult test(@NotNull @Valid TestConnectionReq req) { public ConnectionTestResult test(@NonNull ConnectionConfig config) { ConnectType type = config.getType(); + if (type.getDialectType() == DialectType.FILE_SYSTEM) { + return fileSystemConnectionTesting.test(config); + } try { /** * 进行连接测试时需要关注的值有一个 {@link ConnectType}, 容易产生问题信息主要是两个:{@code username}, {@code defaultSchema} 首先分析 @@ -232,6 +237,7 @@ private ConnectionConfig reqToConnectionConfig(TestConnectionReq req) { config.setServiceName(req.getServiceName()); config.setUserRole(req.getUserRole()); config.setCatalogName(req.getCatalogName()); + config.setRegion(req.getRegion()); OBTenantEndpoint endpoint = req.getEndpoint(); if (Objects.nonNull(endpoint) && OceanBaseAccessMode.IC_PROXY == endpoint.getAccessMode()) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionValidator.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionValidator.java index b2fd42c5a5..fcab8f4504 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionValidator.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionValidator.java @@ -21,6 +21,7 @@ import org.springframework.stereotype.Component; import com.oceanbase.odc.core.shared.PreConditions; +import com.oceanbase.odc.core.shared.constant.DialectType; import com.oceanbase.odc.core.shared.constant.ErrorCodes; import com.oceanbase.odc.core.shared.exception.AccessDeniedException; import com.oceanbase.odc.service.collaboration.environment.EnvironmentService; @@ -43,7 +44,9 @@ public class ConnectionValidator { void validateForUpsert(ConnectionConfig connection) { PreConditions.notNull(connection, "connection"); PreConditions.notBlank(connection.getHost(), "connection.host"); - PreConditions.notNull(connection.getPort(), "connection.port"); + if (connection.getDialectType() != DialectType.FILE_SYSTEM) { + PreConditions.notNull(connection.getPort(), "connection.port"); + } PreConditions.validNotSqlInjection(connection.getUsername(), "username"); PreConditions.validNotSqlInjection(connection.getClusterName(), "clusterName"); PreConditions.validNotSqlInjection(connection.getTenantName(), "tenantName"); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/FileSystemConnectionTester.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/FileSystemConnectionTester.java new file mode 100644 index 0000000000..43d5c37be3 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/FileSystemConnectionTester.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.connection; + +import java.io.ByteArrayInputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.text.MessageFormat; +import java.util.Collections; +import java.util.UUID; + +import org.springframework.stereotype.Component; + +import com.aliyun.oss.OSSErrorCode; +import com.aliyun.oss.OSSException; +import com.oceanbase.odc.core.shared.PreConditions; +import com.oceanbase.odc.core.shared.constant.ConnectType; +import com.oceanbase.odc.plugin.connect.api.TestResult; +import com.oceanbase.odc.service.cloud.model.CloudProvider; +import com.oceanbase.odc.service.connection.model.ConnectionConfig; +import com.oceanbase.odc.service.connection.model.ConnectionTestResult; +import com.oceanbase.odc.service.objectstorage.cloud.CloudResourceConfigurations; +import com.oceanbase.odc.service.objectstorage.cloud.client.CloudClient; +import com.oceanbase.odc.service.objectstorage.cloud.client.CloudException; +import com.oceanbase.odc.service.objectstorage.cloud.model.DeleteObjectsRequest; +import com.oceanbase.odc.service.objectstorage.cloud.model.ObjectMetadata; +import com.oceanbase.odc.service.objectstorage.cloud.model.ObjectStorageConfiguration; +import com.oceanbase.tools.migrator.common.exception.UnExpectedException; + +import lombok.NonNull; + +/** + * @Author:tinker + * @Date: 2024/11/19 11:24 + * @Descripition: + */ + +@Component +public class FileSystemConnectionTester { + + private static final String COS_ENDPOINT_PATTERN = "cos.{0}.myqcloud.com"; + private static final String OBS_ENDPOINT_PATTERN = "obs.{0}.myhuaweicloud.com"; + private static final String OSS_ENDPOINT_PATTERN = "oss-{0}.aliyuncs.com"; + private static final String S3_ENDPOINT_GLOBAL_PATTERN = "s3.{0}.amazonaws.com"; + private static final String S3_ENDPOINT_CN_PATTERN = "s3.{0}.amazonaws.com.cn"; + + private static final String TMP_FILE_NAME_PREFIX = "odc-test-object-"; + private static final String TMP_TEST_DATA = "This is a test object to check read and write permissions."; + + public ConnectionTestResult test(@NonNull ConnectionConfig config) { + PreConditions.notBlank(config.getPassword(), "AccessKeySecret"); + PreConditions.notBlank(config.getRegion(), "Region"); + URI uri = URI.create(config.getHost()); + ObjectStorageConfiguration storageConfig = new ObjectStorageConfiguration(); + storageConfig.setAccessKeyId(config.getUsername()); + storageConfig.setAccessKeySecret(config.getPassword()); + storageConfig.setBucketName(uri.getAuthority()); + storageConfig.setRegion(config.getRegion()); + storageConfig.setCloudProvider(getCloudProvider(config.getType())); + storageConfig.setPublicEndpoint(getEndPointByRegion(config.getType(), config.getRegion())); + try { + CloudClient cloudClient = + new CloudResourceConfigurations.CloudClientBuilder().generateCloudClient(storageConfig); + String objectKey = uri.getPath().endsWith("/") ? uri.getAuthority() + uri.getPath() + generateTempFileName() + : uri.getAuthority() + uri.getPath() + "/" + generateTempFileName(); + cloudClient.putObject(storageConfig.getBucketName(), objectKey, + new ByteArrayInputStream(TMP_TEST_DATA.getBytes(StandardCharsets.UTF_8)), new ObjectMetadata()); + DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(); + deleteObjectsRequest.setBucketName(storageConfig.getBucketName()); + deleteObjectsRequest.setKeys(Collections.singletonList(objectKey)); + cloudClient.deleteObjects(deleteObjectsRequest); + return ConnectionTestResult.success(config.getType()); + } catch (CloudException e) { + if (e.getCause() != null && e.getCause() instanceof OSSException) { + OSSException cause = (OSSException) e.getCause(); + switch (cause.getErrorCode()) { + case OSSErrorCode.ACCESS_DENIED: + return new ConnectionTestResult(TestResult.akAccessDenied(storageConfig.getAccessKeyId()), + config.getType()); + case OSSErrorCode.INVALID_ACCESS_KEY_ID: + return new ConnectionTestResult(TestResult.invalidAccessKeyId(storageConfig.getAccessKeyId()), + config.getType()); + case OSSErrorCode.SIGNATURE_DOES_NOT_MATCH: + return new ConnectionTestResult( + TestResult.signatureDoesNotMatch(storageConfig.getAccessKeyId()), config.getType()); + case OSSErrorCode.NO_SUCH_BUCKET: + return new ConnectionTestResult(TestResult.bucketNotExist(storageConfig.getBucketName()), + config.getType()); + default: + return new ConnectionTestResult(TestResult.unknownError(e), config.getType()); + } + } + // TODO:process s3 error message + return new ConnectionTestResult(TestResult.unknownError(e), config.getType()); + } + } + + private CloudProvider getCloudProvider(ConnectType type) { + switch (type) { + case COS: + return CloudProvider.TENCENT_CLOUD; + case OBS: + return CloudProvider.HUAWEI_CLOUD; + case S3A: + return CloudProvider.AWS; + case OSS: + return CloudProvider.ALIBABA_CLOUD; + default: + throw new UnExpectedException(); + } + } + + private static String getEndPointByRegion(ConnectType type, String region) { + switch (type) { + case COS: + return MessageFormat.format(COS_ENDPOINT_PATTERN, region); + case OSS: + return MessageFormat.format(OSS_ENDPOINT_PATTERN, region); + case OBS: + return MessageFormat.format(OBS_ENDPOINT_PATTERN, region); + case S3A: + // Note there is a difference of Top-Level Domain between cn and global regions. + if (region.startsWith("cn-")) { + return MessageFormat.format(S3_ENDPOINT_CN_PATTERN, region); + } + return MessageFormat.format(S3_ENDPOINT_GLOBAL_PATTERN, region); + default: + throw new IllegalArgumentException("regionToEndpoint is not applicable for storageType " + type); + } + } + + private String generateTempFileName() { + return TMP_FILE_NAME_PREFIX + UUID.randomUUID(); + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java index bd23675cd4..9c27b53853 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java @@ -65,6 +65,7 @@ import com.oceanbase.odc.core.authority.util.SkipAuthorize; import com.oceanbase.odc.core.session.ConnectionSession; import com.oceanbase.odc.core.shared.PreConditions; +import com.oceanbase.odc.core.shared.constant.DialectType; import com.oceanbase.odc.core.shared.constant.ErrorCodes; import com.oceanbase.odc.core.shared.constant.OrganizationType; import com.oceanbase.odc.core.shared.constant.ResourceRoleName; @@ -528,6 +529,9 @@ public Boolean internalSyncDataSourceSchemas(@NonNull Long dataSourceId) throws Optional organizationOpt = Optional.empty(); try { connection = connectionService.getForConnectionSkipPermissionCheck(dataSourceId); + if (connection.getDialectType() == DialectType.FILE_SYSTEM) { + return true; + } horizontalDataPermissionValidator.checkCurrentOrganization(connection); organizationOpt = organizationService.get(connection.getOrganizationId()); Organization organization = diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/event/UpsertDatasourceEvent.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/event/UpsertDatasourceEvent.java new file mode 100644 index 0000000000..cb4f9bcd28 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/event/UpsertDatasourceEvent.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.connection.event; + +import com.oceanbase.odc.common.event.AbstractEvent; +import com.oceanbase.odc.service.connection.model.ConnectionConfig; + +import lombok.Getter; + +/** + * @Author:tianke + * @Date: 2024/12/30 10:41 + * @Descripition: + */ +public class UpsertDatasourceEvent extends AbstractEvent { + + @Getter + private ConnectionConfig connectionConfig; + + public UpsertDatasourceEvent(ConnectionConfig connectionConfig) { + super(connectionConfig, "UpsertDatasourceEvent"); + this.connectionConfig = connectionConfig; + + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/listener/UpdateDatasourceListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/listener/UpdateDatasourceListener.java new file mode 100644 index 0000000000..f52b9832bb --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/listener/UpdateDatasourceListener.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.connection.listener; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import com.oceanbase.odc.common.event.AbstractEventListener; +import com.oceanbase.odc.core.shared.constant.DialectType; +import com.oceanbase.odc.metadb.connection.DatabaseEntity; +import com.oceanbase.odc.metadb.connection.DatabaseRepository; +import com.oceanbase.odc.service.connection.database.model.DatabaseSyncStatus; +import com.oceanbase.odc.service.connection.database.model.DatabaseType; +import com.oceanbase.odc.service.connection.event.UpsertDatasourceEvent; +import com.oceanbase.odc.service.connection.model.ConnectionConfig; +import com.oceanbase.odc.service.db.schema.model.DBObjectSyncStatus; + +import lombok.extern.slf4j.Slf4j; + +/** + * @Author:tinker + * @Date: 2024/12/30 10:47 + * @Descripition: + */ +@Slf4j +@Component +public class UpdateDatasourceListener extends AbstractEventListener { + + @Autowired + private DatabaseRepository databaseRepository; + + @Override + public void onEvent(UpsertDatasourceEvent event) { + + ConnectionConfig connectionConfig = event.getConnectionConfig(); + if (connectionConfig.getDialectType() != DialectType.FILE_SYSTEM) { + return; + } + List byConnectionId = databaseRepository.findByConnectionId(connectionConfig.getId()); + DatabaseEntity entity = null; + if (!CollectionUtils.isEmpty(byConnectionId)) { + List toBeDelete = byConnectionId.stream().filter( + o -> !connectionConfig.getHost().equals(o.getName())).map(DatabaseEntity::getId).collect( + Collectors.toList()); + if (!toBeDelete.isEmpty()) { + databaseRepository.deleteAllById(toBeDelete); + } + Optional existed = byConnectionId.stream().filter( + o -> connectionConfig.getHost().equals(o.getName())).findFirst(); + if (existed.isPresent()) { + entity = existed.get(); + } + } + // create or update + entity = entity == null ? new DatabaseEntity() : entity; + entity.setDatabaseId(com.oceanbase.odc.common.util.StringUtils.uuid()); + entity.setOrganizationId(connectionConfig.getOrganizationId()); + entity.setName(connectionConfig.getHost()); + entity.setProjectId(connectionConfig.getProjectId()); + entity.setConnectionId(connectionConfig.getId()); + entity.setEnvironmentId(connectionConfig.getEnvironmentId()); + entity.setSyncStatus(DatabaseSyncStatus.SUCCEEDED); + entity.setExisted(true); + entity.setObjectSyncStatus(DBObjectSyncStatus.SYNCED); + entity.setConnectType(connectionConfig.getType()); + entity.setType(DatabaseType.PHYSICAL); + databaseRepository.save(entity); + + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/model/ConnectionConfig.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/model/ConnectionConfig.java index b9bd8d1778..679ba0fdad 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/model/ConnectionConfig.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/model/ConnectionConfig.java @@ -546,6 +546,13 @@ public String getCloudProvider() { return o == null ? null : o.toString(); } + public void setRegion(@NotNull String region) { + if (this.attributes == null) { + attributes = new HashMap<>(); + } + attributes.put(REGION, region); + } + public String getRegion() { if (this.attributes == null) { return null; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/model/TestConnectionReq.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/model/TestConnectionReq.java index 8ca19b8fc9..1074d37549 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/model/TestConnectionReq.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/model/TestConnectionReq.java @@ -15,6 +15,7 @@ */ package com.oceanbase.odc.service.connection.model; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -46,6 +47,9 @@ @Data @ToString(exclude = {"password"}) public class TestConnectionReq implements CloudConnectionConfig, SSLConnectionConfig { + + private static final String REGION = "region"; + /** * Connection ID,用于编辑连接页面未传密码参数时从已保存的连接信息中获取对应密码字段 */ @@ -157,6 +161,21 @@ public DialectType getDialectType() { return this.dialectType; } + public void setRegion(String region) { + if (this.attributes == null) { + attributes = new HashMap<>(); + } + attributes.put(REGION, region); + } + + public String getRegion() { + if (this.attributes == null) { + return null; + } + Object o = attributes.get(REGION); + return o == null ? null : o.toString(); + } + public static TestConnectionReq fromConnection(ConnectionConfig connection, ConnectionAccountType accountType) { PreConditions.notNull(accountType, "AccountType"); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/db/schema/DBSchemaSyncTaskManager.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/db/schema/DBSchemaSyncTaskManager.java index c4d443a1d1..c79e860b05 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/db/schema/DBSchemaSyncTaskManager.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/db/schema/DBSchemaSyncTaskManager.java @@ -35,6 +35,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.oceanbase.odc.core.shared.constant.DialectType; import com.oceanbase.odc.core.shared.constant.ResourceType; import com.oceanbase.odc.core.shared.exception.ConflictException; import com.oceanbase.odc.core.shared.exception.NotFoundException; @@ -118,6 +119,9 @@ public void submitTaskByDatabases(@NonNull Collection databases) { } public void submitTaskByDataSource(@NonNull ConnectionConfig dataSource) { + if (dataSource.getDialectType() == DialectType.FILE_SYSTEM) { + return; + } List databases = databaseService.listExistDatabasesByConnectionId(dataSource.getId()); databases.removeIf(e -> (syncProperties.isBlockExclusionsWhenSyncDbSchemas() && syncProperties.getExcludeSchemas(dataSource.getDialectType()).contains(e.getName()) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMConfiguration.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMConfiguration.java index 475793c5e3..c79c3dd3fe 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMConfiguration.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMConfiguration.java @@ -16,11 +16,9 @@ package com.oceanbase.odc.service.dlm; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.oceanbase.tools.migrator.common.enums.ShardingStrategy; -import com.oceanbase.tools.migrator.core.IJobStore; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -36,9 +34,6 @@ @Configuration public class DLMConfiguration { - @Value("${odc.task.dlm.thread-pool-size:15}") - private int dlmThreadPoolSize; - @Value("${odc.task.dlm.single-task-read-write-ratio:0.5}") private double readWriteRatio; @@ -54,9 +49,10 @@ public class DLMConfiguration { @Value("${odc.task.dlm.default-scan-batch-size:10000}") private int defaultScanBatchSize; - @Bean - public DLMJobFactory dlmJobFactory(IJobStore jobStore) { - return new DLMJobFactory(jobStore); - } + @Value("${odc.task.dlm.session-limiting.enabled:true}") + private boolean sessionLimitingEnabled; + + @Value("${odc.task.dlm.session-limiting-ratio:25}") + private int sessionLimitingRatio; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobFactory.java index e11e83d500..2829ddb2ab 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobFactory.java @@ -18,9 +18,9 @@ import com.oceanbase.odc.service.dlm.model.DlmTableUnit; import com.oceanbase.odc.service.dlm.model.DlmTableUnitParameters; import com.oceanbase.tools.migrator.common.dto.HistoryJob; -import com.oceanbase.tools.migrator.core.IJobStore; import com.oceanbase.tools.migrator.core.JobFactory; import com.oceanbase.tools.migrator.core.JobReq; +import com.oceanbase.tools.migrator.core.store.IJobStore; import com.oceanbase.tools.migrator.job.Job; import lombok.extern.slf4j.Slf4j; @@ -41,7 +41,6 @@ public Job createJob(DlmTableUnit parameters) { HistoryJob historyJob = new HistoryJob(); historyJob.setId(parameters.getDlmTableUnitId()); historyJob.setJobType(parameters.getType()); - historyJob.setTableId(-1L); historyJob.setPrintSqlTrace(false); historyJob.setSourceTable(parameters.getTableName()); historyJob.setTargetTable(parameters.getTargetTableName()); @@ -56,6 +55,8 @@ public Job createJob(DlmTableUnit parameters) { req.setHistoryJob(historyJob); req.setSourceDs(parameters.getSourceDatasourceInfo()); req.setTargetDs(parameters.getTargetDatasourceInfo()); + req.setSourceLimitConfig(parameters.getSourceLimitConfig()); + req.setTargetLimitConfig(parameters.getTargetLimitConfig()); return super.createJob(req); } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java index ba5d1a4be9..3d31cf1c77 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java @@ -16,37 +16,24 @@ package com.oceanbase.odc.service.dlm; import java.sql.Connection; -import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.LinkedList; import java.util.List; -import java.util.Map; import com.alibaba.druid.pool.DruidDataSource; -import com.oceanbase.odc.common.json.JsonUtils; -import com.oceanbase.odc.core.shared.constant.TaskStatus; import com.oceanbase.odc.service.connection.model.ConnectionConfig; import com.oceanbase.odc.service.dlm.model.DlmTableUnit; -import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration; -import com.oceanbase.odc.service.schedule.job.DLMJobReq; -import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants; +import com.oceanbase.odc.service.session.factory.DruidDataSourceFactory; import com.oceanbase.tools.migrator.common.dto.JobStatistic; -import com.oceanbase.tools.migrator.common.dto.TableSizeInfo; import com.oceanbase.tools.migrator.common.dto.TaskGenerator; import com.oceanbase.tools.migrator.common.element.PrimaryKey; -import com.oceanbase.tools.migrator.common.exception.JobException; -import com.oceanbase.tools.migrator.common.exception.JobSqlException; -import com.oceanbase.tools.migrator.common.meta.TableMeta; -import com.oceanbase.tools.migrator.core.IJobStore; import com.oceanbase.tools.migrator.core.handler.genarator.GeneratorStatus; -import com.oceanbase.tools.migrator.core.handler.genarator.GeneratorType; -import com.oceanbase.tools.migrator.core.meta.ClusterMeta; -import com.oceanbase.tools.migrator.core.meta.JobMeta; import com.oceanbase.tools.migrator.core.meta.TaskMeta; -import com.oceanbase.tools.migrator.core.meta.TenantMeta; +import com.oceanbase.tools.migrator.core.store.IJobStore; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** @@ -58,18 +45,21 @@ public class DLMJobStore implements IJobStore { private DruidDataSource dataSource; - private boolean enableBreakpointRecovery = false; - private Map dlmTableUnits; - private Map jobParameters; + private boolean enableBreakpointRecovery = true; + @Setter + private DlmTableUnit dlmTableUnit; public DLMJobStore(ConnectionConfig metaDBConfig) { + try { + DruidDataSourceFactory druidDataSourceFactory = new DruidDataSourceFactory(metaDBConfig); + dataSource = (DruidDataSource) druidDataSourceFactory.getDataSource(); + } catch (Exception e) { + log.warn("Failed to connect to the meta database; closing save point."); + enableBreakpointRecovery = false; + } } - public void setDlmTableUnits(Map dlmTableUnits) { - this.dlmTableUnits = dlmTableUnits; - } - public void destroy() { if (dataSource == null) { return; @@ -82,7 +72,7 @@ public void destroy() { } @Override - public TaskGenerator getTaskGenerator(String generatorId, String jobId) throws SQLException { + public TaskGenerator getTaskGenerator(String jobId) throws SQLException { if (enableBreakpointRecovery) { try (Connection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement( @@ -92,15 +82,14 @@ public TaskGenerator getTaskGenerator(String generatorId, String jobId) throws S if (resultSet.next()) { TaskGenerator taskGenerator = new TaskGenerator(); taskGenerator.setId(resultSet.getString("generator_id")); - taskGenerator.setGeneratorType(GeneratorType.valueOf(resultSet.getString("type"))); taskGenerator.setGeneratorStatus(GeneratorStatus.valueOf(resultSet.getString("status"))); taskGenerator.setJobId(jobId); taskGenerator.setTaskCount(resultSet.getInt("task_count")); taskGenerator - .setGeneratorSavePoint(PrimaryKey.valuesOf(resultSet.getString("primary_key_save_point"))); + .setPrimaryKeySavePoint(PrimaryKey.valuesOf(resultSet.getString("primary_key_save_point"))); taskGenerator.setProcessedDataSize(resultSet.getLong("processed_row_count")); taskGenerator.setProcessedDataSize(resultSet.getLong("processed_data_size")); - taskGenerator.setGeneratorPartitionSavepoint(resultSet.getString("partition_save_point")); + taskGenerator.setPartitionSavePoint(resultSet.getString("partition_save_point")); log.info("Load task generator success.jobId={}", jobId); return taskGenerator; } @@ -112,6 +101,16 @@ public TaskGenerator getTaskGenerator(String generatorId, String jobId) throws S @Override public void storeTaskGenerator(TaskGenerator taskGenerator) throws SQLException { + taskGenerator.getPartName2MaxKey() + .forEach((k, v) -> dlmTableUnit.getStatistic().getPartName2MaxKey().put(k, v.getSqlString())); + taskGenerator.getPartName2MinKey() + .forEach((k, v) -> dlmTableUnit.getStatistic().getPartName2MinKey().put(k, v.getSqlString())); + if (taskGenerator.getGlobalMaxKey() != null) { + dlmTableUnit.getStatistic().setGlobalMaxKey(taskGenerator.getGlobalMaxKey().getSqlString()); + } + if (taskGenerator.getGlobalMinKey() != null) { + dlmTableUnit.getStatistic().setGlobalMinKey(taskGenerator.getGlobalMinKey().getSqlString()); + } if (enableBreakpointRecovery) { StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO dlm_task_generator "); @@ -130,11 +129,11 @@ public void storeTaskGenerator(TaskGenerator taskGenerator) throws SQLException ps.setLong(3, taskGenerator.getProcessedDataSize()); ps.setLong(4, taskGenerator.getProcessedRowCount()); ps.setString(5, taskGenerator.getGeneratorStatus().name()); - ps.setString(6, GeneratorType.AUTO.name()); + ps.setString(6, ""); ps.setLong(7, taskGenerator.getTaskCount()); - ps.setString(8, taskGenerator.getGeneratorSavePoint() == null ? "" - : taskGenerator.getGeneratorSavePoint().toSqlString()); - ps.setString(9, taskGenerator.getGeneratorPartitionSavepoint()); + ps.setString(8, taskGenerator.getPrimaryKeySavePoint() == null ? "" + : taskGenerator.getPrimaryKeySavePoint().toSqlString()); + ps.setString(9, taskGenerator.getPartitionSavePoint()); if (ps.executeUpdate() == 1) { log.info("Update task generator success.jobId={}", taskGenerator.getJobId()); } else { @@ -145,39 +144,34 @@ public void storeTaskGenerator(TaskGenerator taskGenerator) throws SQLException } @Override - public void bindGeneratorToJob(String s, TaskGenerator taskGenerator) throws SQLException { - - } - - @Override - public JobStatistic getJobStatistic(String s) throws JobException { + public JobStatistic getJobStatistic(String s) throws SQLException { return new JobStatistic(); } @Override - public void storeJobStatistic(JobMeta jobMeta) throws JobSqlException { - dlmTableUnits.get(jobMeta.getJobId()).getStatistic().setProcessedRowCount(jobMeta.getJobStat().getRowCount()); - dlmTableUnits.get(jobMeta.getJobId()).getStatistic() - .setProcessedRowsPerSecond(jobMeta.getJobStat().getAvgRowCount()); + public void storeJobStatistic(JobStatistic jobStatistic) throws SQLException { + dlmTableUnit.getStatistic() + .setProcessedRowCount(jobStatistic.getRowCount().get()); + dlmTableUnit.getStatistic() + .setProcessedRowsPerSecond(jobStatistic.getRowCountPerSeconds()); - dlmTableUnits.get(jobMeta.getJobId()).getStatistic().setReadRowCount(jobMeta.getJobStat().getReadRowCount()); - dlmTableUnits.get(jobMeta.getJobId()).getStatistic() - .setReadRowsPerSecond(jobMeta.getJobStat().getAvgReadRowCount()); + dlmTableUnit.getStatistic().setReadRowCount(jobStatistic.getReadRowCount().get()); + dlmTableUnit.getStatistic() + .setReadRowsPerSecond(jobStatistic.getReadRowCountPerSeconds()); } @Override - public List getTaskMeta(JobMeta jobMeta) throws SQLException { + public List loadUnfinishedTask(String generatorId) throws SQLException { if (enableBreakpointRecovery) { try (Connection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement( "select * from dlm_task_unit where generator_id = ? AND status !='SUCCESS'")) { - ps.setString(1, jobMeta.getGenerator().getId()); + ps.setString(1, generatorId); ResultSet resultSet = ps.executeQuery(); List taskMetas = new LinkedList<>(); while (resultSet.next()) { TaskMeta taskMeta = new TaskMeta(); taskMeta.setTaskIndex(resultSet.getLong("task_index")); - taskMeta.setJobMeta(jobMeta); taskMeta.setGeneratorId(resultSet.getString("generator_id")); taskMeta.setTaskStatus(com.oceanbase.tools.migrator.common.enums.TaskStatus .valueOf(resultSet.getString("status"))); @@ -197,7 +191,6 @@ public List getTaskMeta(JobMeta jobMeta) throws SQLException { @Override public void storeTaskMeta(TaskMeta taskMeta) throws SQLException { if (enableBreakpointRecovery) { - log.info("start to store taskMeta:{}", taskMeta); StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO dlm_task_unit "); sb.append( @@ -228,7 +221,8 @@ public void storeTaskMeta(TaskMeta taskMeta) throws SQLException { } @Override - public Long getAbnormalTaskIndex(String jobId) { + public long getAbnormalTaskCount(String jobId) { + long count = 0; if (enableBreakpointRecovery) { try (Connection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement( @@ -236,78 +230,14 @@ public Long getAbnormalTaskIndex(String jobId) { ps.setString(1, jobId); ResultSet resultSet = ps.executeQuery(); if (resultSet.next()) { - long count = resultSet.getLong(1); - return count > 0 ? count : null; + count = resultSet.getLong(1); } } catch (Exception ignored) { log.warn("Get abnormal task failed.jobId={}", jobId); } } - return null; + return count; } - @Override - public void updateTableSizeInfo(TableSizeInfo tableSizeInfo, long l) { - - } - @Override - public void updateLimiter(JobMeta jobMeta) { - try { - RateLimitConfiguration params; - if (jobParameters.containsKey(JobParametersKeyConstants.DLM_RATE_LIMIT_CONFIG)) { - params = JsonUtils.fromJson( - jobParameters.get(JobParametersKeyConstants.DLM_RATE_LIMIT_CONFIG), - RateLimitConfiguration.class); - } else { - DLMJobReq dlmJobReq = JsonUtils.fromJson( - jobParameters.get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON), - DLMJobReq.class); - params = dlmJobReq.getRateLimit(); - } - if (params.getDataSizeLimit() != null) { - setClusterLimitConfig(jobMeta.getSourceCluster(), params.getDataSizeLimit()); - setClusterLimitConfig(jobMeta.getTargetCluster(), params.getDataSizeLimit()); - setTenantLimitConfig(jobMeta.getSourceTenant(), params.getDataSizeLimit()); - setTenantLimitConfig(jobMeta.getTargetTenant(), params.getDataSizeLimit()); - log.info("Update rate limit success,dataSizeLimit={}", params.getDataSizeLimit()); - } - if (params.getRowLimit() != null) { - setTableLimitConfig(jobMeta.getTargetTableMeta(), params.getRowLimit()); - setTableLimitConfig(jobMeta.getSourceTableMeta(), params.getRowLimit()); - log.info("Update rate limit success,rowLimit={}", params.getRowLimit()); - } - } catch (Exception e) { - log.warn("Update rate limit failed,errorMsg={}", e.getMessage()); - setClusterLimitConfig(jobMeta.getSourceCluster(), 1024); - setClusterLimitConfig(jobMeta.getTargetCluster(), 1024); - setTenantLimitConfig(jobMeta.getSourceTenant(), 1024); - setTenantLimitConfig(jobMeta.getTargetTenant(), 1024); - setTableLimitConfig(jobMeta.getTargetTableMeta(), 1000); - setTableLimitConfig(jobMeta.getSourceTableMeta(), 1000); - } - } - - public void setJobParameters(Map jobParameters) { - this.jobParameters = jobParameters; - } - - private void setClusterLimitConfig(ClusterMeta clusterMeta, long dataSizeLimit) { - clusterMeta.setReadSizeLimit(dataSizeLimit); - clusterMeta.setWriteSizeLimit(dataSizeLimit); - clusterMeta.setWriteUsedQuota(1); - clusterMeta.setReadUsedQuota(1); - } - - private void setTenantLimitConfig(TenantMeta tenantMeta, long dataSizeLimit) { - tenantMeta.setReadSizeLimit(dataSizeLimit); - tenantMeta.setWriteSizeLimit(dataSizeLimit); - tenantMeta.setWriteUsedQuota(1); - tenantMeta.setReadUsedQuota(1); - } - - private void setTableLimitConfig(TableMeta tableMeta, int rowLimit) { - tableMeta.setReadRowCountLimit(rowLimit); - tableMeta.setWriteRowCountLimit(rowLimit); - } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java index fe02913217..9862737d42 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java @@ -97,6 +97,9 @@ public void createOrUpdateDlmTableUnits(List dlmTableUnits) { DlmTableUnitEntity entity; if (entityOptional.isPresent()) { entity = entityOptional.get(); + if (entity.getStatus() == TaskStatus.DONE) { + return; + } entity.setStatistic(JsonUtils.toJson(o.getStatistic())); entity.setStatus(o.getStatus()); entity.setStartTime(o.getStartTime()); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMTableStructureSynchronizer.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMTableStructureSynchronizer.java index eee02f6a3d..c772f46ac9 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMTableStructureSynchronizer.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMTableStructureSynchronizer.java @@ -94,11 +94,12 @@ public static void sync(ConnectionConfig srcConfig, ConnectionConfig tgtConfig, DBTableStructureComparator comparator = new DBTableStructureComparator(tgtTableEditor, tgtConfig.getType().getDialectType(), srcConfig.getDefaultSchema(), tgtConfig.getDefaultSchema()); List changeSqlScript = new LinkedList<>(); + targetType.remove(DBObjectType.TABLE); if (tgtTable == null) { srcTable.setSchemaName(tgtConfig.getDefaultSchema()); srcTable.setName(tgtTableName); changeSqlScript.add(tgtTableEditor.generateCreateObjectDDL(srcTable)); - } else { + } else if (!targetType.isEmpty()) { DBObjectComparisonResult result = comparator.compare(srcTable, tgtTable); if (result.getComparisonResult() == ComparisonResult.INCONSISTENT) { changeSqlScript = result.getSubDBObjectComparisonResult().stream() @@ -128,11 +129,10 @@ public static void sync(ConnectionConfig srcConfig, ConnectionConfig tgtConfig, public static boolean isSupportedSyncTableStructure(DialectType srcType, String srcVersion, DialectType tgtType, String tgtVersion) { - // only supports MySQL or OBMySQL - if (!srcType.isMysql() || !tgtType.isMysql()) { + if (srcType != tgtType) { return false; } - if (srcType != tgtType) { + if (!srcType.isOceanbase() && !srcType.isMysql()) { return false; } // unsupported MySQL versions below 5.7.0 diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DataArchiveJobStore.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DataArchiveJobStore.java deleted file mode 100644 index f373059cb0..0000000000 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DataArchiveJobStore.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (c) 2023 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.oceanbase.odc.service.dlm; - -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -import com.oceanbase.odc.common.json.JsonUtils; -import com.oceanbase.odc.metadb.dlm.DlmTableUnitRepository; -import com.oceanbase.odc.metadb.dlm.TaskGeneratorEntity; -import com.oceanbase.odc.metadb.dlm.TaskGeneratorRepository; -import com.oceanbase.odc.metadb.dlm.TaskUnitEntity; -import com.oceanbase.odc.metadb.dlm.TaskUnitRepository; -import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration; -import com.oceanbase.odc.service.dlm.utils.DlmJobIdUtil; -import com.oceanbase.odc.service.dlm.utils.TaskGeneratorMapper; -import com.oceanbase.odc.service.dlm.utils.TaskUnitMapper; -import com.oceanbase.odc.service.schedule.model.DlmTableUnitStatistic; -import com.oceanbase.tools.migrator.common.dto.JobStatistic; -import com.oceanbase.tools.migrator.common.dto.TableSizeInfo; -import com.oceanbase.tools.migrator.common.dto.TaskGenerator; -import com.oceanbase.tools.migrator.common.meta.TableMeta; -import com.oceanbase.tools.migrator.core.IJobStore; -import com.oceanbase.tools.migrator.core.meta.ClusterMeta; -import com.oceanbase.tools.migrator.core.meta.JobMeta; -import com.oceanbase.tools.migrator.core.meta.TaskMeta; -import com.oceanbase.tools.migrator.core.meta.TenantMeta; - -import lombok.extern.slf4j.Slf4j; - -/** - * @Author:tinker - * @Date: 2023/5/8 19:27 - * @Descripition: TODO Store runtime data and use it to resume execution from a breakpoint. - */ -@Component -@Slf4j -public class DataArchiveJobStore implements IJobStore { - - @Value("${odc.task.dlm.support-breakpoint-recovery:false}") - private boolean supportBreakpointRecovery; - @Autowired - private DlmLimiterService limiterService; - @Autowired - private TaskGeneratorRepository taskGeneratorRepository; - @Autowired - private TaskUnitRepository taskUnitRepository; - @Autowired - private DlmTableUnitRepository dlmTableUnitRepository; - - private final TaskGeneratorMapper taskGeneratorMapper = TaskGeneratorMapper.INSTANCE; - private final TaskUnitMapper taskUnitMapper = TaskUnitMapper.INSTANCE; - - @Override - public TaskGenerator getTaskGenerator(String generatorId, String jobId) { - if (supportBreakpointRecovery) { - return taskGeneratorRepository.findByJobId(jobId).map(taskGeneratorMapper::entityToModel) - .orElse(null); - } - return null; - } - - @Override - public void storeTaskGenerator(TaskGenerator taskGenerator) { - if (supportBreakpointRecovery) { - Optional optional = taskGeneratorRepository.findByGeneratorId(taskGenerator.getId()); - TaskGeneratorEntity entity; - if (optional.isPresent()) { - entity = optional.get(); - entity.setStatus(taskGenerator.getGeneratorStatus().name()); - entity.setTaskCount(taskGenerator.getTaskCount()); - entity.setPartitionSavePoint(taskGenerator.getGeneratorPartitionSavepoint()); - entity.setProcessedRowCount(taskGenerator.getProcessedRowCount()); - entity.setProcessedDataSize(taskGenerator.getProcessedDataSize()); - if (taskGenerator.getGeneratorSavePoint() != null) { - entity.setPrimaryKeySavePoint(taskGenerator.getGeneratorSavePoint().toSqlString()); - } - } else { - entity = taskGeneratorMapper.modelToEntity(taskGenerator); - } - taskGeneratorRepository.save(entity); - } - } - - @Override - public void bindGeneratorToJob(String jobId, TaskGenerator taskGenerator) {} - - @Override - public JobStatistic getJobStatistic(String s) { - return new JobStatistic(); - } - - @Override - public void storeJobStatistic(JobMeta jobMeta) { - DlmTableUnitStatistic dlmExecutionDetail = new DlmTableUnitStatistic(); - dlmExecutionDetail.setProcessedRowCount(jobMeta.getJobStat().getRowCount()); - dlmExecutionDetail.setProcessedRowsPerSecond(jobMeta.getJobStat().getAvgRowCount()); - dlmExecutionDetail.setReadRowCount(jobMeta.getJobStat().getReadRowCount()); - dlmExecutionDetail.setReadRowsPerSecond(jobMeta.getJobStat().getAvgReadRowCount()); - dlmTableUnitRepository.updateStatisticByDlmTableUnitId(jobMeta.getJobId(), - JsonUtils.toJson(dlmExecutionDetail)); - } - - @Override - public List getTaskMeta(JobMeta jobMeta) { - if (supportBreakpointRecovery) { - List tasks = taskUnitRepository.findByGeneratorId(jobMeta.getGenerator().getId()).stream().map( - taskUnitMapper::entityToModel).collect( - Collectors.toList()); - tasks.forEach(o -> o.setJobMeta(jobMeta)); - return tasks; - } - return null; - } - - @Override - public void storeTaskMeta(TaskMeta taskMeta) { - if (supportBreakpointRecovery) { - Optional optional = taskUnitRepository.findByJobIdAndGeneratorIdAndTaskIndex( - taskMeta.getJobMeta().getJobId(), taskMeta.getGeneratorId(), taskMeta.getTaskIndex()); - TaskUnitEntity entity; - if (optional.isPresent()) { - entity = optional.get(); - entity.setStatus(taskMeta.getTaskStatus().name()); - entity.setPartitionName(taskMeta.getPartitionName()); - if (taskMeta.getMinPrimaryKey() != null) { - entity.setLowerBoundPrimaryKey(taskMeta.getMinPrimaryKey().toSqlString()); - } - if (taskMeta.getMaxPrimaryKey() != null) { - entity.setUpperBoundPrimaryKey(taskMeta.getMaxPrimaryKey().toSqlString()); - } - if (taskMeta.getCursorPrimaryKey() != null) { - entity.setPrimaryKeyCursor(taskMeta.getCursorPrimaryKey().toSqlString()); - } - } else { - entity = taskUnitMapper.modelToEntity(taskMeta); - } - taskUnitRepository.save(entity); - } - } - - @Override - public Long getAbnormalTaskIndex(String jobId) { - if (supportBreakpointRecovery) { - Long abnormalTaskCount = taskUnitRepository.findAbnormalTaskByJobId(jobId); - if (abnormalTaskCount != 0) { - return abnormalTaskCount; - } - } - return null; - } - - @Override - public void updateTableSizeInfo(TableSizeInfo tableSizeInfo, long l) { - - } - - @Override - public void updateLimiter(JobMeta jobMeta) { - RateLimitConfiguration rateLimit; - try { - rateLimit = limiterService - .getByOrderIdOrElseDefaultConfig(Long.parseLong(DlmJobIdUtil.getJobName(jobMeta.getJobId()))); - } catch (Exception e) { - log.warn("Update limiter failed,jobId={},error={}", - jobMeta.getJobId(), e); - return; - } - setClusterLimitConfig(jobMeta.getSourceCluster(), rateLimit.getDataSizeLimit()); - setClusterLimitConfig(jobMeta.getTargetCluster(), rateLimit.getDataSizeLimit()); - setTenantLimitConfig(jobMeta.getSourceTenant(), rateLimit.getDataSizeLimit()); - setTenantLimitConfig(jobMeta.getTargetTenant(), rateLimit.getDataSizeLimit()); - setTableLimitConfig(jobMeta.getSourceTableMeta(), rateLimit.getRowLimit()); - setTableLimitConfig(jobMeta.getTargetTableMeta(), rateLimit.getRowLimit()); - } - - private void setClusterLimitConfig(ClusterMeta clusterMeta, long dataSizeLimit) { - clusterMeta.setReadSizeLimit(dataSizeLimit); - clusterMeta.setWriteSizeLimit(dataSizeLimit); - clusterMeta.setWriteUsedQuota(1); - clusterMeta.setReadUsedQuota(1); - } - - private void setTenantLimitConfig(TenantMeta tenantMeta, long dataSizeLimit) { - tenantMeta.setReadSizeLimit(dataSizeLimit); - tenantMeta.setWriteSizeLimit(dataSizeLimit); - tenantMeta.setWriteUsedQuota(1); - tenantMeta.setReadUsedQuota(1); - } - - private void setTableLimitConfig(TableMeta tableMeta, int rowLimit) { - tableMeta.setReadRowCountLimit(rowLimit); - tableMeta.setWriteRowCountLimit(rowLimit); - } - -} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DataSourceInfoMapper.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DataSourceInfoMapper.java index 56e8a383be..e33817c7f0 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DataSourceInfoMapper.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DataSourceInfoMapper.java @@ -27,7 +27,8 @@ import com.oceanbase.odc.service.plugin.ConnectionPluginUtil; import com.oceanbase.odc.service.session.factory.OBConsoleDataSourceFactory; import com.oceanbase.tools.migrator.common.configure.DataSourceInfo; -import com.oceanbase.tools.migrator.common.enums.DataBaseType; +import com.oceanbase.tools.migrator.common.enums.DatasourceType; +import com.oceanbase.tools.migrator.datasource.fs.FileFormat; import lombok.extern.slf4j.Slf4j; @@ -43,29 +44,14 @@ public static ConnectionConfig toConnectionConfig(DataSourceInfo dataSourceInfo) ConnectionConfig connectionConfig = new ConnectionConfig(); connectionConfig.setDefaultSchema(dataSourceInfo.getDatabaseName()); connectionConfig.setPassword(dataSourceInfo.getPassword()); - connectionConfig.setHost(dataSourceInfo.getIp()); + connectionConfig.setHost(dataSourceInfo.getHost()); connectionConfig.setPort(dataSourceInfo.getPort()); - connectionConfig.setUsername(dataSourceInfo.getFullUserName()); - connectionConfig.setType(ConnectType.valueOf(dataSourceInfo.getDatabaseType().name())); - // convert full username to native user name - if (dataSourceInfo.getDatabaseType() == DataBaseType.OB_ORACLE) { - String userName = connectionConfig.getUsername(); - if (userName.contains("#")) { - userName = userName.split("#")[0]; - } - if (userName.contains("@")) { - userName = userName.split("@")[0]; - } - if (userName.contains("\"")) { - userName = userName.replace("\"", ""); - } - connectionConfig.setUsername(userName); - connectionConfig.setTenantName(dataSourceInfo.getTenantName()); - connectionConfig.setClusterName(dataSourceInfo.getClusterName()); - } + connectionConfig.setUsername(dataSourceInfo.getUsername()); + connectionConfig.setType(ConnectType.valueOf(dataSourceInfo.getType().name())); return connectionConfig; } + public static DataSourceInfo toDataSourceInfo(ConnectionConfig connectionConfig, String schemaName) { DataSourceInfo dataSourceInfo = new DataSourceInfo(); dataSourceInfo.setDatabaseName(connectionConfig.getDefaultSchema()); @@ -73,40 +59,45 @@ public static DataSourceInfo toDataSourceInfo(ConnectionConfig connectionConfig, if (StringUtils.isNotEmpty(connectionConfig.getPassword())) { dataSourceInfo.setPassword(connectionConfig.getPassword()); } - dataSourceInfo.setIp(connectionConfig.getHost()); + dataSourceInfo.setHost(connectionConfig.getHost()); dataSourceInfo.setPort(connectionConfig.getPort()); switch (connectionConfig.getDialectType()) { case DORIS: case MYSQL: { - dataSourceInfo.setFullUserName(connectionConfig.getUsername()); - dataSourceInfo.setDatabaseType(DataBaseType.MYSQL); + dataSourceInfo.setUsername(connectionConfig.getUsername()); + dataSourceInfo.setType(DatasourceType.MYSQL); break; } case OB_MYSQL: { dataSourceInfo - .setFullUserName(OBConsoleDataSourceFactory.getUsername(connectionConfig)); - dataSourceInfo.setDatabaseType(DataBaseType.OB_MYSQL); - dataSourceInfo.setClusterName(connectionConfig.getClusterName()); - dataSourceInfo.setSysDatabaseName("oceanbase"); + .setUsername(OBConsoleDataSourceFactory.getUsername(connectionConfig)); + dataSourceInfo.setType(DatasourceType.OB_MYSQL); break; } case OB_ORACLE: - dataSourceInfo.setFullUserName(OBConsoleDataSourceFactory.getUsername(connectionConfig)); - dataSourceInfo.setClusterName(connectionConfig.getClusterName()); - dataSourceInfo.setTenantName(connectionConfig.getTenantName()); - dataSourceInfo.setDatabaseType(DataBaseType.OB_ORACLE); + dataSourceInfo.setUsername(OBConsoleDataSourceFactory.getUsername(connectionConfig)); + dataSourceInfo.setType(DatasourceType.OB_ORACLE); break; case POSTGRESQL: - dataSourceInfo.setFullUserName(connectionConfig.getUsername()); + dataSourceInfo.setUsername(connectionConfig.getUsername()); connectionConfig.setDefaultSchema(schemaName); String jdbcUrl = getJdbcUrl(connectionConfig) + "&stringtype=unspecified"; - dataSourceInfo.setJdbcUrl(jdbcUrl); - dataSourceInfo.setDatabaseType(DataBaseType.POSTGRESQL); + dataSourceInfo.setUrl(jdbcUrl); + dataSourceInfo.setType(DatasourceType.POSTGRESQL); break; case ORACLE: - dataSourceInfo.setJdbcUrl(getJdbcUrl(connectionConfig)); - dataSourceInfo.setDatabaseType(DataBaseType.ORACLE); - dataSourceInfo.setFullUserName(getOracleUsername(connectionConfig)); + dataSourceInfo.setUrl(getJdbcUrl(connectionConfig)); + dataSourceInfo.setType(DatasourceType.ORACLE); + dataSourceInfo.setUsername(getOracleUsername(connectionConfig)); + break; + case FILE_SYSTEM: + dataSourceInfo.setHost(connectionConfig.getHost()); + dataSourceInfo.setType(DatasourceType.valueOf(connectionConfig.getType().name())); + dataSourceInfo.setUsername(connectionConfig.getUsername()); + dataSourceInfo.setPassword(connectionConfig.getPassword()); + dataSourceInfo.setFileFormat(FileFormat.CSV); + dataSourceInfo.setRegion(connectionConfig.getRegion()); + dataSourceInfo.setDefaultCharset("UTF-8"); break; default: log.warn(String.format("Unsupported datasource type:%s", connectionConfig.getDialectType())); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DataArchiveTableConfig.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DataArchiveTableConfig.java index b852f42cdb..06103cadc9 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DataArchiveTableConfig.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DataArchiveTableConfig.java @@ -15,8 +15,10 @@ */ package com.oceanbase.odc.service.dlm.model; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import com.oceanbase.odc.common.util.StringUtils; @@ -40,6 +42,14 @@ public class DataArchiveTableConfig { // the sql condition such as "gmt_create < '2023-01-01'" private String conditionExpression; + private String minKey; + + private String maxKey; + + private Map partName2MinKey = new HashMap<>(); + + private Map partName2MaxKey = new HashMap<>(); + public String getTargetTableName() { return StringUtils.isEmpty(targetTableName) ? tableName : targetTableName; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DlmTableUnit.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DlmTableUnit.java index 2370ed086d..03a0b03183 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DlmTableUnit.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DlmTableUnit.java @@ -16,11 +16,14 @@ package com.oceanbase.odc.service.dlm.model; import java.util.Date; +import java.util.Set; import com.oceanbase.odc.core.shared.constant.TaskStatus; import com.oceanbase.odc.service.schedule.model.DlmTableUnitStatistic; +import com.oceanbase.tools.dbbrowser.model.DBObjectType; import com.oceanbase.tools.migrator.common.configure.DataSourceInfo; import com.oceanbase.tools.migrator.common.enums.JobType; +import com.oceanbase.tools.migrator.limiter.LimiterConfig; import lombok.Data; @@ -48,6 +51,10 @@ public class DlmTableUnit { private DataSourceInfo targetDatasourceInfo; + private LimiterConfig sourceLimitConfig; + + private LimiterConfig targetLimitConfig; + private DlmTableUnitStatistic statistic; private DlmTableUnitParameters parameters; @@ -60,4 +67,6 @@ public class DlmTableUnit { private Date endTime; + private Set syncTableStructure; + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/utils/TaskGeneratorMapper.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/utils/TaskGeneratorMapper.java index 9c61909e5a..8e124d27e7 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/utils/TaskGeneratorMapper.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/utils/TaskGeneratorMapper.java @@ -36,16 +36,15 @@ public interface TaskGeneratorMapper { @Mapping(source = "generatorId", target = "id") @Mapping(source = "status", target = "generatorStatus") - @Mapping(target = "generatorType", constant = "AUTO") - @Mapping(source = "partitionSavePoint", target = "generatorPartitionSavepoint") - @Mapping(target = "generatorSavePoint", + @Mapping(source = "partitionSavePoint", target = "partitionSavePoint") + @Mapping(target = "primaryKeySavePoint", expression = "java(com.oceanbase.tools.migrator.common.element.PrimaryKey.valuesOf(entity.getPrimaryKeySavePoint()))") TaskGenerator entityToModel(TaskGeneratorEntity entity); @InheritInverseConfiguration @Mapping(target = "type", constant = "AUTO") @Mapping(target = "primaryKeySavePoint", - expression = "java(model.getGeneratorSavePoint() != null ?model.getGeneratorSavePoint().toSqlString():null)") + expression = "java(model.getPrimaryKeySavePoint() != null ?model.getPrimaryKeySavePoint().toSqlString():null)") @Mapping(target = "id", ignore = true) TaskGeneratorEntity modelToEntity(TaskGenerator model); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/processor/CreateFlowInstanceProcessAspect.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/processor/CreateFlowInstanceProcessAspect.java index 0fa3896d45..256fef7270 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/processor/CreateFlowInstanceProcessAspect.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/processor/CreateFlowInstanceProcessAspect.java @@ -44,9 +44,6 @@ import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.partitionplan.model.PartitionPlanConfig; import com.oceanbase.odc.service.quartz.util.QuartzCronExpressionUtils; -import com.oceanbase.odc.service.schedule.flowtask.AlterScheduleParameters; -import com.oceanbase.odc.service.schedule.model.JobType; -import com.oceanbase.odc.service.schedule.model.OperationType; import com.oceanbase.odc.service.schedule.model.TriggerConfig; import com.oceanbase.odc.service.schedule.model.TriggerStrategy; @@ -74,8 +71,6 @@ public class CreateFlowInstanceProcessAspect implements InitializingBean { @Value("${odc.task.trigger.minimum-interval:600}") private Long triggerMinimumIntervalSeconds; - private final Map scheduleTaskPreprocessors = new HashMap<>(); - private final Map flowTaskPreprocessors = new HashMap<>(); @Pointcut("@annotation(com.oceanbase.odc.service.flow.processor.EnablePreprocess) && args(com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq)") @@ -148,14 +143,6 @@ private void validateTriggerConfig(CreateFlowInstanceReq req) { if (parameters.getDroppingTrigger() != null) { validateTriggerConfig(parameters.getDroppingTrigger()); } - return; - } - if (req.getParameters() instanceof AlterScheduleParameters) { - AlterScheduleParameters parameters = (AlterScheduleParameters) req.getParameters(); - if (parameters.getOperationType() == OperationType.CREATE - || parameters.getOperationType() == OperationType.UPDATE) { - validateTriggerConfig(parameters.getTriggerConfig()); - } } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/CloudObjectStorageClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/CloudObjectStorageClient.java index 02898febbf..48724627a0 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/CloudObjectStorageClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/CloudObjectStorageClient.java @@ -302,13 +302,13 @@ public long calculatePartSize(long fileLength) { * 也就是杭州的client只允许操作杭州的bucket,不允许跨域操作 */ private void validateBucket() { - if (objectStorageConfiguration.getCloudProvider() != CloudProvider.ALIBABA_CLOUD) { - return; - } String bucketName = getBucketName(); boolean isExist = publicEndpointCloudObjectStorage.doesBucketExist(bucketName); Verify.verify(isExist, String.format("object storage bucket '%s' not exists", bucketName)); + if (objectStorageConfiguration.getCloudProvider() != CloudProvider.ALIBABA_CLOUD) { + return; + } String region = objectStorageConfiguration.getRegion(); if (StringUtils.isNotEmpty(region)) { String location = publicEndpointCloudObjectStorage.getBucketLocation(bucketName); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/CloudObjectStorage.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/CloudObjectStorage.java index 60d75792f4..617c40d956 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/CloudObjectStorage.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/CloudObjectStorage.java @@ -16,6 +16,7 @@ package com.oceanbase.odc.service.objectstorage.cloud; import java.io.File; +import java.io.InputStream; import java.net.URL; import java.util.Date; import java.util.List; @@ -61,6 +62,9 @@ public interface CloudObjectStorage { PutObjectResult putObject(String bucketName, String key, File file, ObjectMetadata metadata) throws CloudException; + PutObjectResult putObject(String bucketName, String key, InputStream in, ObjectMetadata metadata) + throws CloudException; + default PutObjectResult putObject(String bucketName, String key, File file) { return putObject(bucketName, key, file, null); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/AlibabaCloudClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/AlibabaCloudClient.java index 059a7ba14e..acd9c43aea 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/AlibabaCloudClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/AlibabaCloudClient.java @@ -170,6 +170,23 @@ public PutObjectResult putObject(String bucketName, String key, File file, Objec return putObject; } + @Override + public PutObjectResult putObject(String bucketName, String key, InputStream in, ObjectMetadata metadata) + throws CloudException { + PutObjectResult putObject = callOssMethod("Put object", () -> { + com.aliyun.oss.model.ObjectMetadata objectMetadata = toOss(metadata); + com.aliyun.oss.model.PutObjectResult ossResult = oss.putObject(bucketName, key, in, objectMetadata); + PutObjectResult result = new PutObjectResult(); + result.setVersionId(ossResult.getVersionId()); + result.setETag(ossResult.getETag()); + result.setRequestId(ossResult.getRequestId()); + result.setClientCRC(ossResult.getClientCRC()); + result.setServerCRC(ossResult.getServerCRC()); + return result; + }); + return putObject; + } + @Override public CopyObjectResult copyObject(String bucketName, String from, String to) throws CloudException { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/AmazonCloudClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/AmazonCloudClient.java index e8e80d226b..d5f6527a0f 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/AmazonCloudClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/AmazonCloudClient.java @@ -183,6 +183,23 @@ public PutObjectResult putObject(String bucketName, String key, File file, Objec }); } + @Override + public PutObjectResult putObject(String bucketName, String key, InputStream in, ObjectMetadata metadata) + throws CloudException { + return callAmazonMethod("Put object", () -> { + com.amazonaws.services.s3.model.ObjectMetadata objectMetadata = toS3(metadata); + PutObjectRequest putRequest = new PutObjectRequest(bucketName, key, in, objectMetadata); + if (metadata.hasTag()) { + putRequest.withTagging(toS3(metadata.getTagging())); + } + com.amazonaws.services.s3.model.PutObjectResult s3Result = s3.putObject(putRequest); + PutObjectResult result = new PutObjectResult(); + result.setVersionId(s3Result.getVersionId()); + result.setETag(s3Result.getETag()); + return result; + }); + } + @Override public CopyObjectResult copyObject(String bucketName, String from, String to) throws CloudException { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/NullCloudClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/NullCloudClient.java index 7accc9b39b..91dec37407 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/NullCloudClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/client/NullCloudClient.java @@ -16,6 +16,7 @@ package com.oceanbase.odc.service.objectstorage.cloud.client; import java.io.File; +import java.io.InputStream; import java.net.URL; import java.util.Date; import java.util.List; @@ -77,6 +78,12 @@ public PutObjectResult putObject(String bucketName, String key, File file, Objec throw new UnsupportedException(); } + @Override + public PutObjectResult putObject(String bucketName, String key, InputStream in, ObjectMetadata metadata) + throws CloudException { + throw new UnsupportedException(); + } + @Override public CopyObjectResult copyObject(String bucketName, String from, String to) throws CloudException { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java index 661574d65c..d37fb4ecef 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java @@ -27,7 +27,10 @@ import org.quartz.JobKey; import org.quartz.UnableToInterruptJobException; +import com.oceanbase.odc.core.shared.constant.TaskStatus; import com.oceanbase.odc.core.shared.exception.UnsupportedException; +import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity; +import com.oceanbase.odc.metadb.schedule.ScheduleTaskRepository; import com.oceanbase.odc.service.common.util.SpringContextUtil; import com.oceanbase.odc.service.monitor.DefaultMeterName; import com.oceanbase.odc.service.monitor.MeterKey; @@ -75,6 +78,16 @@ public void execute(JobExecutionContext context) throws JobExecutionException { sendEndMetric(); } catch (Exception e) { sendFailedMetric(); + try { + log.info("Start to update schedule task status to failed,jobKey={}", jobKey); + ScheduleTaskRepository taskRepository = SpringContextUtil.getBean(ScheduleTaskRepository.class); + ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult(); + if (taskEntity != null && taskEntity.getId() != null) { + taskRepository.updateStatusById(taskEntity.getId(), TaskStatus.FAILED); + } + } catch (Exception innerException) { + log.warn("Update schedule task status failed.", innerException); + } log.warn("Job execute failed,job key={},fire time={}.", context.getJobDetail().getKey(), context.getFireTime(), e); } finally { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java index e6503004bb..cab45c4c98 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java @@ -42,6 +42,7 @@ import org.quartz.Trigger; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.cglib.beans.BeanMap; import org.springframework.context.annotation.Lazy; import org.springframework.core.io.InputStreamResource; @@ -151,6 +152,9 @@ @Service @SkipAuthorize public class ScheduleService { + + @Value("${odc.task.trigger.minimum-interval:600}") + private int minInterval; @Autowired private ScheduleRepository scheduleRepository; @@ -237,6 +241,7 @@ public List dispatchCreateSchedule(CreateFlowInstanceReq ScheduleChangeParams scheduleChangeParams; switch (parameters.getOperationType()) { case CREATE: { + validateTriggerConfig(parameters.getTriggerConfig()); CreateScheduleReq createScheduleReq = new CreateScheduleReq(); createScheduleReq.setParameters(parameters.getScheduleTaskParameters()); createScheduleReq.setTriggerConfig(parameters.getTriggerConfig()); @@ -274,7 +279,6 @@ public ChangeScheduleResp changeSchedule(ScheduleChangeParams req) { // create or load target schedule if (req.getOperationType() == OperationType.CREATE) { PreConditions.notNull(req.getCreateScheduleReq(), "req.createScheduleReq"); - validateTriggerConfig(req.getCreateScheduleReq().getTriggerConfig()); ScheduleEntity entity = new ScheduleEntity(); entity.setName(req.getCreateScheduleReq().getName()); @@ -420,10 +424,8 @@ private void validateTriggerConfig(TriggerConfig triggerConfig) { throw new IllegalArgumentException("Invalid cron expression"); } long intervalMills = nextFiveFireTimes.get(1).getTime() - nextFiveFireTimes.get(0).getTime(); - if (intervalMills / 1000 < 10 * 60) { - throw new IllegalArgumentException( - "The interval between weeks is too short. The minimum interval is 10 minutes."); - } + PreConditions.validArgumentState(intervalMills / 1000 > minInterval, ErrorCodes.ScheduleIntervalTooShort, + new Object[] {minInterval}, null); } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java index e3216a388e..98c202020e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java @@ -17,6 +17,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import org.quartz.JobExecutionContext; @@ -29,12 +30,14 @@ import com.oceanbase.odc.service.connection.database.DatabaseService; import com.oceanbase.odc.service.connection.database.model.Database; import com.oceanbase.odc.service.connection.model.ConnectionConfig; +import com.oceanbase.odc.service.dlm.DLMConfiguration; +import com.oceanbase.odc.service.dlm.DLMService; import com.oceanbase.odc.service.dlm.DataSourceInfoMapper; import com.oceanbase.odc.service.dlm.DlmLimiterService; +import com.oceanbase.odc.service.dlm.model.DlmTableUnit; import com.oceanbase.odc.service.quartz.util.ScheduleTaskUtils; import com.oceanbase.odc.service.schedule.ScheduleService; import com.oceanbase.odc.service.task.base.dataarchive.DataArchiveTask; -import com.oceanbase.odc.service.task.config.TaskFrameworkEnabledProperties; import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants; import com.oceanbase.odc.service.task.executor.task.TaskDescription; import com.oceanbase.odc.service.task.schedule.DefaultJobDefinition; @@ -58,24 +61,23 @@ public abstract class AbstractDlmJob implements OdcJob { public final DatabaseService databaseService; public final ScheduleService scheduleService; public final DlmLimiterService limiterService; + public final DLMService dlmService; public JobScheduler jobScheduler = null; - - public final TaskFrameworkEnabledProperties taskFrameworkProperties; - public final TaskFrameworkService taskFrameworkService; + public final DLMConfiguration dlmConfiguration; + public AbstractDlmJob() { scheduleTaskRepository = SpringContextUtil.getBean(ScheduleTaskRepository.class); databaseService = SpringContextUtil.getBean(DatabaseService.class); scheduleService = SpringContextUtil.getBean(ScheduleService.class); limiterService = SpringContextUtil.getBean(DlmLimiterService.class); - taskFrameworkProperties = SpringContextUtil.getBean(TaskFrameworkEnabledProperties.class); taskFrameworkService = SpringContextUtil.getBean(TaskFrameworkService.class); - if (taskFrameworkProperties.isEnabled()) { - jobScheduler = SpringContextUtil.getBean(JobScheduler.class); - } + jobScheduler = SpringContextUtil.getBean(JobScheduler.class); + dlmService = SpringContextUtil.getBean(DLMService.class); + dlmConfiguration = SpringContextUtil.getBean(DLMConfiguration.class); } public DataSourceInfo getDataSourceInfo(Long databaseId) { @@ -83,6 +85,8 @@ public DataSourceInfo getDataSourceInfo(Long databaseId) { ConnectionConfig config = databaseService.findDataSourceForTaskById(databaseId); DataSourceInfo dataSourceInfo = DataSourceInfoMapper.toDataSourceInfo(config, db.getName()); dataSourceInfo.setDatabaseName(db.getName()); + dataSourceInfo.setSessionLimitRatio(dlmConfiguration.getSessionLimitingRatio()); + dataSourceInfo.setEnabledLimit(dlmConfiguration.isSessionLimitingEnabled()); return dataSourceInfo; } @@ -129,10 +133,25 @@ public Long publishJob(DLMJobReq parameters, Long timeoutMillis, CloudProvider p } public DLMJobReq getDLMJobReq(Long jobId) { - return JsonUtils.fromJson(JsonUtils.fromJson( + DLMJobReq dlmJobReq = JsonUtils.fromJson(JsonUtils.fromJson( taskFrameworkService.find(jobId).getJobParametersJson(), new TypeReference>() {}).get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON), DLMJobReq.class); + Map tableName2Unit = + dlmService.findByScheduleTaskId(dlmJobReq.getScheduleTaskId()).stream() + .collect( + Collectors.toMap(DlmTableUnit::getTableName, o -> o)); + dlmJobReq.getTables().forEach(o -> { + if (tableName2Unit.containsKey(o.getTableName()) + && tableName2Unit.get(o.getTableName()).getStatistic() != null) { + o.setPartName2MinKey(tableName2Unit.get(o.getTableName()).getStatistic().getPartName2MinKey()); + o.setPartName2MaxKey(tableName2Unit.get(o.getTableName()).getStatistic().getPartName2MaxKey()); + o.setMinKey(tableName2Unit.get(o.getTableName()).getStatistic().getGlobalMinKey()); + o.setMaxKey(tableName2Unit.get(o.getTableName()).getStatistic().getGlobalMaxKey()); + } + }); + + return dlmJobReq; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java index be4bd69f4f..c0be02423a 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java @@ -65,6 +65,7 @@ public void executeJob(JobExecutionContext context) { DLMJobReq parameters = getDLMJobReq(dataArchiveTask.getJobId()); parameters.setJobType(JobType.DELETE); + parameters.setFireTime(context.getFireTime()); parameters.setScheduleTaskId(taskEntity.getId()); parameters .setRateLimit(limiterService.getByOrderIdOrElseDefaultConfig(Long.parseLong(taskEntity.getJobName()))); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java index ef6b448532..6534f436aa 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java @@ -48,6 +48,7 @@ private void executeInTaskFramework(JobExecutionContext context) { parameters.setScheduleTaskId(taskEntity.getId()); parameters.setJobType(JobType.MIGRATE); parameters.setTables(dataArchiveParameters.getTables()); + parameters.setFireTime(context.getFireTime()); for (DataArchiveTableConfig tableConfig : parameters.getTables()) { tableConfig.setConditionExpression(StringUtils.isNotEmpty(tableConfig.getConditionExpression()) ? DataArchiveConditionUtil.parseCondition(tableConfig.getConditionExpression(), diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java index 67ef30c5d8..dc139884da 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java @@ -63,6 +63,7 @@ public void executeJob(JobExecutionContext context) { DataSourceInfo tempDataSource = parameters.getSourceDs(); parameters.setSourceDs(parameters.getTargetDs()); parameters.setTargetDs(tempDataSource); + parameters.setFireTime(context.getFireTime()); parameters .setRateLimit(limiterService.getByOrderIdOrElseDefaultConfig(Long.parseLong(taskEntity.getJobName()))); parameters.getTables().forEach(o -> { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java index 392abd3f72..00e2a60fc6 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java @@ -72,6 +72,7 @@ private void executeInTaskFramework(JobExecutionContext context) { parameters.getSourceDs().setQueryTimeout(dataDeleteParameters.getQueryTimeout()); parameters.getTargetDs().setQueryTimeout(dataDeleteParameters.getQueryTimeout()); parameters.setShardingStrategy(dataDeleteParameters.getShardingStrategy()); + parameters.setFireTime(context.getFireTime()); Long jobId = publishJob(parameters, dataDeleteParameters.getTimeoutMillis(), dataDeleteParameters.getDatabaseId()); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/DlmTableUnitStatistic.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/DlmTableUnitStatistic.java index 063accdf22..f6aa86192e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/DlmTableUnitStatistic.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/DlmTableUnitStatistic.java @@ -15,6 +15,9 @@ */ package com.oceanbase.odc.service.schedule.model; +import java.util.HashMap; +import java.util.Map; + import lombok.Data; /** @@ -33,4 +36,12 @@ public class DlmTableUnitStatistic { private Long readRowsPerSecond = 0L; + private String globalMinKey; + + private String globalMaxKey; + + private Map partName2MinKey = new HashMap<>(); + + private Map partName2MaxKey = new HashMap<>(); + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/AbstractDlmPreprocessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/AbstractDlmPreprocessor.java index b7bd32427e..352f267210 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/AbstractDlmPreprocessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/AbstractDlmPreprocessor.java @@ -181,7 +181,7 @@ public void supportDataArchivingLink(ConnectionConfig sourceDs, ConnectionConfig targetDs.getRegion())); } if (sourceDs.getDialectType().isMysql()) { - if (!targetDs.getDialectType().isMysql()) { + if (!targetDs.getDialectType().isMysql() && targetDs.getDialectType() != DialectType.FILE_SYSTEM) { throw new UnsupportedException( String.format("Unsupported data link from %s to %s.", sourceDs.getDialectType(), targetDs.getDialectType())); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/DataArchivePreprocessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/DataArchivePreprocessor.java index 63c3b30f67..dad948983c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/DataArchivePreprocessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/DataArchivePreprocessor.java @@ -18,18 +18,14 @@ import org.springframework.beans.factory.annotation.Autowired; import com.oceanbase.odc.core.session.ConnectionSession; -import com.oceanbase.odc.core.session.ConnectionSessionConstants; import com.oceanbase.odc.core.session.ConnectionSessionFactory; -import com.oceanbase.odc.core.shared.constant.DialectType; -import com.oceanbase.odc.core.shared.exception.UnsupportedException; -import com.oceanbase.odc.plugin.connect.api.InformationExtensionPoint; +import com.oceanbase.odc.core.shared.PreConditions; +import com.oceanbase.odc.core.shared.constant.ErrorCodes; import com.oceanbase.odc.service.connection.database.DatabaseService; import com.oceanbase.odc.service.connection.database.model.Database; import com.oceanbase.odc.service.connection.model.ConnectionConfig; import com.oceanbase.odc.service.dlm.DLMConfiguration; -import com.oceanbase.odc.service.dlm.DLMTableStructureSynchronizer; import com.oceanbase.odc.service.dlm.model.DataArchiveParameters; -import com.oceanbase.odc.service.plugin.ConnectionPluginUtil; import com.oceanbase.odc.service.schedule.model.OperationType; import com.oceanbase.odc.service.schedule.model.ScheduleChangeParams; import com.oceanbase.odc.service.schedule.model.ScheduleType; @@ -65,12 +61,15 @@ public void process(ScheduleChangeParams req) { Database sourceDb = databaseService.detail(parameters.getSourceDatabaseId()); Database targetDb = databaseService.detail(parameters.getTargetDataBaseId()); supportDataArchivingLink(sourceDb.getDataSource(), targetDb.getDataSource()); + if (!parameters.getSyncTableStructure().isEmpty()) { + PreConditions.validArgumentState(sourceDb.getDialectType() != targetDb.getDialectType(), + ErrorCodes.UnsupportedSyncTableStructure, + new Object[] {sourceDb.getDialectType(), targetDb.getDialectType()}, null); + } ConnectionConfig sourceDs = sourceDb.getDataSource(); sourceDs.setDefaultSchema(sourceDb.getName()); ConnectionSessionFactory sourceSessionFactory = new DefaultConnectSessionFactory(sourceDs); - ConnectionSessionFactory targetSessionFactory = new DefaultConnectSessionFactory(targetDb.getDataSource()); ConnectionSession sourceSession = sourceSessionFactory.generateSession(); - ConnectionSession targetSession = targetSessionFactory.generateSession(); try { if (parameters.isFullDatabase()) { parameters.setTables(getAllTables(sourceSession, sourceDb.getName())); @@ -78,36 +77,9 @@ public void process(ScheduleChangeParams req) { if (parameters.getTables().isEmpty()) { throw new IllegalArgumentException("The table list is empty."); } - DialectType sourceDbType = sourceSession.getDialectType(); - DialectType targetDbType = targetSession.getDialectType(); - InformationExtensionPoint sourceInformation = - ConnectionPluginUtil.getInformationExtension(sourceDbType); - InformationExtensionPoint targetInformation = - ConnectionPluginUtil.getInformationExtension(targetDbType); - String sourceDbVersion = - sourceSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY).execute( - sourceInformation::getDBVersion); - String targetDbVersion = - targetSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY).execute( - targetInformation::getDBVersion); - if (!parameters.getSyncTableStructure().isEmpty()) { - boolean supportedSyncTableStructure = DLMTableStructureSynchronizer.isSupportedSyncTableStructure( - sourceDbType, sourceDbVersion, targetDbType, targetDbVersion); - if (!supportedSyncTableStructure) { - log.warn( - "Synchronization of table structure is unsupported,sourceDbType={},sourceDbVersion={},targetDbType={},targetDbVersion={}", - sourceDbType, - sourceDbVersion, targetDbType, targetDbVersion); - throw new UnsupportedException(String.format( - "Synchronization of table structure is unsupported,sourceDbType=%s,sourceDbVersion=%s,targetDbType=%s,targetDbVersion=%s", - sourceDbType, - sourceDbVersion, targetDbType, targetDbVersion)); - } - } checkTableAndCondition(sourceSession, sourceDb, parameters.getTables(), parameters.getVariables()); } finally { sourceSession.expire(); - targetSession.expire(); } log.info("Data archive preprocessing has been completed."); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/base/dataarchive/DataArchiveTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/base/dataarchive/DataArchiveTask.java index 6c7d63ec1d..1b934ab46f 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/base/dataarchive/DataArchiveTask.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/base/dataarchive/DataArchiveTask.java @@ -15,13 +15,11 @@ */ package com.oceanbase.odc.service.task.base.dataarchive; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import com.oceanbase.odc.common.json.JsonUtils; @@ -29,9 +27,9 @@ import com.oceanbase.odc.service.dlm.DLMJobFactory; import com.oceanbase.odc.service.dlm.DLMJobStore; import com.oceanbase.odc.service.dlm.DLMTableStructureSynchronizer; -import com.oceanbase.odc.service.dlm.DataSourceInfoMapper; import com.oceanbase.odc.service.dlm.model.DlmTableUnit; import com.oceanbase.odc.service.dlm.model.DlmTableUnitParameters; +import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration; import com.oceanbase.odc.service.dlm.utils.DlmJobIdUtil; import com.oceanbase.odc.service.schedule.job.DLMJobReq; import com.oceanbase.odc.service.schedule.model.DlmTableUnitStatistic; @@ -40,7 +38,9 @@ import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants; import com.oceanbase.odc.service.task.util.JobUtils; import com.oceanbase.tools.migrator.common.enums.JobType; +import com.oceanbase.tools.migrator.core.meta.JobMeta; import com.oceanbase.tools.migrator.job.Job; +import com.oceanbase.tools.migrator.limiter.LimiterConfig; import com.oceanbase.tools.migrator.task.CheckMode; import lombok.extern.slf4j.Slf4j; @@ -58,7 +58,8 @@ public class DataArchiveTask extends TaskBase> { private DLMJobStore jobStore; private double progress = 0.0; private Job job; - private Map result; + private List toDoList; + private int currentIndex = 0; private boolean isToStop = false; public DataArchiveTask() {} @@ -67,97 +68,71 @@ public DataArchiveTask() {} protected void doInit(JobContext context) { jobStore = new DLMJobStore(JobUtils.getMetaDBConnectionConfig()); jobFactory = new DLMJobFactory(jobStore); - log.info("Init data-archive job env succeed,jobIdentity={}", context.getJobIdentity()); - } - - @Override - public boolean start() throws Exception { - - jobStore.setJobParameters(jobContext.getJobParameters()); - DLMJobReq parameters = - JsonUtils.fromJson( - jobContext.getJobParameters().get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON), - DLMJobReq.class); - if (parameters.getFireTime() == null) { - parameters.setFireTime(new Date()); - } try { - result = getDlmTableUnits(parameters).stream() - .collect(Collectors.toMap(DlmTableUnit::getDlmTableUnitId, o -> o)); - jobStore.setDlmTableUnits(result); + DLMJobReq parameters = + JsonUtils.fromJson( + jobContext.getJobParameters().get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON), + DLMJobReq.class); + initTableUnit(parameters); } catch (Exception e) { - log.warn("Get dlm job failed!", e); - context.getExceptionListener().onException(e); - return false; + log.warn("Initialization of the DLM job was failed,jobIdentity={}", context.getJobIdentity(), e); } - Set dlmTableUnitIds = result.keySet(); + log.info("Initialization of the DLM job was successful. Number of tables to be processed = {},jobIdentity={}", + toDoList.size(), context.getJobIdentity()); + } - for (String dlmTableUnitId : dlmTableUnitIds) { - DlmTableUnit dlmTableUnit = result.get(dlmTableUnitId); - if (isToStop) { - log.info("Job is terminated,jobIdentity={}", jobContext.getJobIdentity()); - break; - } + @Override + public boolean start() throws Exception { + while (!isToStop && currentIndex < toDoList.size()) { + DlmTableUnit dlmTableUnit = toDoList.get(currentIndex); if (dlmTableUnit.getStatus() == TaskStatus.DONE) { log.info("The table had been completed,tableName={}", dlmTableUnit.getTableName()); + currentIndex++; continue; } - startTableUnit(dlmTableUnitId); - if (parameters.getJobType() == JobType.MIGRATE) { - try { - DLMTableStructureSynchronizer.sync( - DataSourceInfoMapper.toConnectionConfig(parameters.getSourceDs()), - DataSourceInfoMapper.toConnectionConfig(parameters.getTargetDs()), - dlmTableUnit.getTableName(), dlmTableUnit.getTargetTableName(), - parameters.getSyncTableStructure()); - } catch (Exception e) { - log.warn("Failed to sync target table structure,table will be ignored,tableName={}", - dlmTableUnit.getTableName(), e); - if (!parameters.getSyncTableStructure().isEmpty()) { - finishTableUnit(dlmTableUnitId, TaskStatus.FAILED); - continue; - } - } - } + syncTableStructure(dlmTableUnit); try { + jobStore.setDlmTableUnit(dlmTableUnit); job = jobFactory.createJob(dlmTableUnit); - log.info("Init {} job succeed,DLMJobId={}", job.getJobMeta().getJobType(), job.getJobMeta().getJobId()); - log.info("{} job start,DLMJobId={}", job.getJobMeta().getJobType(), job.getJobMeta().getJobId()); - if (isToStop) { - finishTableUnit(dlmTableUnitId, TaskStatus.CANCELED); - job.stop(); - log.info("The task has stopped."); - break; - } else { - job.run(); - } - log.info("{} job finished,DLMJobId={}", dlmTableUnit.getType(), dlmTableUnitId); - finishTableUnit(dlmTableUnitId, TaskStatus.DONE); } catch (Throwable e) { - log.error("{} job failed,DLMJobId={},errorMsg={}", dlmTableUnit.getType(), dlmTableUnitId, e); - // set task status to failed if any job failed. - if (job != null && job.getJobMeta().isToStop()) { - finishTableUnit(dlmTableUnitId, TaskStatus.CANCELED); - } else { - finishTableUnit(dlmTableUnitId, TaskStatus.FAILED); - context.getExceptionListener().onException(e); - } + log.error("Failed to create job,dlmTableUnitId={}", dlmTableUnit.getDlmTableUnitId(), e); + dlmTableUnit.setStatus(isToStop ? TaskStatus.CANCELED : TaskStatus.FAILED); + currentIndex++; + continue; } + log.info("Init {} job succeed,dlmTableUnitId={}", dlmTableUnit.getType(), dlmTableUnit.getDlmTableUnitId()); + try { + dlmTableUnit.setStatus(TaskStatus.RUNNING); + dlmTableUnit.setStartTime(new Date()); + job.run(); + log.info("{} job finished,dlmTableUnitId={}", dlmTableUnit.getType(), dlmTableUnit.getDlmTableUnitId()); + dlmTableUnit.setStatus(TaskStatus.DONE); + } catch (Throwable e) { + dlmTableUnit.setStatus(isToStop ? TaskStatus.CANCELED : TaskStatus.FAILED); + context.getExceptionListener().onException(e); + } + dlmTableUnit.setEndTime(new Date()); + currentIndex++; } + log.info("All tables have been processed,jobIdentity={}.\n{}", jobContext.getJobIdentity(), buildReport()); return true; } - private void startTableUnit(String dlmTableUnitId) { - result.get(dlmTableUnitId).setStatus(TaskStatus.RUNNING); - result.get(dlmTableUnitId).setStartTime(new Date()); - } - - private void finishTableUnit(String dlmTableUnitId, TaskStatus status) { - result.get(dlmTableUnitId).setStatus(status); - result.get(dlmTableUnitId).setEndTime(new Date()); + private void syncTableStructure(DlmTableUnit tableUnit) { + if (tableUnit.getType() != JobType.MIGRATE) { + return; + } + try { + DLMTableStructureSynchronizer.sync(tableUnit.getSourceDatasourceInfo(), tableUnit.getTargetDatasourceInfo(), + tableUnit.getTableName(), tableUnit.getTargetTableName(), + tableUnit.getSyncTableStructure()); + } catch (Exception e) { + log.warn("Failed to sync target table structure,tableName={}", + tableUnit.getTableName(), e); + } } - private List getDlmTableUnits(DLMJobReq req) throws SQLException { + private void initTableUnit(DLMJobReq req) { List dlmTableUnits = new LinkedList<>(); req.getTables().forEach(table -> { DlmTableUnit dlmTableUnit = new DlmTableUnit(); @@ -171,6 +146,8 @@ private List getDlmTableUnits(DLMJobReq req) throws SQLException { jobParameter.setMigratePartitions(table.getPartitions()); jobParameter.setSyncDBObjectType(req.getSyncTableStructure()); jobParameter.setShardingStrategy(req.getShardingStrategy()); + jobParameter.setPartName2MinKey(table.getPartName2MinKey()); + jobParameter.setPartName2MaxKey(table.getPartName2MaxKey()); dlmTableUnit.setParameters(jobParameter); dlmTableUnit.setDlmTableUnitId(DlmJobIdUtil.generateHistoryJobId(req.getJobName(), req.getJobType().name(), req.getScheduleTaskId(), dlmTableUnits.size())); @@ -182,9 +159,37 @@ private List getDlmTableUnits(DLMJobReq req) throws SQLException { dlmTableUnit.setStatus(TaskStatus.PREPARING); dlmTableUnit.setType(req.getJobType()); dlmTableUnit.setStatistic(new DlmTableUnitStatistic()); + dlmTableUnit.setSyncTableStructure(req.getSyncTableStructure()); + LimiterConfig limiterConfig = new LimiterConfig(); + limiterConfig.setDataSizeLimit(req.getRateLimit().getDataSizeLimit()); + limiterConfig.setRowLimit(req.getRateLimit().getRowLimit()); + dlmTableUnit.setSourceLimitConfig(limiterConfig); + dlmTableUnit.setTargetLimitConfig(limiterConfig); dlmTableUnits.add(dlmTableUnit); }); - return dlmTableUnits; + toDoList = new LinkedList<>(dlmTableUnits); + } + + private String buildReport() { + StringBuilder sb = new StringBuilder(); + sb.append("Job report:\n"); + sb.append("Total tables: ").append(toDoList.size()).append("\n"); + sb.append("Success tables: ") + .append(toDoList.stream().filter(t -> t.getStatus() == TaskStatus.DONE).map(DlmTableUnit::getTableName) + .collect( + Collectors.joining(","))) + .append("\n"); + sb.append("Failed tables: ") + .append(toDoList.stream().filter(t -> t.getStatus() == TaskStatus.FAILED) + .map(DlmTableUnit::getTableName).collect( + Collectors.joining(","))) + .append("\n"); + sb.append("Canceled tables: ") + .append(toDoList.stream().filter(t -> t.getStatus() == TaskStatus.CANCELED) + .map(DlmTableUnit::getTableName).collect( + Collectors.joining(","))) + .append("\n"); + return sb.toString(); } @Override @@ -193,9 +198,9 @@ public void stop() throws Exception { if (job != null) { try { job.stop(); - result.forEach((k, v) -> { - if (!v.getStatus().isTerminated()) { - v.setStatus(TaskStatus.CANCELED); + toDoList.forEach(t -> { + if (!t.getStatus().isTerminated()) { + t.setStatus(TaskStatus.CANCELED); } }); } catch (Exception e) { @@ -214,13 +219,39 @@ public boolean modify(Map jobParameters) { if (!super.modify(jobParameters)) { return false; } - afterModifiedJobParameters(); + updateLimiter(jobParameters); return true; } - protected void afterModifiedJobParameters() { - if (jobStore != null) { - jobStore.setJobParameters(jobContext.getJobParameters()); + public void updateLimiter(Map jobParameters) { + if (job == null || job.getJobMeta() == null) { + return; + } + JobMeta jobMeta = job.getJobMeta(); + try { + RateLimitConfiguration params; + if (jobParameters.containsKey(JobParametersKeyConstants.DLM_RATE_LIMIT_CONFIG)) { + params = JsonUtils.fromJson( + jobParameters.get(JobParametersKeyConstants.DLM_RATE_LIMIT_CONFIG), + RateLimitConfiguration.class); + } else { + DLMJobReq dlmJobReq = JsonUtils.fromJson( + jobParameters.get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON), + DLMJobReq.class); + params = dlmJobReq.getRateLimit(); + } + if (params.getDataSizeLimit() != null) { + jobMeta.getSourceLimiterConfig().setDataSizeLimit(params.getDataSizeLimit()); + jobMeta.getTargetLimiterConfig().setDataSizeLimit(params.getDataSizeLimit()); + log.info("Update rate limit success,dataSizeLimit={}", params.getDataSizeLimit()); + } + if (params.getRowLimit() != null) { + jobMeta.getSourceLimiterConfig().setRowLimit(params.getRowLimit()); + jobMeta.getTargetLimiterConfig().setRowLimit(params.getRowLimit()); + log.info("Update rate limit success,rowLimit={}", params.getRowLimit()); + } + } catch (Exception e) { + log.warn("Update rate limit failed,errorMsg={}", e.getMessage()); } } @@ -231,6 +262,6 @@ public double getProgress() { @Override public List getTaskResult() { - return new ArrayList<>(result.values()); + return new ArrayList<>(toDoList); } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/result/DLMResultProcessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/result/DLMResultProcessor.java index e61bd64b3f..50c428e1bb 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/result/DLMResultProcessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/result/DLMResultProcessor.java @@ -48,7 +48,7 @@ public class DLMResultProcessor extends DLMProcessorMatcher implements ResultPro @Override public void process(TaskResult result) { - log.info("Start refresh result,result={}", result.getResultJson()); + log.info("Start refresh result,jobIdentity={}", result.getJobIdentity()); try { List dlmTableUnits = JsonUtils.fromJson(result.getResultJson(), new TypeReference>() {}); diff --git a/server/plugins/connect-plugin-api/src/main/java/com/oceanbase/odc/plugin/connect/api/TestResult.java b/server/plugins/connect-plugin-api/src/main/java/com/oceanbase/odc/plugin/connect/api/TestResult.java index aa20507701..e2230ee08f 100644 --- a/server/plugins/connect-plugin-api/src/main/java/com/oceanbase/odc/plugin/connect/api/TestResult.java +++ b/server/plugins/connect-plugin-api/src/main/java/com/oceanbase/odc/plugin/connect/api/TestResult.java @@ -84,6 +84,22 @@ public static TestResult initScriptFailed(Throwable throwable) { return fail(ErrorCodes.ConnectionInitScriptFailed, new String[] {message}); } + public static TestResult bucketNotExist(String bucketName) { + return fail(ErrorCodes.BucketNotExist, new String[] {bucketName}); + } + + public static TestResult invalidAccessKeyId(String accessKeyId) { + return fail(ErrorCodes.InvalidAccessKeyId, new String[] {accessKeyId}); + } + + public static TestResult akAccessDenied(String accessKeyId) { + return fail(ErrorCodes.AccessDenied, new String[] {accessKeyId}); + } + + public static TestResult signatureDoesNotMatch(String accessKeyId) { + return fail(ErrorCodes.SignatureDoesNotMatch, new String[] {accessKeyId}); + } + public static TestResult success() { TestResult result = new TestResult(); result.setActive(true);