diff --git a/build.gradle.kts b/build.gradle.kts
index da563074..c8868b9b 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -1,7 +1,13 @@
 import com.bnorm.power.PowerAssertGradleExtension
 import kotlinx.knit.KnitPluginExtension
 import org.gradle.api.tasks.testing.logging.TestExceptionFormat
+import org.gradle.api.tasks.testing.logging.TestExceptionFormat.*
 import org.gradle.api.tasks.testing.logging.TestLogEvent
+import org.gradle.api.tasks.testing.logging.TestLogEvent.FAILED
+import org.gradle.api.tasks.testing.logging.TestLogEvent.PASSED
+import org.gradle.api.tasks.testing.logging.TestLogEvent.SKIPPED
+import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_ERROR
+import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_OUT
 import org.jetbrains.dokka.gradle.DokkaTask
 
 plugins {
@@ -46,6 +52,10 @@ configure<KnitPluginExtension> {
   }
 }
 
+kotlin {
+  explicitApi()
+}
+
 tasks {
   withType<DokkaTask>().configureEach {
     outputDirectory.set(rootDir.resolve("docs"))
@@ -71,14 +81,8 @@ tasks {
   withType<Test>().configureEach {
     useJUnitPlatform()
     testLogging {
-      exceptionFormat = TestExceptionFormat.FULL
-      events = setOf(
-        TestLogEvent.PASSED,
-        TestLogEvent.SKIPPED,
-        TestLogEvent.FAILED,
-        TestLogEvent.STANDARD_OUT,
-        TestLogEvent.STANDARD_ERROR
-      )
+      exceptionFormat = FULL
+      events = setOf(SKIPPED, FAILED, STANDARD_ERROR)
     }
   }
 }
diff --git a/guide/example/example-publisher-01.kt b/guide/example/example-publisher-01.kt
index 1e8a19b3..b221d9af 100644
--- a/guide/example/example-publisher-01.kt
+++ b/guide/example/example-publisher-01.kt
@@ -22,6 +22,7 @@ fun main() = SuspendApp {
   KafkaPublisher(settings).use { publisher ->
     // ... use the publisher
     val m: Map<MetricName, Metric> = publisher.metrics()
+    println(m)
 
     publisher.publishScope {
       // send record without awaiting acknowledgement
diff --git a/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt b/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt
index 9e500bbd..32ed6e9f 100644
--- a/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt
+++ b/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt
@@ -191,7 +191,7 @@ public fun <K, V> List<ConsumerRecord<K, V>>.offsets(
     "io.github.nomisRev.kafka.receiver.KafkaReceiver"
   )
 )
-@OptIn(FlowPreview::class)
+@OptIn(ExperimentalCoroutinesApi::class)
 public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
   name: String,
   dispatcher: CoroutineDispatcher = IO,
diff --git a/src/main/kotlin/io/github/nomisRev/kafka/Producer.kt b/src/main/kotlin/io/github/nomisRev/kafka/Producer.kt
index 73185b2c..6463aa52 100644
--- a/src/main/kotlin/io/github/nomisRev/kafka/Producer.kt
+++ b/src/main/kotlin/io/github/nomisRev/kafka/Producer.kt
@@ -91,7 +91,7 @@ public fun <K, V> kafkaProducer(
   "Use io.github.nomisRev.kafka.publisher.Acks instead",
   ReplaceWith("this", "io.github.nomisRev.kafka.publisher.Acks")
 )
-typealias Acks =
+public typealias Acks =
   io.github.nomisRev.kafka.publisher.Acks
 
 @Deprecated("""
@@ -122,7 +122,7 @@ public data class ProducerSettings<K, V>(
     other?.let { putAll(other) }
   }
 
-  fun toPublisherSettings(): PublisherSettings<K, V> =
+  public fun toPublisherSettings(): PublisherSettings<K, V> =
     PublisherSettings(
       bootstrapServers,
       keyDeserializer,
diff --git a/src/main/kotlin/io/github/nomisRev/kafka/publisher/FlowProduce.kt b/src/main/kotlin/io/github/nomisRev/kafka/publisher/FlowProduce.kt
index a130ffd8..4c606cca 100644
--- a/src/main/kotlin/io/github/nomisRev/kafka/publisher/FlowProduce.kt
+++ b/src/main/kotlin/io/github/nomisRev/kafka/publisher/FlowProduce.kt
@@ -68,7 +68,7 @@ import kotlin.time.toJavaDuration
  * This only occurs when a fatal error occurred, and Flow transitions to COMPLETE.
  * @param createProducer a callback to create a producer, by default it uses the one from [PublisherSettings].
  */
-fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produce(
+public fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produce(
   settings: PublisherSettings<Key, Value>,
   onPublisherRecordDropped: (suspend (Logger, ProducerRecord<Key, Value>) -> Unit)? = null,
   createProducer: (suspend (PublisherSettings<Key, Value>) -> Producer<Key, Value>)? = null
@@ -88,7 +88,7 @@ fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produce(
  * This means we don't have to wait for the acknowledgement before sending the next message,
  * resulting in maximum throughput while still guaranteeing that the message was sent to Kafka.
  */
-fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produceOrThrow(
+public fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produceOrThrow(
   settings: PublisherSettings<Key, Value>,
   onPublisherRecordDropped: (suspend (Logger, ProducerRecord<Key, Value>) -> Unit)? = null,
   createProducer: (suspend (PublisherSettings<Key, Value>) -> Producer<Key, Value>)? = null
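As a usage sketch for the now-public Flow operators (not part of the diff): the broker address and topic name are hypothetical, and the collect step assumes `produce` emits the per-record acknowledgement metadata its KDoc describes.

```kotlin
import io.github.nomisRev.kafka.publisher.PublisherSettings
import io.github.nomisRev.kafka.publisher.produce
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

fun main() = runBlocking {
  // Hypothetical connection details, purely for illustration.
  val settings = PublisherSettings(
    bootstrapServers = "localhost:9092",
    keySerializer = StringSerializer(),
    valueSerializer = StringSerializer()
  )
  (1..5).asFlow()
    .map { n -> ProducerRecord("example-topic", "key-$n", "value-$n") }
    .produce(settings) // acknowledgements surface downstream; fatal errors complete the Flow
    .collect { metadata -> println(metadata) }
}
```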
diff --git a/src/main/kotlin/io/github/nomisRev/kafka/publisher/KafkaPublisher.kt b/src/main/kotlin/io/github/nomisRev/kafka/publisher/KafkaPublisher.kt
index 1d0b182d..2fd581be 100644
--- a/src/main/kotlin/io/github/nomisRev/kafka/publisher/KafkaPublisher.kt
+++ b/src/main/kotlin/io/github/nomisRev/kafka/publisher/KafkaPublisher.kt
@@ -64,6 +64,7 @@ import kotlin.time.toJavaDuration
 * KafkaPublisher(settings).use { publisher ->
 *   // ... use the publisher
 *   val m: Map<MetricName, Metric> = publisher.metrics()
+*   println(m)
 *
 *   publisher.publishScope {
 *     // send record without awaiting acknowledgement
@@ -77,7 +78,7 @@ import kotlin.time.toJavaDuration
 * ```
 *
 */
-fun <Key, Value> KafkaPublisher(
+public fun <Key, Value> KafkaPublisher(
   settings: PublisherSettings<Key, Value>,
   createProducer: (suspend (PublisherSettings<Key, Value>) -> Producer<Key, Value>)? = null
 ): KafkaPublisher<Key, Value> =
@@ -88,7 +89,7 @@ fun <Key, Value> KafkaPublisher(
 * It has one main method, [publishScope], which creates a [PublishScope],
 * and two suspending methods from the [Producer]: [partitionsFor] and [metrics].
 */
-interface KafkaPublisher<Key, Value> : AutoCloseable {
+public interface KafkaPublisher<Key, Value> : AutoCloseable {
 
   /**
    * Create and run a [publishScope], which can [PublishScope.offer] and [PublishScope.publish] records to Kafka.
@@ -125,13 +126,13 @@ interface KafkaPublisher<Key, Value> : AutoCloseable {
    * }
    * ```
    */
-  suspend fun <A> publishScope(block: suspend TransactionalScope<Key, Value>.() -> A): A
+  public suspend fun <A> publishScope(block: suspend TransactionalScope<Key, Value>.() -> A): A
 
   /** @see KafkaProducer.partitionsFor */
-  suspend fun partitionsFor(topic: String): List<PartitionInfo>
+  public suspend fun partitionsFor(topic: String): List<PartitionInfo>
 
   /** @see KafkaProducer.metrics */
-  suspend fun metrics(): Map<MetricName, Metric>
+  public suspend fun metrics(): Map<MetricName, Metric>
 }
 
 private class DefaultKafkaPublisher(
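The `createProducer` hook on the public constructor function makes the underlying `Producer` swappable. A sketch of one possible use, wiring in kafka-clients' `MockProducer` for broker-free tests (illustrative, not from the diff):

```kotlin
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.publisher.PublisherSettings
import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.common.serialization.StringSerializer

fun testPublisher(settings: PublisherSettings<String, String>) =
  KafkaPublisher(settings) {
    // In-memory producer: records never reach a real broker,
    // so publishScope logic stays testable in plain unit tests.
    MockProducer(true, StringSerializer(), StringSerializer())
  }
```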
diff --git a/src/main/kotlin/io/github/nomisRev/kafka/publisher/PublisherScope.kt b/src/main/kotlin/io/github/nomisRev/kafka/publisher/PublisherScope.kt
index a4f76276..d15ef99b 100644
--- a/src/main/kotlin/io/github/nomisRev/kafka/publisher/PublisherScope.kt
+++ b/src/main/kotlin/io/github/nomisRev/kafka/publisher/PublisherScope.kt
@@ -14,7 +14,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.errors.ProducerFencedException
 
 @DslMarker
-annotation class PublisherDSL
+public annotation class PublisherDSL
 
 /**
  * The DSL, or receiver type, of [KafkaPublisher.publishScope] and [TransactionalScope.transaction].
@@ -24,7 +24,7 @@
  * - [publish], this gives you less throughput, but waits on the delivery of the messages.
  */
 @PublisherDSL
-interface PublishScope<Key, Value> : CoroutineScope {
+public interface PublishScope<Key, Value> : CoroutineScope {
 
   /**
    * Offer the [record] to Kafka, and immediately return.
@@ -33,7 +33,7 @@ interface PublishScope<Key, Value> : CoroutineScope {
    *
    * @param record to be offered to kafka
    */
-  suspend fun offer(record: ProducerRecord<Key, Value>)
+  public suspend fun offer(record: ProducerRecord<Key, Value>)
 
   /**
    * Publish a [record] to Kafka, and suspend until it is acknowledged by Kafka.
@@ -44,31 +44,32 @@ interface PublishScope<Key, Value> : CoroutineScope {
    *
    * @param record to be delivered to kafka
    */
-  suspend fun publish(record: ProducerRecord<Key, Value>): RecordMetadata
+  public suspend fun publish(record: ProducerRecord<Key, Value>): RecordMetadata
 
   /**
    * Same as [offer], but for an [Iterable] of [ProducerRecord].
    * @see offer
    */
-  suspend fun offer(records: Iterable<ProducerRecord<Key, Value>>) =
+  public suspend fun offer(records: Iterable<ProducerRecord<Key, Value>>) {
     records.map { offer(it) }
+  }
 
   /**
    * Same as [publish], but for an [Iterable] of [ProducerRecord].
    * @see publish
    */
-  suspend fun publish(record: Iterable<ProducerRecord<Key, Value>>): List<RecordMetadata> = coroutineScope {
+  public suspend fun publish(record: Iterable<ProducerRecord<Key, Value>>): List<RecordMetadata> = coroutineScope {
     record.map { async { publish(it) } }.awaitAll()
   }
 
   /** Alias for `runCatching` and `publish`, except it rethrows fatal exceptions */
-  suspend fun publishCatching(record: ProducerRecord<Key, Value>): Result<RecordMetadata>
+  public suspend fun publishCatching(record: ProducerRecord<Key, Value>): Result<RecordMetadata>
 
   /**
    * Catch the first failure of [publish], except fatal exceptions.
    * Alias for `runCatching`, `publish`, and `awaitAll`, except it rethrows fatal exceptions
    */
-  suspend fun publishCatching(record: Iterable<ProducerRecord<Key, Value>>): Result<List<RecordMetadata>>
+  public suspend fun publishCatching(record: Iterable<ProducerRecord<Key, Value>>): Result<List<RecordMetadata>>
 }
 
 /**
@@ -89,7 +90,7 @@ interface PublishScope<Key, Value> : CoroutineScope {
 * then it'll be rethrown from this block and the transaction will be aborted.
 */
 @PublisherDSL
-interface TransactionalScope<Key, Value> : PublishScope<Key, Value> {
+public interface TransactionalScope<Key, Value> : PublishScope<Key, Value> {
 
   /**
    * Create and run a [transaction], which can [PublishScope.offer] and [PublishScope.publish] records to Kafka.
@@ -115,5 +116,5 @@ interface TransactionalScope<Key, Value> : PublishScope<Key, Value> {
    * }
    * ```
    */
-  suspend fun <A> transaction(block: suspend PublishScope<Key, Value>.() -> A): A
+  public suspend fun <A> transaction(block: suspend PublishScope<Key, Value>.() -> A): A
 }
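To make the scope semantics concrete, a sketch of how the four operations behave inside `publishScope` (the `demo` wrapper, topic, and keys are hypothetical):

```kotlin
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import org.apache.kafka.clients.producer.ProducerRecord

suspend fun demo(publisher: KafkaPublisher<String, String>) {
  publisher.publishScope {
    // Fire-and-forget: returns as soon as the record is handed to the producer.
    // The scope still awaits all outstanding acknowledgements before completing.
    offer(ProducerRecord("example-topic", "k1", "v1"))

    // Suspends until Kafka acknowledges delivery of this record.
    val metadata = publish(ProducerRecord("example-topic", "k2", "v2"))
    println("acknowledged at offset ${metadata.offset()}")

    // Non-fatal send failures are captured in a Result instead of thrown.
    publishCatching(ProducerRecord("example-topic", "k3", "v3"))
      .onFailure { println("delivery failed: $it") }

    // Only valid when the settings carry a transactional.id;
    // committed on success, aborted if the block throws.
    transaction {
      offer(ProducerRecord("example-topic", "k4", "v4"))
    }
  }
}
```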
diff --git a/src/main/kotlin/io/github/nomisRev/kafka/publisher/PublisherSettings.kt b/src/main/kotlin/io/github/nomisRev/kafka/publisher/PublisherSettings.kt
index 64a362e8..1cce5ba7 100644
--- a/src/main/kotlin/io/github/nomisRev/kafka/publisher/PublisherSettings.kt
+++ b/src/main/kotlin/io/github/nomisRev/kafka/publisher/PublisherSettings.kt
@@ -27,7 +27,7 @@ import kotlin.time.Duration
 * @param producerListener listener that is called whenever a [Producer] is added or removed.
 * @see http://kafka.apache.org/documentation.html#producerconfigs
 */
-data class PublisherSettings<Key, Value>(
+public data class PublisherSettings<Key, Value>(
   val bootstrapServers: String,
   val keySerializer: Serializer<Key>,
   val valueSerializer: Serializer<Value>,
@@ -55,7 +55,7 @@ data class PublisherSettings<Key, Value>(
     putAll(properties)
   }
 
-  fun transactionalId(): String? =
+  public fun transactionalId(): String? =
     properties[ProducerConfig.TRANSACTIONAL_ID_CONFIG] as? String
 
   /**
@@ -64,24 +64,24 @@ data class PublisherSettings<Key, Value>(
    * [Producer.initTransactions] is invoked on the producer to initialize
    * transactions before any operations are performed on the publisher.
    */
-  fun isTransactional(): Boolean =
+  public fun isTransactional(): Boolean =
     !transactionalId().isNullOrBlank()
 
   /** Called whenever a [Producer] is added or removed. */
-  interface ProducerListener {
+  public interface ProducerListener {
 
    /**
     * A new producer was created.
     * @param id the producer id (factory name and client.id separated by a period).
     * @param producer the producer.
     */
-    fun producerAdded(id: String, producer: Producer<*, *>) {}
+    public fun producerAdded(id: String, producer: Producer<*, *>) {}
 
    /**
     * An existing producer was removed.
     * @param id the producer id (factory bean name and client.id separated by a period).
     * @param producer the producer.
     */
-    fun producerRemoved(id: String, producer: Producer<*, *>) {}
+    public fun producerRemoved(id: String, producer: Producer<*, *>) {}
   }
 }
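With `ProducerListener` now public and its methods defaulted to no-ops, callers can override selectively. A minimal sketch (the name and println logging are illustrative; the instance would be passed via the `producerListener` parameter named in the KDoc above):

```kotlin
import io.github.nomisRev.kafka.publisher.PublisherSettings
import org.apache.kafka.clients.producer.Producer

val loggingListener = object : PublisherSettings.ProducerListener {
  override fun producerAdded(id: String, producer: Producer<*, *>) {
    println("producer added: $id")
  }

  override fun producerRemoved(id: String, producer: Producer<*, *>) {
    println("producer removed: $id")
  }
}
```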
diff --git a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt b/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt
index ac2c2c5d..f2a246b8 100644
--- a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt
+++ b/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt
@@ -1,5 +1,6 @@
 package io.github.nomisRev.kafka.receiver.internals
 
+import io.github.nomisRev.kafka.receiver.CommitStrategy
 import io.github.nomisRev.kafka.receiver.Offset
 import io.github.nomisRev.kafka.receiver.ReceiverSettings
 import io.github.nomisRev.kafka.receiver.internals.AckMode.ATMOST_ONCE
@@ -19,6 +20,7 @@ import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.CoroutineStart.LAZY
 import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
 import kotlinx.coroutines.Dispatchers.Default
+import kotlinx.coroutines.ExperimentalCoroutinesApi
 import kotlinx.coroutines.InternalCoroutinesApi
 import kotlinx.coroutines.cancelAndJoin
 import kotlinx.coroutines.channels.Channel
@@ -32,6 +34,8 @@ import kotlinx.coroutines.flow.onCompletion
 import kotlinx.coroutines.flow.onStart
 import kotlinx.coroutines.handleCoroutineException
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.selects.onTimeout
+import kotlinx.coroutines.selects.whileSelect
 import kotlinx.coroutines.withContext
 import org.apache.kafka.clients.consumer.*
 import org.apache.kafka.common.TopicPartition
@@ -72,23 +76,9 @@ internal class EventLoop<K, V>(
   private val channel = Channel<ConsumerRecords<K, V>>()
   private val pollTimeout = settings.pollTimeout.toJavaDuration()
   private val pausedPartitionsByUser: MutableSet<TopicPartition> = HashSet()
-  private val reachedMaxCommitBatchSize = Channel<Unit>(Channel.RENDEZVOUS)
+  private val commitBatchSignal = Channel<Unit>(Channel.RENDEZVOUS)
   private val utmostOnceOffsets = UtmostOnceOffsets()
 
-  /* Takes care of scheduling our commits to Kafka.
-   * It will schedule a commit after `reachedMaxCommitBatchSize` channel signals it has reach the batch size,
-   * or when it times out after the given `commitInterval`.
-   * This way it optimises sending commits to Kafka in an optimised way.
-   * Either every `Duration` or `x` elements, whichever comes first.
-   */
-  private val commitManager = scope.launch(context = Default, start = LAZY) {
-    offsetCommitWorker(
-      ackMode = ackMode,
-      strategy = settings.commitStrategy,
-      commitSignal = reachedMaxCommitBatchSize,
-      commit = ::scheduleCommitIfRequired
-    )
-  }
-
   internal fun receive(): Flow<ConsumerRecords<K, V>> =
     channel.consumeAsFlow()
       .onStart {
@@ -96,7 +86,7 @@ internal class EventLoop<K, V>(
         withContext(scope.coroutineContext) { poll() }
         commitManager.start()
       }.onCompletion {
-        reachedMaxCommitBatchSize.close()
+        commitBatchSignal.close()
         withContext(scope.coroutineContext) {
           commitManager.cancelAndJoin()
           consumer.wakeup()
@@ -485,7 +475,7 @@ internal class EventLoop<K, V>(
         TopicPartition(record.topic(), record.partition()),
         record.offset(),
         settings.commitStrategy.size(),
-        reachedMaxCommitBatchSize
+        commitBatchSignal
      )
 
   private inner class CommitOffset(
@@ -527,6 +517,46 @@ internal class EventLoop<K, V>(
       handleCoroutineException(outerContext, e)
     }
   }
+
+  /**
+   * A [Job], started lazily, that schedules commits via [scheduleCommitIfRequired],
+   * based on the [AckMode], [CommitStrategy] and [commitBatchSignal].
+   * It runs in parallel with the [EventLoop], so a separate process manages and schedules the commits.
+   *
+   * KotlinX Coroutines exposes a powerful experimental API where we can listen to our [commitBatchSignal]
+   * while racing against [onTimeout]. This allows us to commit on whichever event arrives first.
+   */
+  @OptIn(ExperimentalCoroutinesApi::class)
+  private val commitManager = scope.launch(start = LAZY) {
+    if (ackMode == MANUAL_ACK || ackMode == AUTO_ACK) {
+      whileSelect {
+        when (settings.commitStrategy) {
+          is CommitStrategy.BySizeOrTime -> {
+            commitBatchSignal.onReceiveCatching {
+              scheduleCommitIfRequired()
+              !it.isClosed
+            }
+            onTimeout(settings.commitStrategy.interval) {
+              scheduleCommitIfRequired()
+              true
+            }
+          }
+
+          is CommitStrategy.BySize ->
+            commitBatchSignal.onReceiveCatching {
+              scheduleCommitIfRequired()
+              !it.isClosed
+            }
+
+          is CommitStrategy.ByTime ->
+            onTimeout(settings.commitStrategy.interval) {
+              scheduleCommitIfRequired()
+              true
+            }
+        }
+      }
+    }
+  }
 }
 
 /* Marker for functions that are only called from the KafkaConsumer thread */
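The inlined `commitManager` relies on `whileSelect` racing a rendezvous channel against `onTimeout`. A self-contained sketch of that pattern, with illustrative names and durations:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.whileSelect
import kotlin.time.Duration.Companion.seconds

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main(): Unit = coroutineScope {
  val batchFull = Channel<Unit>(Channel.RENDEZVOUS)
  val flusher = launch {
    // Loop until a clause returns false: flush on whichever event wins the race.
    whileSelect {
      batchFull.onReceiveCatching { result ->
        println("flush: batch size reached")
        !result.isClosed // stop looping once the channel closes
      }
      onTimeout(5.seconds) {
        println("flush: interval elapsed")
        true
      }
    }
  }
  repeat(3) { batchFull.send(Unit) } // pretend three batches filled up
  batchFull.close()                  // terminates the whileSelect loop
  flusher.join()
}
```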
diff --git a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/OffsetCommitWorker.kt b/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/OffsetCommitWorker.kt
deleted file mode 100644
index 11b87f3e..00000000
--- a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/OffsetCommitWorker.kt
+++ /dev/null
@@ -1,56 +0,0 @@
-package io.github.nomisRev.kafka.receiver.internals
-
-import io.github.nomisRev.kafka.receiver.CommitStrategy
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.channels.ChannelResult
-import kotlinx.coroutines.channels.onFailure
-import kotlinx.coroutines.channels.onSuccess
-import kotlinx.coroutines.selects.onTimeout
-import kotlinx.coroutines.selects.whileSelect
-
-/**
- * A suspend function that will schedule [commit] based on the [AckMode], [CommitStrategy] and [commitSignal].
- * This function should be run in a parallel-process alongside the [EventLoop],
- * this way we have a separate process managing and scheduling the commits.
- *
- * KotlinX Coroutines exposes a powerful experimental API where we can listen to our [commitSignal],
- * while racing against a [onTimeout]. This allows for easily committing on whichever event arrives first.
- *
- * If you send a [Unit] to the [commitSignal] it will commit immediately,
- * if you send a [null] it will reset the timer
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-internal suspend fun offsetCommitWorker(
-  ackMode: AckMode,
-  strategy: CommitStrategy,
-  commitSignal: Channel<Unit>,
-  commit: suspend () -> Unit,
-): Unit = if (ackMode == AckMode.MANUAL_ACK || ackMode == AckMode.AUTO_ACK) {
-  whileSelect {
-    when (strategy) {
-      is CommitStrategy.BySizeOrTime -> {
-        commitSignal.onReceiveCatching {
-          commit()
-          !it.isClosed
-        }
-        onTimeout(strategy.interval) {
-          commit()
-          true
-        }
-      }
-
-      is CommitStrategy.BySize ->
-        commitSignal.onReceiveCatching {
-          commit()
-          !it.isClosed
-        }
-
-      is CommitStrategy.ByTime ->
-        onTimeout(strategy.interval) {
-          commit()
-          true
-        }
-    }
-  }
-} else Unit
diff --git a/src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt b/src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt
index 963605ee..51306761 100644
--- a/src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt
+++ b/src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt
@@ -47,7 +47,6 @@ import java.util.concurrent.Executors
 import java.util.concurrent.Future
 import java.util.concurrent.TimeUnit
 import kotlin.test.assertEquals
-import kotlin.time.Duration.Companion.INFINITE
 import kotlin.time.Duration.Companion.seconds
 
 
@@ -169,7 +168,7 @@ abstract class KafkaSpec {
     partitions: Int = 4,
     replicationFactor: Short = 1,
     test: suspend TopicTestScope.(NewTopic) -> Unit
-  ): Unit = runTest(timeout = INFINITE) {
+  ): Unit = runTest {
     val topic = NewTopic(nextTopicName(), partitions, replicationFactor).configs(topicConfig)
     admin {
       createTopic(topic)