Commit
add argument: deduplicate (#2073)
Co-authored-by: marsishandsome <[email protected]>
ti-srebot and marsishandsome authored May 12, 2021
1 parent 23592a2 commit 5ba48bc
Showing 4 changed files with 41 additions and 0 deletions.
@@ -249,6 +249,14 @@ class TiBatchWriteTable(

    val distinctWrappedRowRdd = deduplicate(wrappedRowRdd)

    // When deduplication is disabled, fail fast if the input contains
    // duplicate unique or primary keys instead of silently dropping rows.
    if (!options.deduplicate) {
      val c1 = wrappedRowRdd.count()
      val c2 = distinctWrappedRowRdd.count()
      if (c1 != c2) {
        throw new TiBatchWriteException("duplicate unique key or primary key")
      }
    }

    val deletion = (if (options.useSnapshotBatchGet) {
      generateDataToBeRemovedRddV2(distinctWrappedRowRdd, startTimeStamp)
    } else {
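The `deduplicate(wrappedRowRdd)` helper invoked above is defined outside this hunk. As a rough sketch of the idea (not the actual TiSpark implementation), deduplication can be expressed as keying each row by its unique/primary key and keeping one row per key; the `WrappedRow` type and `uniqueKey` field below are simplified stand-ins for the real types:

```scala
import org.apache.spark.rdd.RDD

// Simplified stand-in for TiSpark's wrapped-row type.
case class WrappedRow(uniqueKey: String, values: Seq[Any])

def deduplicate(rdd: RDD[WrappedRow]): RDD[WrappedRow] =
  rdd
    .map(row => (row.uniqueKey, row)) // key each row by its unique/primary key
    .reduceByKey((first, _) => first) // keep one arbitrary row per key
    .values
```

With `deduplicate=false`, the new check above compares the row counts before and after this pass, so duplicate keys surface as a `TiBatchWriteException` instead of being silently collapsed.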
@@ -88,6 +88,7 @@ class TiDBOptions(@transient val parameters: CaseInsensitiveMap[String]) extends
    getOrDefault(TIDB_COMMIT_PRIMARY_KEY_RETRY_NUMBER, "4").toInt
  val enableUpdateTableStatistics: Boolean =
    getOrDefault(TIDB_ENABLE_UPDATE_TABLE_STATISTICS, "false").toBoolean
  val deduplicate: Boolean = getOrDefault(TIDB_DEDUPLICATE, "true").toBoolean

  // region split
  val enableRegionSplit: Boolean = getOrDefault(TIDB_ENABLE_REGION_SPLIT, "true").toBoolean
@@ -206,6 +207,7 @@ object TiDBOptions {
  val TIDB_PREWRITE_MAX_RETRY_TIMES: String = newOption("prewriteMaxRetryTimes")
  val TIDB_COMMIT_PRIMARY_KEY_RETRY_NUMBER: String = newOption("commitPrimaryKeyRetryNumber")
  val TIDB_ENABLE_UPDATE_TABLE_STATISTICS: String = newOption("enableUpdateTableStatistics")
  val TIDB_DEDUPLICATE: String = newOption("deduplicate")

  // region split
  val TIDB_ENABLE_REGION_SPLIT: String = newOption("enableRegionSplit")
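For illustration, these keys are what a caller sets when building the writer options; a sketch of an options map that disables the new pass (the connection values are placeholders):

```scala
// Placeholder connection settings; "deduplicate" -> "false" overrides
// the "true" default parsed by TiDBOptions above.
val tidbOptions: Map[String, String] = Map(
  "tidb.addr" -> "127.0.0.1",
  "tidb.port" -> "4000",
  "tidb.user" -> "root",
  "tidb.password" -> "",
  "deduplicate" -> "false"
)
```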
30 changes: 30 additions & 0 deletions core/src/test/scala/com/pingcap/tispark/BatchWriteIssueSuite.scala
@@ -15,6 +15,7 @@

package com.pingcap.tispark

import com.pingcap.tikv.exception.TiBatchWriteException
import com.pingcap.tispark.datasource.BaseBatchWriteTest
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -29,6 +30,35 @@ import org.apache.spark.sql.types.{

class BatchWriteIssueSuite extends BaseBatchWriteTest("test_batchwrite_issue") {

test("deduplicate=false") {
jdbcUpdate(s"drop table if exists $table")
jdbcUpdate(s"""create table $table(
|`id` varchar(36) COLLATE utf8_general_ci NOT NULL,
|`name` varchar(36) COLLATE utf8_general_ci DEFAULT NULL,
|`school` varchar(36) COLLATE utf8_general_ci NOT NULL,
|PRIMARY KEY (`id`),
|UNIQUE KEY `test_unique_1` (`name`,`school`)
|)""".stripMargin)

val s = spark

import s.implicits._

val df = Seq(("10", "n5", "n10"), ("11", "n5", "n10")).toDF("id", "name", "school")

val caught = intercept[TiBatchWriteException] {
df.write
.format("tidb")
.options(tidbOptions)
.option("database", database)
.option("table", table)
.option("deduplicate", "false")
.mode("append")
.save()
}
assert(caught.getMessage.equals("duplicate unique key or primary key"))
}

ignore("stats_meta update modify_count") {
if (!supportBatchWrite) {
cancel()
1 change: 1 addition & 0 deletions docs/datasource_api_userguide.md
@@ -184,6 +184,7 @@ The following table shows the TiDB-specific options, which can be passed in through
| Key | Required | Default | Description |
| ---- | ---- | ---- | ---- |
| writeThreadPerTask | false | 1 | Number of threads each Spark task uses to write data to TiKV |
| bytesPerRegion | false | 100663296 (96M) | Decrease this parameter to split more regions (increase write concurrency) |
| enableUpdateTableStatistics | false | false | Update statistics in table `mysql.stats_meta` (`tidb.user` must have the update privilege on table `mysql.stats_meta` if set to true) |
| deduplicate | false | true | Deduplicate data in the DataFrame according to the primary key and unique keys |
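
For example, a write that opts out of deduplication (mirroring the test above); if the DataFrame contains duplicate unique or primary keys, the write fails with `duplicate unique key or primary key` instead of silently keeping one row per key (the `tidbOptions`, database, and table names here are placeholders):

```scala
df.write
  .format("tidb")
  .options(tidbOptions) // connection options: tidb.addr, tidb.port, tidb.user, ...
  .option("database", "test")
  .option("table", "target_table")
  .option("deduplicate", "false")
  .mode("append")
  .save()
```
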
## TiDB Version
