fix: Transactional commit mode system improvements and docs (#355)
* Transactional commit mode system improvements and docs (#355)
** Clarifies the transaction system with much better documentation.
** Fixes a potential race condition which could cause offset leaks between transaction boundaries.
** Introduces lock acquisition timeouts.
** Fixes a potential issue where records could be removed from the retry queue incorrectly, caused by an inconsistency between compareTo and equals in the retry TreeMap.

Details:

* Fix potential race condition with tx committing and work being available by expanding locking scope

Don't finish producing with the lock held until the work has been put into the work inbox, so that the controller can process it before arranging commits.

Acquire the commit lock earlier, so that no more records can be sent between processing results and committing the transaction. Otherwise a record from a future commit could be added to this transaction without its source offsets being collected for commit, because the work's status is not yet completed (as it hasn't been processed by the controller yet).

* Too many messages were being produced at certain times, making the test unreliable. Changed the system to add only the minimum amount needed, calculated from the bytes required to add to the payload.

* eliminate race condition by waiting for commit lock to be fully released before trying to acquire

* refactor: Refactoring split ProducerManager out to ProducerWrapper (#399)

* Remove the sync lock on the producer wrapper for committing transactions - it shouldn't be needed anymore with the more robust produce lock, as that blocks producer access while committing instead of relying on sync locks

* Create single point for integration commit assertions

* Fixed an issue where a WorkContainer (WC) might not be removed from the retry queue due to an inconsistency between equals and compareTo in the TreeMap

* Turn off forced transaction commit mode for vertx test - can't see why it had been done
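The commit-lock/produce-lock relationship described in these notes can be sketched with a read-write lock plus acquisition timeouts. This is a hypothetical illustration only - `CommitLockSketch` and its method names are invented here, not the library's actual `ProducerManager` implementation:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: many worker threads share the "produce" (read) side of the lock,
// while the transaction commit takes the exclusive "commit" (write) side.
// Acquisition timeouts let a stuck party fail fast instead of deadlocking.
final class CommitLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Returns false if the produce lock times out - the record would be retried later.
    boolean produce(Runnable send, long timeoutMs) throws InterruptedException {
        if (!lock.readLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false;
        }
        try {
            send.run();
        } finally {
            lock.readLock().unlock();
        }
        return true;
    }

    // Returns false if the commit lock times out - the caller fails fast (shuts down).
    boolean commitTransaction(Runnable commit, long timeoutMs) throws InterruptedException {
        if (!lock.writeLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false;
        }
        try {
            commit.run();
        } finally {
            lock.writeLock().unlock();
        }
        return true;
    }
}
```

Because the write lock cannot be acquired while any read lock is held, no record can be produced mid-commit - which is the property the race-condition fixes above rely on.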
astubbs authored Sep 29, 2022
1 parent 7630c96 commit 9423763
Showing 60 changed files with 2,466 additions and 439 deletions.
14 changes: 11 additions & 3 deletions .gitignore
@@ -1,3 +1,5 @@
.DS_Store

# Compiled class file
*.class

@@ -22,9 +24,6 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

# IDEA run configurations
target/
.DS_Store
*.versionsBackup

# JENV
@@ -39,6 +38,9 @@ delombok/
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
.idea/sonarlint/
.idea/libraries/


# Generated files
.idea/**/contentModel.xml
@@ -65,3 +67,9 @@ delombok/
# Maven
target
release.properties
/.idea/encodings.xml
/.idea/misc.xml
/.idea/codeStyles/Project.xml
/.idea/inspectionProfiles/Project_Default.xml
/.idea/uiDesigner.xml
/.idea/vcs.xml
5 changes: 5 additions & 0 deletions .idea/codeStyles/codeStyleConfig.xml


13 changes: 13 additions & 0 deletions .idea/runConfigurations/_Tag__transactions__.xml


8 changes: 8 additions & 0 deletions CHANGELOG.adoc
@@ -16,6 +16,14 @@ endif::[]

== Next Version

=== Improvements

* Transactional commit mode system improvements and docs (#355)
** Clarifies the transaction system with much better documentation.
** Fixes a potential race condition which could cause offset leaks between transaction boundaries.
** Introduces lock acquisition timeouts.
** Fixes a potential issue where records could be removed from the retry queue incorrectly, caused by an inconsistency between compareTo and equals in the retry TreeMap.

== v0.5.2.3

=== Improvements
100 changes: 100 additions & 0 deletions README.adoc
@@ -247,6 +247,9 @@ without operational burden or harming the cluster's performance
** Non-blocking I/O work management
** Vert.x's WebClient and general Vert.x Future support
** Reactor.io Publisher (Mono/Flux) and Java's CompletableFuture (through `Mono#fromFuture`)
* Exactly Once bulk transaction system
** When using the transactional mode, record processing that happens in parallel, and produces records back to Kafka, is all grouped into one large batch transaction; the offsets and records are submitted together through the transactional producer, giving you Exactly Once Semantics for parallel processing.
** For further information, see the <<transaction-system>> section.
* Fair partition traversal
* Zero~ dependencies (`Slf4j` and `Lombok`) for the core module
* Java 8 compatibility
@@ -877,6 +880,7 @@ The `transactional` mode is explained in the next section.
If you're used to using the auto commit mode in the normal Kafka consumer, you can think of the `Asynchronous` mode being similar to this.
We suggest starting with this mode, and it is the default.

[[transaction-system]]
=== Apache Kafka EoS Transaction Model

There is also the option to use Kafka's Exactly Once Semantics (EoS) system.
@@ -890,6 +894,94 @@ CAUTION: This cannot be true for any externally integrated third party system, u

For implementations details, see the <<Transactional System Architecture>> section.

.From the Options Javadoc
[source,java,indent=0]
----
/**
* Periodically commits through the Producer using transactions.
* <p>
 * Messages sent in parallel by different workers get added to the same transaction block - you end up with
 * transactions 100ms (by default) "large", containing all records sent during that time period, along with
 * the offsets being committed.
* <p>
 * Of no use if not also producing messages (i.e. using a {@link ParallelStreamProcessor#pollAndProduce}
* variation).
* <p>
 * Note: Records being sent by different threads will all be in a single transaction, as PC shares a single
 * Producer instance. This could be seen as a performance advantage - efficient resource use - in exchange
 * for a loss in transaction granularity.
* <p>
* The benefits of using this mode are:
* <p>
* a) All records produced from a given source offset will either all be visible, or none will be
* ({@link org.apache.kafka.common.IsolationLevel#READ_COMMITTED}).
* <p>
* b) If any records making up a transaction have a terminal issue being produced, or the system crashes before
* finishing sending all the records and committing, none will ever be visible and the system will eventually
* retry them in new transactions - potentially with different combinations of records from the original.
* <p>
 * c) A source offset and its produced records will be committed as an atomic set. Normally, either the record
 * producing could fail, or the committing of the source offset could fail, as they are separate individual
 * operations. When using Transactions, they are committed together - so if either operation fails, the
 * transaction will never get committed, and upon recovery, the system will retry the set again (and no
 * duplicates will be visible in the topic).
* <p>
* This {@code CommitMode} is the slowest of the options, but there will be no duplicates in Kafka caused by
* producing a record multiple times if previous offset commits have failed or crashes have occurred (however
* message replay may cause duplicates in external systems which is unavoidable - external systems must be
* idempotent).
* <p>
* The default commit interval {@link AbstractParallelEoSStreamProcessor#KAFKA_DEFAULT_AUTO_COMMIT_FREQUENCY}
 * gets automatically reduced from the default of 5 seconds to 100ms (the same as Kafka Streams <a
 * href="https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html">commit.interval.ms</a>).
* Reducing this configuration places higher load on the broker, but will reduce (but cannot eliminate) replay
* upon failure. Note also that when using transactions in Kafka, consumption in {@code READ_COMMITTED} mode is
* blocked up to the offset of the first STILL open transaction. Using a smaller commit frequency reduces this
* minimum consumption latency - the faster transactions are closed, the faster the transaction content can be
* read by {@code READ_COMMITTED} consumers. More information about this can be found on the Confluent blog
* post:
* <a href="https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/">Enabling Exactly-Once in Kafka
* Streams</a>.
* <p>
* When producing multiple records (see {@link ParallelStreamProcessor#pollAndProduceMany}), all records must
* have been produced successfully to the broker before the transaction will commit, after which all will be
* visible together, or none.
* <p>
 * Records produced while running in this mode won't be seen by consumers running in
* {@link ConsumerConfig#ISOLATION_LEVEL_CONFIG} {@link org.apache.kafka.common.IsolationLevel#READ_COMMITTED}
* mode until the transaction is complete and all records are produced successfully. Records produced into a
* transaction that gets aborted or timed out, will never be visible.
* <p>
 * The system must prevent records from being produced to the brokers whose source consumer record offsets have
 * not been included in this transaction. Otherwise, the transactions would include produced records from
* consumer offsets which would only be committed in the NEXT transaction, which would break the EoS guarantees.
 * To achieve this, first work processing and record producing is suspended (by acquiring the commit lock -
 * see {@link #commitLockAcquisitionTimeout}, as record processing requires the produce lock), then succeeded
 * consumer offsets are gathered, the transaction commit is made, and when the transaction has finished,
 * processing resumes by releasing the commit lock. This periodically slows down record production during this
 * phase, by the time needed to commit the transaction.
* <p>
* This is all separate from using an IDEMPOTENT Producer, which can be used, along with the
* {@link ParallelConsumerOptions#commitMode} {@link CommitMode#PERIODIC_CONSUMER_SYNC} or
* {@link CommitMode#PERIODIC_CONSUMER_ASYNCHRONOUS}.
* <p>
* Failure:
* <p>
 * Commit lock: If the system cannot acquire the commit lock in time, the system will shut down (fail fast) -
 * during the shutdown a final commit attempt will be made. The default timeout for acquisition is very high
 * though - see {@link #commitLockAcquisitionTimeout}. This can be caused by the user processing function
 * taking too long to complete.
* <p>
* Produce lock: If the system cannot acquire the produce lock in time, it will fail the record processing and
* retry the record later. This can be caused by the controller taking too long to commit for some reason. See
* {@link #produceLockAcquisitionTimeout}. If using {@link #allowEagerProcessingDuringTransactionCommit}, this
* may cause side effect replay when the record is retried, otherwise there is no replay. See
* {@link #allowEagerProcessingDuringTransactionCommit} for more details.
*
* @see ParallelConsumerOptions.ParallelConsumerOptionsBuilder#commitInterval
*/
----
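A minimal configuration sketch for the transactional mode described above. This assumes the standard `ParallelConsumerOptions` builder API; `kafkaConsumer` and `kafkaProducer` are placeholders for your own clients, and the producer must itself be configured with a `transactional.id` - treat the exact accessor names (e.g. `getSingleConsumerRecord`) as illustrative rather than authoritative:

```java
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.List;

// Sketch: enable the transactional (EoS) commit mode.
ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
        .consumer(kafkaConsumer)   // placeholder - your KafkaConsumer
        .producer(kafkaProducer)   // placeholder - your transactional KafkaProducer
        .commitMode(CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
        .commitInterval(Duration.ofMillis(100)) // smaller = less replay on failure, more broker load
        .build();

ParallelStreamProcessor<String, String> processor =
        ParallelStreamProcessor.createEosStreamProcessor(options);
processor.subscribe(List.of("input-topic"));

// Source offsets and produced records are committed atomically, per transaction.
processor.pollAndProduce(context -> {
    var record = context.getSingleConsumerRecord();
    return new ProducerRecord<>("output-topic", record.key(), record.value());
});
```

With this in place, `READ_COMMITTED` consumers of `output-topic` will only ever see records whose source offsets were committed in the same transaction.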

[[streams-usage]]
== Using with Kafka Streams

@@ -1180,6 +1272,14 @@ endif::[]

== Next Version

=== Improvements

* Transactional commit mode system improvements and docs (#355)
** Clarifies the transaction system with much better documentation.
** Fixes a potential race condition which could cause offset leaks between transaction boundaries.
** Introduces lock acquisition timeouts.
** Fixes a potential issue where records could be removed from the retry queue incorrectly, caused by an inconsistency between compareTo and equals in the retry TreeMap.

== v0.5.2.3

=== Improvements
1 change: 1 addition & 0 deletions parallel-consumer-core/pom.xml
@@ -127,6 +127,7 @@
<classes>
<class>io.confluent.parallelconsumer.PollContext</class>
<class>io.confluent.parallelconsumer.ParallelEoSStreamProcessor</class>
<class>io.confluent.parallelconsumer.internal.ProducerManager</class>
<class>io.confluent.parallelconsumer.state.WorkContainer</class>
<class>io.confluent.parallelconsumer.state.WorkManager</class>
<class>io.confluent.parallelconsumer.state.PartitionState</class>
@@ -4,7 +4,9 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

@@ -22,11 +24,31 @@ public Clock getClock() {

@SneakyThrows
public static <RESULT> RESULT time(final Callable<RESULT> func) {
return timeWithMeta(func).getResult();
}

@SneakyThrows
public static <RESULT> TimeResult<RESULT> timeWithMeta(final Callable<? extends RESULT> func) {
long start = System.currentTimeMillis();
TimeResult.TimeResultBuilder<RESULT> timer = TimeResult.<RESULT>builder().startMs(start);
RESULT call = func.call();
long elapsed = System.currentTimeMillis() - start;
timer.result(call);
long end = System.currentTimeMillis();
long elapsed = end - start;
timer.endMs(end);
log.trace("Function took {}", Duration.ofMillis(elapsed));
return call;
return timer.build();
}

@Builder
@Value
public static class TimeResult<RESULT> {
long startMs;
long endMs;
RESULT result;

public Duration getElapsed() {
return Duration.ofMillis(endMs - startMs);
}
}
}
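The new `timeWithMeta` helper above can be illustrated with a self-contained sketch of the same pattern. `TimeSketch` is a hypothetical stand-in, not the library's `TimeUtils` - the real version uses Lombok's `@Value`/`@Builder` rather than a record:

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Sketch of the timeWithMeta pattern: capture start/end timestamps
// alongside the callable's result, instead of returning the result alone.
final class TimeSketch {
    record TimeResult<R>(long startMs, long endMs, R result) {
        Duration elapsed() {
            return Duration.ofMillis(endMs - startMs);
        }
    }

    static <R> TimeResult<R> timeWithMeta(Callable<R> func) throws Exception {
        long start = System.currentTimeMillis();
        R result = func.call();        // run the timed work
        long end = System.currentTimeMillis();
        return new TimeResult<>(start, end, result);
    }

    public static void main(String[] args) throws Exception {
        TimeResult<Integer> timed = timeWithMeta(() -> 21 * 2);
        System.out.println(timed.result());  // 42
        System.out.println(timed.elapsed()); // elapsed wall-clock duration
    }
}
```

Returning the metadata object rather than the bare result is what lets callers (here, tests measuring commit timing) assert on elapsed time without re-instrumenting the callable.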
@@ -30,7 +30,7 @@ public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> parallelC

@Override
public Stream<ConsumeProduceResult<K, V, K, V>> pollProduceAndStream(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction) {
super.pollAndProduceMany(userFunction, (result) -> {
super.pollAndProduceMany(userFunction, result -> {
log.trace("Wrapper callback applied, sending result to stream. Input: {}", result);
this.userProcessResultsStream.add(result);
});