Explicit api, and flatten commit offset worker #179

Merged · 4 commits · Feb 5, 2024
20 changes: 12 additions & 8 deletions build.gradle.kts
@@ -1,7 +1,13 @@
import com.bnorm.power.PowerAssertGradleExtension
import kotlinx.knit.KnitPluginExtension
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.*
import org.gradle.api.tasks.testing.logging.TestLogEvent
import org.gradle.api.tasks.testing.logging.TestLogEvent.FAILED
import org.gradle.api.tasks.testing.logging.TestLogEvent.PASSED
import org.gradle.api.tasks.testing.logging.TestLogEvent.SKIPPED
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_ERROR
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_OUT
import org.jetbrains.dokka.gradle.DokkaTask

plugins {
@@ -46,6 +52,10 @@ configure<JavaPluginExtension> {
}
}

kotlin {
explicitApi()
}

tasks {
withType<DokkaTask>().configureEach {
outputDirectory.set(rootDir.resolve("docs"))
@@ -71,14 +81,8 @@ tasks {
withType<Test>().configureEach {
useJUnitPlatform()
testLogging {
exceptionFormat = TestExceptionFormat.FULL
events = setOf(
TestLogEvent.PASSED,
TestLogEvent.SKIPPED,
TestLogEvent.FAILED,
TestLogEvent.STANDARD_OUT,
TestLogEvent.STANDARD_ERROR
)
exceptionFormat = FULL
events = setOf(SKIPPED, FAILED, STANDARD_ERROR)
}
}
}
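
The `kotlin { explicitApi() }` block above is what drives most of the `public` modifiers added throughout this PR: with explicit API mode on, the compiler rejects public declarations that omit a visibility modifier or an explicit return type. A minimal sketch of what that enforces (the `Greeter` names are illustrative, not part of this repository):

```kotlin
// With explicitApi() enabled, anything that is (implicitly) part of the public API
// must spell out both its visibility modifier and its return type.
public interface Greeter {
  public fun greet(name: String): String
}

// Dropping `public` or the `: String` return type here would now be a compile error;
// `internal` and `private` declarations are unaffected by explicit API mode.
public fun defaultGreeting(greeter: Greeter, name: String): String = greeter.greet(name)
```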
1 change: 1 addition & 0 deletions guide/example/example-publisher-01.kt
@@ -22,6 +22,7 @@ fun main() = SuspendApp {
KafkaPublisher(settings).use { publisher ->
// ... use the publisher
val m: Map<MetricName, Metric> = publisher.metrics()
println(m)

publisher.publishScope {
// send record without awaiting acknowledgement
2 changes: 1 addition & 1 deletion src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt
@@ -191,7 +191,7 @@ public fun <K, V> List<ConsumerRecords<K, V>>.offsets(
"io.github.nomisRev.kafka.receiver.KafkaReceiver"
)
)
@OptIn(FlowPreview::class)
@OptIn(ExperimentalCoroutinesApi::class)
public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
name: String,
dispatcher: CoroutineDispatcher = IO,
4 changes: 2 additions & 2 deletions src/main/kotlin/io/github/nomisRev/kafka/Producer.kt
@@ -91,7 +91,7 @@ public fun <K, V> kafkaProducer(
"Use io.github.nomisRev.kafka.publisher.Acks instead",
ReplaceWith("this", "io.github.nomisRev.kafka.publisher.Acks")
)
typealias Acks =
public typealias Acks =
io.github.nomisRev.kafka.publisher.Acks

@Deprecated("""
@@ -122,7 +122,7 @@ public data class ProducerSettings<K, V>(
other?.let { putAll(other) }
}

fun toPublisherSettings(): PublisherSettings<K, V> =
public fun toPublisherSettings(): PublisherSettings<K, V> =
PublisherSettings(
bootstrapServers,
keyDeserializer,
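
With `toPublisherSettings()` now part of the explicit public API, callers of the deprecated `ProducerSettings` can convert to the `publisher` package without rebuilding their configuration by hand. A rough sketch, assuming the import paths follow the package layout referenced in the deprecation messages:

```kotlin
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.publisher.PublisherSettings

// `existing` stands for whatever ProducerSettings value the application builds today;
// the conversion call is the only point of this sketch.
fun migrate(existing: ProducerSettings<String, String>): PublisherSettings<String, String> =
  existing.toPublisherSettings()
```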
@@ -68,7 +68,7 @@ import kotlin.time.toJavaDuration
* This only occurs when a fatal error occurred, and the Flow transitions to COMPLETE.
* @param createProducer a callback to create a producer, by default it uses the one from [PublisherSettings].
*/
fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produce(
public fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produce(
settings: PublisherSettings<Key, Value>,
onPublisherRecordDropped: (suspend (Logger, ProducerRecord<Key, Value>) -> Unit)? = null,
createProducer: (suspend (PublisherSettings<Key, Value>) -> Producer<Key, Value>)? = null
@@ -88,7 +88,7 @@ fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produce(
* This means we don't have to wait for the acknowledgement before sending the next message,
* resulting in maximum throughput while still guaranteeing that the message was sent to Kafka.
*/
fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produceOrThrow(
public fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produceOrThrow(
settings: PublisherSettings<Key, Value>,
onPublisherRecordDropped: (suspend (Logger, ProducerRecord<Key, Value>) -> Unit)? = null,
createProducer: (suspend (PublisherSettings<Key, Value>) -> Producer<Key, Value>)? = null
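
For the newly public `produce` and `produceOrThrow`, a sketch of how a record `Flow` might be driven through them. The `PublisherSettings` construction assumes the remaining constructor parameters have defaults, the `produce` import path is assumed to live in the publisher package, and the example assumes `produce` returns a `Flow` that must be collected for sending to happen — none of that is shown in this diff.

```kotlin
import io.github.nomisRev.kafka.publisher.PublisherSettings
import io.github.nomisRev.kafka.publisher.produce
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

fun main() = runBlocking {
  val settings = PublisherSettings(
    bootstrapServers = "localhost:9092",
    keySerializer = StringSerializer(),
    valueSerializer = StringSerializer(),
  )

  (1..10).map { ProducerRecord("example-topic", "key-$it", "value-$it") }
    .asFlow()
    // produce sends every record through a producer created from the settings;
    // produceOrThrow has the same shape but fails the Flow on the first send error.
    .produce(settings)
    .collect { println(it) }
}
```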
@@ -64,6 +64,7 @@ import kotlin.time.toJavaDuration
* KafkaPublisher(settings).use { publisher ->
* // ... use the publisher
* val m: Map<MetricName, Metric> = publisher.metrics()
* println(m)
*
* publisher.publishScope {
* // send record without awaiting acknowledgement
@@ -77,7 +78,7 @@ import kotlin.time.toJavaDuration
* ```
* <!--- KNIT example-publisher-01.kt -->
*/
fun <Key, Value> KafkaPublisher(
public fun <Key, Value> KafkaPublisher(
settings: PublisherSettings<Key, Value>,
createProducer: (suspend (PublisherSettings<Key, Value>) -> Producer<Key, Value>)? = null
): KafkaPublisher<Key, Value> =
@@ -88,7 +89,7 @@ fun <Key, Value> KafkaPublisher(
* It has one main method, [publishScope], which creates a [PublishScope],
* and two suspending methods from the [Producer]: [partitionsFor] and [metrics].
*/
interface KafkaPublisher<Key, Value> : AutoCloseable {
public interface KafkaPublisher<Key, Value> : AutoCloseable {

/**
* Create and run a [publishScope], which can [PublishScope.offer] and [PublishScope.publish] records to Kafka.
@@ -125,13 +126,13 @@ interface KafkaPublisher<Key, Value> : AutoCloseable {
* }
* ```
*/
suspend fun <A> publishScope(block: suspend TransactionalScope<Key, Value>.() -> A): A
public suspend fun <A> publishScope(block: suspend TransactionalScope<Key, Value>.() -> A): A

/** @see KafkaProducer.partitionsFor */
suspend fun partitionsFor(topic: String): List<PartitionInfo>
public suspend fun partitionsFor(topic: String): List<PartitionInfo>

/** @see KafkaProducer.metrics */
suspend fun metrics(): Map<MetricName, Metric>
public suspend fun metrics(): Map<MetricName, Metric>
}

private class DefaultKafkaPublisher<Key, Value>(
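
Since `KafkaPublisher` extends `AutoCloseable` and now exposes the suspending `partitionsFor` and `metrics` publicly, a small usage sketch (the `describeTopic` helper and topic handling are illustrative only; `settings` stands for an already configured `PublisherSettings` value):

```kotlin
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.publisher.PublisherSettings
import org.apache.kafka.common.PartitionInfo

suspend fun describeTopic(
  settings: PublisherSettings<String, String>,
  topic: String,
): List<PartitionInfo> =
  // use {} closes the underlying producer afterwards, since KafkaPublisher is AutoCloseable.
  KafkaPublisher(settings).use { publisher ->
    // Both calls suspend instead of blocking the caller thread.
    println("producer metrics: ${publisher.metrics().size}")
    publisher.partitionsFor(topic)
  }
```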
@@ -14,7 +14,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.errors.ProducerFencedException

@DslMarker
annotation class PublisherDSL
public annotation class PublisherDSL

/**
* The DSL, or receiver type, of [KafkaPublisher.publishScope] and [TransactionalScope.transaction].
@@ -24,7 +24,7 @@ annotation class PublisherDSL
* - [publish], which gives you less throughput, but waits on the delivery of the messages.
*/
@PublisherDSL
interface PublishScope<Key, Value> : CoroutineScope {
public interface PublishScope<Key, Value> : CoroutineScope {

/**
* Offer the [record] to Kafka, and immediately return.
@@ -33,7 +33,7 @@ interface PublishScope<Key, Value> : CoroutineScope {
*
* @param record to be offered to kafka
*/
suspend fun offer(record: ProducerRecord<Key, Value>)
public suspend fun offer(record: ProducerRecord<Key, Value>)

/**
* Publish a [record] to Kafka, and suspend until it is acknowledged by Kafka.
@@ -44,31 +44,32 @@ interface PublishScope<Key, Value> : CoroutineScope {
*
* @param record to be delivered to kafka
*/
suspend fun publish(record: ProducerRecord<Key, Value>): RecordMetadata
public suspend fun publish(record: ProducerRecord<Key, Value>): RecordMetadata

/**
* Same as [offer], but for an [Iterable] of [ProducerRecord].
* @see offer
*/
suspend fun offer(records: Iterable<ProducerRecord<Key, Value>>) =
public suspend fun offer(records: Iterable<ProducerRecord<Key, Value>>) {
records.map { offer(it) }
}

/**
* Same as [publish], but for an [Iterable] of [ProducerRecord].
* @see publish
*/
suspend fun publish(record: Iterable<ProducerRecord<Key, Value>>): List<RecordMetadata> = coroutineScope {
public suspend fun publish(record: Iterable<ProducerRecord<Key, Value>>): List<RecordMetadata> = coroutineScope {
record.map { async { publish(it) } }.awaitAll()
}

/** Alias for `runCatching` and `publish`, except it rethrows fatal exceptions */
suspend fun publishCatching(record: ProducerRecord<Key, Value>): Result<RecordMetadata>
public suspend fun publishCatching(record: ProducerRecord<Key, Value>): Result<RecordMetadata>

/**
* Catch the first failure of [publish], except fatal exceptions.
* Alias for `runCatching`, `publish`, and `awaitAll`, except it rethrows fatal exceptions
*/
suspend fun publishCatching(record: Iterable<ProducerRecord<Key, Value>>): Result<List<RecordMetadata>>
public suspend fun publishCatching(record: Iterable<ProducerRecord<Key, Value>>): Result<List<RecordMetadata>>
}

/**
@@ -89,7 +90,7 @@ interface PublishScope<Key, Value> : CoroutineScope {
* then it'll be rethrown from this block and the transaction will be aborted.
*/
@PublisherDSL
interface TransactionalScope<Key, Value> : PublishScope<Key, Value> {
public interface TransactionalScope<Key, Value> : PublishScope<Key, Value> {

/**
* Create and run a [transaction], which can [PublishScope.offer] and [PublishScope.publish] records to Kafka.
@@ -115,5 +116,5 @@ interface TransactionalScope<Key, Value> : PublishScope<Key, Value> {
* }
* ```
*/
suspend fun <A> transaction(block: suspend PublishScope<Key, Value>.() -> A): A
public suspend fun <A> transaction(block: suspend PublishScope<Key, Value>.() -> A): A
}
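
A sketch of how the now-public `TransactionalScope.transaction` composes with `offer` and `publish`. It assumes the supplied settings carry a `transactional.id` (otherwise the publisher is not transactional), and the topic names are placeholders:

```kotlin
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.publisher.PublisherSettings
import org.apache.kafka.clients.producer.ProducerRecord

// `settings` is assumed to have a transactional.id configured;
// `records` is the batch that should be committed atomically.
suspend fun publishAtomically(
  settings: PublisherSettings<String, String>,
  records: List<ProducerRecord<String, String>>,
) {
  KafkaPublisher(settings).use { publisher ->
    publisher.publishScope {
      // Everything inside transaction {} is committed together; if the block throws,
      // the transaction is aborted and the exception is rethrown to the caller.
      transaction {
        // offer hands the records to the producer and returns without awaiting acknowledgement,
        offer(records)
        // while publish suspends until Kafka acknowledges this record.
        publish(ProducerRecord("audit-topic", "batch", "complete"))
      }
    }
  }
}
```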
@@ -27,7 +27,7 @@ import kotlin.time.Duration
* @param producerListener listener that is called whenever a [Producer] is added or removed.
* @see http://kafka.apache.org/documentation.html#producerconfigs
*/
data class PublisherSettings<Key, Value>(
public data class PublisherSettings<Key, Value>(
val bootstrapServers: String,
val keySerializer: Serializer<Key>,
val valueSerializer: Serializer<Value>,
@@ -55,7 +55,7 @@ data class PublisherSettings<Key, Value>(
putAll(properties)
}

fun transactionalId(): String? =
public fun transactionalId(): String? =
properties[ProducerConfig.TRANSACTIONAL_ID_CONFIG] as? String

/**
@@ -64,24 +64,24 @@ data class PublisherSettings<Key, Value>(
* [Producer.initTransactions] is invoked on the producer to initialize
* transactions before any operations are performed on the publisher.
*/
fun isTransactional(): Boolean =
public fun isTransactional(): Boolean =
!transactionalId().isNullOrBlank()

/** Called whenever a [Producer] is added or removed. */
interface ProducerListener {
public interface ProducerListener {
/**
* A new producer was created.
* @param id the producer id (factory name and client.id separated by a period).
* @param producer the producer.
*/
fun producerAdded(id: String, producer: Producer<*, *>) {}
public fun producerAdded(id: String, producer: Producer<*, *>) {}

/**
* An existing producer was removed.
* @param id the producer id (factory bean name and client.id separated by a period).
* @param producer the producer.
*/
fun producerRemoved(id: String, producer: Producer<*, *>) {}
public fun producerRemoved(id: String, producer: Producer<*, *>) {}
}
}

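
Because both `ProducerListener` callbacks keep their empty default bodies (now with explicit `public` visibility), a listener only needs to override what it cares about. An illustrative sketch (how the listener is wired into `PublisherSettings` via its `producerListener` parameter is not shown in this diff):

```kotlin
import io.github.nomisRev.kafka.publisher.PublisherSettings
import org.apache.kafka.clients.producer.Producer

// Logs producer lifecycle events; overriding either callback is optional
// because both have default empty implementations.
class LoggingProducerListener : PublisherSettings.ProducerListener {
  override fun producerAdded(id: String, producer: Producer<*, *>) {
    println("producer added: $id")
  }

  override fun producerRemoved(id: String, producer: Producer<*, *>) {
    println("producer removed: $id")
  }
}
```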
@@ -1,5 +1,6 @@
package io.github.nomisRev.kafka.receiver.internals

import io.github.nomisRev.kafka.receiver.CommitStrategy
import io.github.nomisRev.kafka.receiver.Offset
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.github.nomisRev.kafka.receiver.internals.AckMode.ATMOST_ONCE
@@ -19,6 +20,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.LAZY
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.Dispatchers.Default
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
@@ -32,6 +34,8 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.handleCoroutineException
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.whileSelect
import kotlinx.coroutines.withContext
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition
@@ -72,31 +76,17 @@ internal class EventLoop<K, V>(
private val channel = Channel<ConsumerRecords<K, V>>()
private val pollTimeout = settings.pollTimeout.toJavaDuration()
private val pausedPartitionsByUser: MutableSet<TopicPartition> = HashSet()
private val reachedMaxCommitBatchSize = Channel<Unit>(Channel.RENDEZVOUS)
private val commitBatchSignal = Channel<Unit>(Channel.RENDEZVOUS)
private val utmostOnceOffsets = UtmostOnceOffsets()

/* Takes care of scheduling our commits to Kafka.
 * It will schedule a commit after the `reachedMaxCommitBatchSize` channel signals it has reached the batch size,
 * or when it times out after the given `commitInterval`.
 * This way it batches commits to Kafka efficiently:
 * either every `Duration` or `x` elements, whichever comes first. */
private val commitManager = scope.launch(context = Default, start = LAZY) {
offsetCommitWorker(
ackMode = ackMode,
strategy = settings.commitStrategy,
commitSignal = reachedMaxCommitBatchSize,
commit = ::scheduleCommitIfRequired
)
}

internal fun receive(): Flow<ConsumerRecords<K, V>> =
channel.consumeAsFlow()
.onStart {
if (topicNames.isNotEmpty()) subscribe(topicNames)
withContext(scope.coroutineContext) { poll() }
commitManager.start()
}.onCompletion {
reachedMaxCommitBatchSize.close()
commitBatchSignal.close()
withContext(scope.coroutineContext) {
commitManager.cancelAndJoin()
consumer.wakeup()
@@ -485,7 +475,7 @@ internal class EventLoop<K, V>(
TopicPartition(record.topic(), record.partition()),
record.offset(),
settings.commitStrategy.size(),
reachedMaxCommitBatchSize
commitBatchSignal
)

private inner class CommitOffset(
@@ -527,6 +517,46 @@ internal class EventLoop<K, V>(
handleCoroutineException(outerContext, e)
}
}

/**
* Schedules commits based on the [AckMode], [CommitStrategy] and [commitBatchSignal].
* Run as a parallel Job alongside the [EventLoop],
* so that a separate process manages and schedules the commits.
*
* KotlinX Coroutines exposes a powerful experimental API where we can listen to our [commitBatchSignal]
* while racing against an [onTimeout]. This allows us to commit on whichever event arrives first.
*/
@OptIn(ExperimentalCoroutinesApi::class)
private val commitManager = scope.launch(start = LAZY) {
if (ackMode == MANUAL_ACK || ackMode == AUTO_ACK) {
whileSelect {
when (settings.commitStrategy) {
is CommitStrategy.BySizeOrTime -> {
commitBatchSignal.onReceiveCatching {
scheduleCommitIfRequired()
!it.isClosed
}
onTimeout(settings.commitStrategy.interval) {
scheduleCommitIfRequired()
true
}
}

is CommitStrategy.BySize ->
commitBatchSignal.onReceiveCatching {
scheduleCommitIfRequired()
!it.isClosed
}

is CommitStrategy.ByTime ->
onTimeout(settings.commitStrategy.interval) {
scheduleCommitIfRequired()
true
}
}
}
}
}
}

/* Marker for functions that are only called from the KafkaConsumer thread */
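
The new `commitManager` replaces the separate `offsetCommitWorker` with a single `whileSelect` loop that races the batch-size signal against a timeout. The same pattern works for any "flush after N items or after an interval" worker; a self-contained sketch using only kotlinx.coroutines (the channel, `flush`, and durations are illustrative):

```kotlin
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.whileSelect

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
  val batchFull = Channel<Unit>(Channel.RENDEZVOUS)
  fun flush() = println("committing batch")

  val worker = launch {
    // Keep selecting until the signal channel is closed: flush when the
    // batch-size signal arrives, or when the interval elapses first.
    whileSelect {
      batchFull.onReceiveCatching { result ->
        flush()
        !result.isClosed
      }
      onTimeout(1.seconds) {
        flush()
        true
      }
    }
  }

  delay(1500)          // let one timeout-driven flush happen
  batchFull.send(Unit) // simulate reaching the max batch size
  batchFull.close()    // closing the channel stops the worker loop
  worker.join()
}
```

Closing the signal channel is what lets the worker terminate cleanly, mirroring how `commitBatchSignal.close()` is called in the `onCompletion` block earlier in this file.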