From 5ba48bc126066722b6724da4b737d207889e760e Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Wed, 12 May 2021 10:08:54 +0800 Subject: [PATCH] add argument: deduplicate (#2073) Co-authored-by: marsishandsome --- .../tispark/write/TiBatchWriteTable.scala | 8 +++++ .../pingcap/tispark/write/TiDBOptions.scala | 2 ++ .../tispark/BatchWriteIssueSuite.scala | 30 +++++++++++++++++++ docs/datasource_api_userguide.md | 1 + 4 files changed, 41 insertions(+) diff --git a/core/src/main/scala/com/pingcap/tispark/write/TiBatchWriteTable.scala b/core/src/main/scala/com/pingcap/tispark/write/TiBatchWriteTable.scala index 153ab1602b..725ee96ca7 100644 --- a/core/src/main/scala/com/pingcap/tispark/write/TiBatchWriteTable.scala +++ b/core/src/main/scala/com/pingcap/tispark/write/TiBatchWriteTable.scala @@ -249,6 +249,14 @@ class TiBatchWriteTable( val distinctWrappedRowRdd = deduplicate(wrappedRowRdd) + if (!options.deduplicate) { + val c1 = wrappedRowRdd.count() + val c2 = distinctWrappedRowRdd.count() + if (c1 != c2) { + throw new TiBatchWriteException("duplicate unique key or primary key") + } + } + val deletion = (if (options.useSnapshotBatchGet) { generateDataToBeRemovedRddV2(distinctWrappedRowRdd, startTimeStamp) } else { diff --git a/core/src/main/scala/com/pingcap/tispark/write/TiDBOptions.scala b/core/src/main/scala/com/pingcap/tispark/write/TiDBOptions.scala index 79d37821b9..1c75543f5f 100644 --- a/core/src/main/scala/com/pingcap/tispark/write/TiDBOptions.scala +++ b/core/src/main/scala/com/pingcap/tispark/write/TiDBOptions.scala @@ -88,6 +88,7 @@ class TiDBOptions(@transient val parameters: CaseInsensitiveMap[String]) extends getOrDefault(TIDB_COMMIT_PRIMARY_KEY_RETRY_NUMBER, "4").toInt val enableUpdateTableStatistics: Boolean = getOrDefault(TIDB_ENABLE_UPDATE_TABLE_STATISTICS, "false").toBoolean + val deduplicate: Boolean = getOrDefault(TIDB_DEDUPLICATE, "true").toBoolean // region split val enableRegionSplit: Boolean = 
getOrDefault(TIDB_ENABLE_REGION_SPLIT, "true").toBoolean @@ -206,6 +207,7 @@ object TiDBOptions { val TIDB_PREWRITE_MAX_RETRY_TIMES: String = newOption("prewriteMaxRetryTimes") val TIDB_COMMIT_PRIMARY_KEY_RETRY_NUMBER: String = newOption("commitPrimaryKeyRetryNumber") val TIDB_ENABLE_UPDATE_TABLE_STATISTICS: String = newOption("enableUpdateTableStatistics") + val TIDB_DEDUPLICATE: String = newOption("deduplicate") // region split val TIDB_ENABLE_REGION_SPLIT: String = newOption("enableRegionSplit") diff --git a/core/src/test/scala/com/pingcap/tispark/BatchWriteIssueSuite.scala b/core/src/test/scala/com/pingcap/tispark/BatchWriteIssueSuite.scala index 139c08b2b5..ce89f36981 100644 --- a/core/src/test/scala/com/pingcap/tispark/BatchWriteIssueSuite.scala +++ b/core/src/test/scala/com/pingcap/tispark/BatchWriteIssueSuite.scala @@ -15,6 +15,7 @@ package com.pingcap.tispark +import com.pingcap.tikv.exception.TiBatchWriteException import com.pingcap.tispark.datasource.BaseBatchWriteTest import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row @@ -29,6 +30,35 @@ import org.apache.spark.sql.types.{ class BatchWriteIssueSuite extends BaseBatchWriteTest("test_batchwrite_issue") { + test("deduplicate=false") { + jdbcUpdate(s"drop table if exists $table") + jdbcUpdate(s"""create table $table( + |`id` varchar(36) COLLATE utf8_general_ci NOT NULL, + |`name` varchar(36) COLLATE utf8_general_ci DEFAULT NULL, + |`school` varchar(36) COLLATE utf8_general_ci NOT NULL, + |PRIMARY KEY (`id`), + |UNIQUE KEY `test_unique_1` (`name`,`school`) + |)""".stripMargin) + + val s = spark + + import s.implicits._ + + val df = Seq(("10", "n5", "n10"), ("11", "n5", "n10")).toDF("id", "name", "school") + + val caught = intercept[TiBatchWriteException] { + df.write + .format("tidb") + .options(tidbOptions) + .option("database", database) + .option("table", table) + .option("deduplicate", "false") + .mode("append") + .save() + } + assert(caught.getMessage.equals("duplicate unique key or primary 
key")) + } + ignore("stats_meta update modify_count") { if (!supportBatchWrite) { cancel() diff --git a/docs/datasource_api_userguide.md b/docs/datasource_api_userguide.md index 9ccda1c959..5ceb2151e6 100644 --- a/docs/datasource_api_userguide.md +++ b/docs/datasource_api_userguide.md @@ -184,6 +184,7 @@ The following table shows the TiDB-specific options, which can be passed in thro | writeThreadPerTask | false | 1 | Thread number each spark task use to write data to TiKV | | bytesPerRegion | false | 100663296 (96M) | Decrease this parameter to split more regions (increase write concurrency) | | enableUpdateTableStatistics | false | false | Update statistics in table `mysql.stats_meta` (`tidb.user` must own update privilege to table `mysql.stats_meta` if set true) | +| deduplicate | false | true | Deduplicate data in DataFrame according to primary key and unique key | ## TiDB Version