Skip to content

Commit

Permalink
IGNITE-20804 Support custom conflict resolver at CDC based datacenter…
Browse files Browse the repository at this point in the history
… replication (#11032)
  • Loading branch information
anton-vinogradov authored Nov 9, 2023
1 parent bc73d09 commit 0f2fded
Showing 1 changed file with 44 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Change Data Capture Extension
= Cross-cluster Replication Extension

WARNING: CDC is an experimental feature. API or design architecture might be changed.
WARNING: Change Data Capture (CDC) and Cross-cluster Replication Extension are experimental features. API or design architecture might be changed.

== Overview
link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[Change Data Capture Extension] module provides two ways to set up cross cluster replication based on CDC.
link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[Cross-cluster Replication Extension] module provides the following ways to set up cross-cluster replication based on CDC.

. link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java[Ignite2IgniteClientCdcStreamer] - streams changes to destination cluster using link:thin-clients/java-thin-client[Java Thin Client].
. link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java[Ignite2IgniteCdcStreamer] - streams changes to destination cluster using client node.
. link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java[Ignite2KafkaCdcStreamer] combined with link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java[KafkaToIgniteCdcStreamer] streams changes to destination cluster using link:https://kafka.apache.org[Apache Kafka] as a transport.

NOTE: For each cache replicated between clusters link:https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java[CacheVersionConflictResolver] should be defined.
NOTE: Conflict resolver should be defined for each cache replicated between the clusters.

NOTE: All implementations of CDC replication support replication of link:https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/binary/BinaryType.html[BinaryTypes] and link:https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cdc/TypeMapping.html[TypeMappings]
NOTE: All implementations of the cross-cluster replication support replication of link:https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/binary/BinaryType.html[BinaryTypes] and link:https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cdc/TypeMapping.html[TypeMappings]

NOTE: To use SQL queries on the destination cluster over CDC-replicated data, set the same `VALUE_TYPE` in
link:sql-reference/ddl#create-table[CREATE TABLE] on both source and destination clusters for each table.
Expand Down Expand Up @@ -182,19 +182,17 @@ Kafka to Ignite configuration file should contain the following beans that will

`kafka-to-ignite.sh` uses the same logging configuration as the Ignite node does. The only difference is that the log is written in the "kafka-ignite-streamer.log" file.

== CacheVersionConflictResolver implementation

== Fault tolerance
It expected that CDC streamers will be configured with the `onlyPrimary=false` in most real-world deployments to ensure fault-tolerance.
That means streamer will send the same change several times equal to `CacheConfiguration#backups` + 1.
At the same time concurrent updates of the same key can be done in replicated clusters.
`CacheVersionConflictResolver` used by Ignite node to selects or merge new (from update request) and existing (stored in the cluster) entry versions.
Selected entry version will be actually stored in the cluster.

NOTE: Default implementation only select correct entry and never merge.
== Conflict resolution
Conflict resolver should be defined for each cache replicated between the clusters.
Cross-cluster replication extension has the link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java[default] conflict resolver implementation.

link:https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java[CacheVersionConflictResolver] should be defined for each cache replicated between clusters.
NOTE: Default implementation only select correct entry and never merge.

Default link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java[implementation] is available in cdc-ext.
The default resolver implementation will be used when custom conflict resolver is not set.

=== Configuration

Expand All @@ -204,20 +202,44 @@ Default link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc
| `clusterId` | Local cluster id. Can be any value from 1 to 31. | null
| `caches` | Set of cache names to handle with this plugin instance. | null
| `conflictResolveField` | Value field to resolve conflict with. Optional. Field values must implement `java.lang.Comparable`. | null
| `conflictResolver` | Custom conflict resolver. Optional. Field must implement `CacheVersionConflictResolver`. | null
|===

=== Conflict resolve algorithm

Replicated changes contain some additional data. Specifically, entry version from source cluster supplied with the changed data.
=== Conflict resolution algorithm
Replicated changes contain some additional data. Specifically, entry's version from source cluster is supplied with the changed data.
Default conflict resolve algorithm based on entry version and `conflictResolveField`.
Conflict resolution field should contain user provided monotonically increasing value such as query id or timestamp.

. Changes from the "local" cluster always win.
. If both old and new entry from the same cluster version comparison used to determine order.
. If `conflictResolveField` if provided then field values comparison used to determine order.
. Conflict resolution failed. Update will be ignored.
==== Conflict resolution based on the entry's version
This approach provides the eventual consistency guarantee when each entry is updatable only from a single cluster.

IMPORTANT: This approach does not replicate any updates or removals from the destination cluster to the source cluster.

.Algorithm:
.. Changes from the "local" cluster are always win. Any replicated data can be overridden locally.
.. If both old and new entry are from the same cluster then entry versions comparison is used to determine the order.
.. Conflict resolution failed. Update will be ignored. Failure will be logged.

==== Conflict resolution based on the entry's value field
This approach provides the eventual consistency guarantee even when entry is updatable from any cluster.

NOTE: Conflict resolution field, specified by `conflictResolveField`, should contain a user provided monotonically increasing value such as query id or timestamp.

IMPORTANT: This approach does not replicate the removals from the destination cluster to the source cluster, because removes can't be versioned by the field.

.Algorithm:
.. Changes from the "local" cluster are always win. Any replicated data can be overridden locally.
.. If both old and new entry are from the same cluster then entry versions comparison is used to determine the order.
.. If `conflictResolveField` is provided then field values comparison is used to determine the order.
.. Conflict resolution failed. Update will be ignored. Failure will be logged.

==== Custom conflict resolution rules
You're able to define your own rules for resolving conflicts based on the nature of your data and operations.
This can be particularly useful in more complex situations where the standard conflict resolution strategies do not apply.

Choosing the right conflict resolution strategy depends on your specific use case and requires a good understanding of your data and its usage.
You should consider the nature of your transactions, the rate of change of your data, and the implications of potential data loss or overwrites when selecting a conflict resolution strategy.

IMPORTANT: The current implementation does not support deleting data from the cache in the destination cluster. The data must be deleted in the source cluster.
Custom conflict resolver can be set via `conflictResolver` and allows to compare or merge the conflict data in any required way.

=== Configuration example
Configuration is done via Ignite node plugin:
Expand Down

0 comments on commit 0f2fded

Please sign in to comment.