From aa1c822edcb0091e28a310eca5d66006e98710cf Mon Sep 17 00:00:00 2001 From: dingxiaobo Date: Wed, 26 Oct 2022 14:46:06 +0800 Subject: [PATCH] Support OceanBase 4.x and fix OceanBase writer. --- .../plugin/reader/oceanbasev10reader/util/ObVersion.java | 2 ++ .../oceanbasev10reader/util/PartitionSplitUtil.java | 3 ++- .../task/ConcurrentTableWriterTask.java | 9 +-------- .../java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java | 3 +++ 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java index 7bb6559e57..2fc414ce2c 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java @@ -16,6 +16,8 @@ public class ObVersion implements Comparable { private int patchNumber; public static final ObVersion V2276 = valueOf("2.2.76"); + public static final ObVersion V4000 = valueOf("4.0.0.0"); + private static final ObVersion DEFAULT_VERSION = valueOf(System.getProperty("defaultObVersion","3.2.3.0")); diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java index bc3d455c91..2929658ae1 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java @@ -157,7 +157,8 @@ public static PartInfo getObMySQLPartInfoBySQL(Configuration config, String tabl conn = DBUtil.getConnection(DataBaseType.OceanBase, jdbcUrl, username, password); ObVersion obVersion = ObReaderUtils.getObVersion(conn); - if (obVersion.compareTo(ObVersion.V2276) >= 0) { + if (obVersion.compareTo(ObVersion.V2276) >= 0 && + obVersion.compareTo(ObVersion.V4000) < 0) { allTable = "__all_table_v2"; } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java index e6b4a561bc..82b16923e0 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java @@ -76,7 +76,6 @@ public ConcurrentTableWriterTask(DataBaseType dataBaseType) { private Condition condition = lock.newCondition(); private long startTime; - private boolean isOb2 = false; private String obWriteMode = "update"; private boolean isOracleCompatibleMode = false; private String obUpdateColumns = null; @@ -130,12 +129,6 @@ public void init(Configuration config) { concurrentWriter = new ConcurrentTableWriter(config, connectInfo, writeRecordSql); allTaskInQueue = false; } - - String version = config.getString(Config.OB_VERSION); - int pIdx = version.lastIndexOf('.'); - if ((Float.valueOf(version.substring(0, pIdx)) >= 2.1f)) { - isOb2 = true; - } } private void initPartCalculator(ServerConnectInfo connectInfo) { @@ -313,7 +306,7 @@ private void addRecordToCache(final Record record) { LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record); } - if (partId == null && isOb2) { + if (partId == null) { LOG.debug("fail to calculate parition id, just put into the default buffer."); partId = Long.MAX_VALUE; } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java index 2392d1ca03..978c4566c2 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java @@ -380,6 +380,9 @@ private static synchronized Connection connect(DataBaseType dataBaseType, // unit ms prop.put("oracle.jdbc.ReadTimeout", socketTimeout); } + if (dataBaseType == DataBaseType.OceanBase) { + url = url.replace("jdbc:mysql:", "jdbc:oceanbase:"); + } return connect(dataBaseType, url, prop); }