From 0f2fded074684458bea4a4451512dd341bf0b802 Mon Sep 17 00:00:00 2001
From: Anton Vinogradov
Date: Thu, 9 Nov 2023 12:13:45 +0300
Subject: [PATCH] IGNITE-20804 Support custom conflict resolver at CDC based datacenter replication (#11032)

---
 .../change-data-capture-extensions.adoc | 66 ++++++++++++-------
 1 file changed, 44 insertions(+), 22 deletions(-)

diff --git a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc
index e8c9e712d6217..399643e96adb2 100644
--- a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc
+++ b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc
@@ -12,20 +12,20 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-= Change Data Capture Extension
+= Cross-cluster Replication Extension
 
-WARNING: CDC is an experimental feature. API or design architecture might be changed.
+WARNING: Change Data Capture (CDC) and Cross-cluster Replication Extension are experimental features. API or design architecture might be changed.
 
 == Overview
-link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[Change Data Capture Extension] module provides two ways to set up cross cluster replication based on CDC.
+link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[Cross-cluster Replication Extension] module provides the following ways to set up cross-cluster replication based on CDC.
 
 . link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java[Ignite2IgniteClientCdcStreamer] - streams changes to destination cluster using link:thin-clients/java-thin-client[Java Thin Client].
 . link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java[Ignite2IgniteCdcStreamer] - streams changes to destination cluster using client node.
 . link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java[Ignite2KafkaCdcStreamer] combined with link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java[KafkaToIgniteCdcStreamer] streams changes to destination cluster using link:https://kafka.apache.org[Apache Kafka] as a transport.
 
-NOTE: For each cache replicated between clusters link:https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java[CacheVersionConflictResolver] should be defined.
+NOTE: A conflict resolver should be defined for each cache replicated between the clusters.
 
-NOTE: All implementations of CDC replication support replication of link:https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/binary/BinaryType.html[BinaryTypes] and link:https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cdc/TypeMapping.html[TypeMappings]
+NOTE: All implementations of cross-cluster replication support replication of link:https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/binary/BinaryType.html[BinaryTypes] and link:https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cdc/TypeMapping.html[TypeMappings].
 
 NOTE: To use SQL queries on the destination cluster over CDC-replicated data, set the same `VALUE_TYPE` in link:sql-reference/ddl#create-table[CREATE TABLE] on both source and destination clusters for each table.
 
@@ -182,19 +182,17 @@ Kafka to Ignite configuration file should contain the following beans that will
 
 `kafka-to-ignite.sh` uses the same logging configuration as the Ignite node does. The only difference is that the log is written in the "kafka-ignite-streamer.log" file.
 
-== CacheVersionConflictResolver implementation
-
+== Fault tolerance
 It expected that CDC streamers will be configured with the `onlyPrimary=false` in most real-world deployments to ensure fault-tolerance.
 That means streamer will send the same change several times equal to `CacheConfiguration#backups` + 1.
-At the same time concurrent updates of the same key can be done in replicated clusters.
-`CacheVersionConflictResolver` used by Ignite node to selects or merge new (from update request) and existing (stored in the cluster) entry versions.
-Selected entry version will be actually stored in the cluster.
 
-NOTE: Default implementation only select correct entry and never merge.
+== Conflict resolution
+A conflict resolver should be defined for each cache replicated between the clusters.
+The cross-cluster replication extension provides a link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java[default] conflict resolver implementation.
 
-link:https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java[CacheVersionConflictResolver] should be defined for each cache replicated between clusters.
+NOTE: The default implementation only selects the correct entry and never merges.
 
-Default link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java[implementation] is available in cdc-ext.
+The default resolver implementation is used when a custom conflict resolver is not set.
 
 === Configuration
 
@@ -204,20 +202,44 @@ Default link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc
 | `clusterId` | Local cluster id. Can be any value from 1 to 31. | null
 | `caches` | Set of cache names to handle with this plugin instance. | null
 | `conflictResolveField` | Value field to resolve conflict with. Optional. Field values must implement `java.lang.Comparable`. | null
+| `conflictResolver` | Custom conflict resolver. Optional. Must implement `CacheVersionConflictResolver`. | null
 |===
 
-=== Conflict resolve algorithm
-
-Replicated changes contain some additional data. Specifically, entry version from source cluster supplied with the changed data.
+=== Conflict resolution algorithm
+Replicated changes contain some additional data. Specifically, the entry's version from the source cluster is supplied with the changed data.
 Default conflict resolve algorithm based on entry version and `conflictResolveField`.
-Conflict resolution field should contain user provided monotonically increasing value such as query id or timestamp.
 
-. Changes from the "local" cluster always win.
-. If both old and new entry from the same cluster version comparison used to determine order.
-. If `conflictResolveField` if provided then field values comparison used to determine order.
-. Conflict resolution failed. Update will be ignored.
+==== Conflict resolution based on the entry's version
+This approach guarantees eventual consistency when each entry is updated from a single cluster only.
+
+IMPORTANT: This approach does not replicate any updates or removals from the destination cluster to the source cluster.
+
+.Algorithm:
+.. Changes from the "local" cluster always win. Any replicated data can be overridden locally.
+.. If both the old and the new entry are from the same cluster, entry version comparison is used to determine the order.
+.. Conflict resolution failed. Update will be ignored. Failure will be logged.
+
+==== Conflict resolution based on the entry's value field
+This approach guarantees eventual consistency even when an entry can be updated from any cluster.
+
+NOTE: The conflict resolution field, specified by `conflictResolveField`, should contain a user-provided monotonically increasing value such as a query id or timestamp.
+
+IMPORTANT: This approach does not replicate removals from the destination cluster to the source cluster, because removals can't be versioned by the field.
+
+.Algorithm:
+.. Changes from the "local" cluster always win. Any replicated data can be overridden locally.
+.. If both the old and the new entry are from the same cluster, entry version comparison is used to determine the order.
+.. If `conflictResolveField` is provided, field value comparison is used to determine the order.
+.. Conflict resolution failed. Update will be ignored. Failure will be logged.
+
+==== Custom conflict resolution rules
+You can define your own rules for resolving conflicts based on the nature of your data and operations.
+This can be particularly useful in more complex situations where the standard conflict resolution strategies do not apply.
+
+Choosing the right conflict resolution strategy depends on your specific use case and requires a good understanding of your data and its usage.
+You should consider the nature of your transactions, the rate of change of your data, and the implications of potential data loss or overwrites when selecting a conflict resolution strategy.
 
-IMPORTANT: The current implementation does not support deleting data from the cache in the destination cluster. The data must be deleted in the source cluster.
+A custom conflict resolver can be set via `conflictResolver`; it allows comparing or merging the conflicting data in any required way.
 
 === Configuration example
 Configuration is done via Ignite node plugin:
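
Reviewer note: as a reading aid for the default conflict resolution order documented in this patch, here is a minimal Java sketch that does not depend on Ignite. It is not the code of `CacheVersionConflictResolverImpl`. The `Version` and `Entry` classes, the `useNew` method, and the use of a `Long` timestamp as the conflict resolve field are hypothetical names and simplifications introduced only for illustration. The strict "greater than" comparison is also an assumption of this sketch.

```java
/** Ignite-free model of the documented resolution order; every name here is hypothetical. */
final class ConflictResolutionSketch {
    /** Stand-in for an entry version: the originating cluster id plus an update counter. */
    static final class Version {
        final int clusterId;
        final long order;

        Version(int clusterId, long order) {
            this.clusterId = clusterId;
            this.order = order;
        }
    }

    /** Stand-in for a cache entry: its version and an optional conflict resolve field value. */
    static final class Entry {
        final Version ver;
        final Long resolveField; // null models "conflictResolveField is not provided"

        Entry(Version ver, Long resolveField) {
            this.ver = ver;
            this.resolveField = resolveField;
        }
    }

    /** Returns true if the new entry should replace the stored one, false if the update is ignored. */
    static boolean useNew(int localClusterId, Entry oldEntry, Entry newEntry) {
        // Step 1: changes from the "local" cluster always win.
        if (newEntry.ver.clusterId == localClusterId)
            return true;

        // Step 2: both entries come from the same cluster, so compare entry versions.
        // Assumption: strict comparison, so a re-delivered change is not applied twice.
        if (oldEntry.ver.clusterId == newEntry.ver.clusterId)
            return newEntry.ver.order > oldEntry.ver.order;

        // Step 3: different clusters, so compare the field values when both are present.
        if (oldEntry.resolveField != null && newEntry.resolveField != null)
            return newEntry.resolveField.compareTo(oldEntry.resolveField) > 0;

        // Step 4: conflict resolution failed; the update is ignored (a real resolver would log it).
        return false;
    }

    public static void main(String[] args) {
        Entry stored = new Entry(new Version(2, 5L), 100L);

        System.out.println(useNew(1, stored, new Entry(new Version(1, 1L), null)));
        System.out.println(useNew(1, stored, new Entry(new Version(2, 6L), null)));
        System.out.println(useNew(1, stored, new Entry(new Version(3, 1L), 200L)));
        System.out.println(useNew(1, stored, new Entry(new Version(3, 1L), null)));
    }
}
```

In this sketch, leaving `resolveField` as `null` reproduces the version-only variant of the algorithm: an update originating from a third cluster falls through to the final step and is ignored.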