Merge pull request #179 from nomisRev/explicit-api
Explicit API, and flatten commit offset worker
nomisRev authored Feb 5, 2024
2 parents 3e2af6f + aa7b578 commit 76ee264
Showing 11 changed files with 89 additions and 109 deletions.
20 changes: 12 additions & 8 deletions build.gradle.kts
@@ -1,7 +1,13 @@
import com.bnorm.power.PowerAssertGradleExtension
import kotlinx.knit.KnitPluginExtension
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.*
import org.gradle.api.tasks.testing.logging.TestLogEvent
import org.gradle.api.tasks.testing.logging.TestLogEvent.FAILED
import org.gradle.api.tasks.testing.logging.TestLogEvent.PASSED
import org.gradle.api.tasks.testing.logging.TestLogEvent.SKIPPED
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_ERROR
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_OUT
import org.jetbrains.dokka.gradle.DokkaTask

plugins {
@@ -46,6 +52,10 @@ configure<JavaPluginExtension> {
}
}

kotlin {
explicitApi()
}

tasks {
withType<DokkaTask>().configureEach {
outputDirectory.set(rootDir.resolve("docs"))
@@ -71,14 +81,8 @@ tasks {
withType<Test>().configureEach {
useJUnitPlatform()
testLogging {
exceptionFormat = TestExceptionFormat.FULL
events = setOf(
TestLogEvent.PASSED,
TestLogEvent.SKIPPED,
TestLogEvent.FAILED,
TestLogEvent.STANDARD_OUT,
TestLogEvent.STANDARD_ERROR
)
exceptionFormat = FULL
events = setOf(SKIPPED, FAILED, STANDARD_ERROR)
}
}
}
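
The new `kotlin { explicitApi() }` block is what drives most of this diff: in explicit API mode the compiler rejects public declarations that omit a visibility modifier or an explicit return type, which is why `public` now appears throughout the publisher and receiver sources below. A minimal sketch of the rule; the class and function names are illustrative, not part of this repository:

```kotlin
// Rejected under explicitApi(): no visibility modifier, inferred return type.
class GreeterImplicit {
    fun greet(name: String) = "Hello, $name"
}

// Accepted: visibility and return type are spelled out, as this diff now does
// for KafkaPublisher, PublishScope, PublisherSettings, and friends.
public class GreeterExplicit {
    public fun greet(name: String): String = "Hello, $name"
}
```
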
1 change: 1 addition & 0 deletions guide/example/example-publisher-01.kt
@@ -22,6 +22,7 @@ fun main() = SuspendApp {
KafkaPublisher(settings).use { publisher ->
// ... use the publisher
val m: Map<MetricName, Metric> = publisher.metrics()
println(m)

publisher.publishScope {
// send record without awaiting acknowledgement
2 changes: 1 addition & 1 deletion src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt
@@ -191,7 +191,7 @@ public fun <K, V> List<ConsumerRecords<K, V>>.offsets(
"io.github.nomisRev.kafka.receiver.KafkaReceiver"
)
)
@OptIn(FlowPreview::class)
@OptIn(ExperimentalCoroutinesApi::class)
public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
name: String,
dispatcher: CoroutineDispatcher = IO,
4 changes: 2 additions & 2 deletions src/main/kotlin/io/github/nomisRev/kafka/Producer.kt
@@ -91,7 +91,7 @@ public fun <K, V> kafkaProducer(
"Use io.github.nomisRev.kafka.publisher.Acks instead",
ReplaceWith("this", "io.github.nomisRev.kafka.publisher.Acks")
)
typealias Acks =
public typealias Acks =
io.github.nomisRev.kafka.publisher.Acks

@Deprecated("""
@@ -122,7 +122,7 @@ public data class ProducerSettings<K, V>(
other?.let { putAll(other) }
}

fun toPublisherSettings(): PublisherSettings<K, V> =
public fun toPublisherSettings(): PublisherSettings<K, V> =
PublisherSettings(
bootstrapServers,
keyDeserializer,
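
With `ProducerSettings` and the `Acks` typealias deprecated in favour of their `io.github.nomisRev.kafka.publisher` counterparts, `toPublisherSettings()` above is the migration bridge for existing code, while `Acks` only needs its import switched. A minimal sketch, assuming an already-configured `ProducerSettings` value; how that value was built is not part of this hunk:

```kotlin
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.publisher.PublisherSettings

// Bridge the deprecated settings type to the publisher API in one call.
fun migrate(legacy: ProducerSettings<String, String>): PublisherSettings<String, String> =
    legacy.toPublisherSettings()
```
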
@@ -68,7 +68,7 @@ import kotlin.time.toJavaDuration
* This only occurs when a fatal error has occurred, and the Flow transitions to COMPLETE.
* @param createProducer a callback to create a producer, by default it uses the one from [PublisherSettings].
*/
fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produce(
public fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produce(
settings: PublisherSettings<Key, Value>,
onPublisherRecordDropped: (suspend (Logger, ProducerRecord<Key, Value>) -> Unit)? = null,
createProducer: (suspend (PublisherSettings<Key, Value>) -> Producer<Key, Value>)? = null
@@ -88,7 +88,7 @@ fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produce(
* This means we don't have to wait for the acknowledgement before sending the next message,
* resulting in maximum throughput while still guaranteeing that the message was sent to Kafka.
*/
fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produceOrThrow(
public fun <Key, Value> Flow<ProducerRecord<Key, Value>>.produceOrThrow(
settings: PublisherSettings<Key, Value>,
onPublisherRecordDropped: (suspend (Logger, ProducerRecord<Key, Value>) -> Unit)? = null,
createProducer: (suspend (PublisherSettings<Key, Value>) -> Producer<Key, Value>)? = null
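
The KDoc above distinguishes `produce`, which keeps sending without waiting on individual acknowledgements, from `produceOrThrow`, which fails the stream when a send fails. A hedged sketch of driving a record Flow through `produceOrThrow`; the topic, the records, the import path of the operator, the assumption that the remaining `PublisherSettings` constructor parameters have defaults, and the assumption that the returned Flow is collected to run the sends are all illustrative rather than taken from this diff:

```kotlin
import io.github.nomisRev.kafka.publisher.PublisherSettings
import io.github.nomisRev.kafka.publisher.produceOrThrow // assumed import path
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

fun main() = runBlocking {
    val settings = PublisherSettings(
        bootstrapServers = "localhost:9092",
        keySerializer = StringSerializer(),
        valueSerializer = StringSerializer(),
    )
    (1..3).asFlow()
        .map { i -> ProducerRecord("example-topic", "key-$i", "value-$i") }
        .produceOrThrow(settings) // rethrows if any record fails to be sent
        .collect()                // assumption: collecting the returned Flow performs the sends
}
```
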
@@ -64,6 +64,7 @@ import kotlin.time.toJavaDuration
* KafkaPublisher(settings).use { publisher ->
* // ... use the publisher
* val m: Map<MetricName, Metric> = publisher.metrics()
* println(m)
*
* publisher.publishScope {
* // send record without awaiting acknowledgement
@@ -77,7 +78,7 @@ import kotlin.time.toJavaDuration
* ```
* <!--- KNIT example-publisher-01.kt -->
*/
fun <Key, Value> KafkaPublisher(
public fun <Key, Value> KafkaPublisher(
settings: PublisherSettings<Key, Value>,
createProducer: (suspend (PublisherSettings<Key, Value>) -> Producer<Key, Value>)? = null
): KafkaPublisher<Key, Value> =
@@ -88,7 +89,7 @@ fun <Key, Value> KafkaPublisher(
* It has one main method, [publishScope], which creates a [PublishScope],
* and two suspending methods from the [Producer]: [partitionsFor] and [metrics].
*/
interface KafkaPublisher<Key, Value> : AutoCloseable {
public interface KafkaPublisher<Key, Value> : AutoCloseable {

/**
* Create and run a [publishScope], which can [PublishScope.offer] and [PublishScope.publish] records to Kafka.
@@ -125,13 +126,13 @@ interface KafkaPublisher<Key, Value> : AutoCloseable {
* }
* ```
*/
suspend fun <A> publishScope(block: suspend TransactionalScope<Key, Value>.() -> A): A
public suspend fun <A> publishScope(block: suspend TransactionalScope<Key, Value>.() -> A): A

/** @see KafkaProducer.partitionsFor */
suspend fun partitionsFor(topic: String): List<PartitionInfo>
public suspend fun partitionsFor(topic: String): List<PartitionInfo>

/** @see KafkaProducer.metrics */
suspend fun metrics(): Map<MetricName, Metric>
public suspend fun metrics(): Map<MetricName, Metric>
}

private class DefaultKafkaPublisher<Key, Value>(
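
Besides `publishScope`, the interface exposes two suspending pass-throughs to the underlying producer, `partitionsFor` and `metrics`. A small sketch, again assuming the trailing `PublisherSettings` constructor parameters have defaults and that the import paths match the publisher package referenced elsewhere in this diff; the topic name is illustrative:

```kotlin
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.publisher.PublisherSettings
import kotlinx.coroutines.runBlocking
import org.apache.kafka.common.serialization.StringSerializer

fun main() = runBlocking {
    val settings = PublisherSettings(
        bootstrapServers = "localhost:9092",
        keySerializer = StringSerializer(),
        valueSerializer = StringSerializer(),
    )
    // `use` closes the underlying producer once the block completes.
    KafkaPublisher(settings).use { publisher ->
        val partitions = publisher.partitionsFor("example-topic")
        println("example-topic has ${partitions.size} partitions")
        println("producer exposes ${publisher.metrics().size} metrics")
    }
}
```
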
@@ -14,7 +14,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.errors.ProducerFencedException

@DslMarker
annotation class PublisherDSL
public annotation class PublisherDSL

/**
* The DSL, or receiver type, of [KafkaPublisher.publishScope] and [TransactionalScope.transaction].
@@ -24,7 +24,7 @@ annotation class PublisherDSL
* - [publish], this gives you less throughput, but waits on the delivery of the messages.
*/
@PublisherDSL
interface PublishScope<Key, Value> : CoroutineScope {
public interface PublishScope<Key, Value> : CoroutineScope {

/**
* Offer the [record] to Kafka, and immediately return.
@@ -33,7 +33,7 @@ interface PublishScope<Key, Value> : CoroutineScope {
*
* @param record to be offered to kafka
*/
suspend fun offer(record: ProducerRecord<Key, Value>)
public suspend fun offer(record: ProducerRecord<Key, Value>)

/**
* Publish a [record] to Kafka, and suspend until it is acknowledged by Kafka.
@@ -44,31 +44,32 @@ interface PublishScope<Key, Value> : CoroutineScope {
*
* @param record to be delivered to kafka
*/
suspend fun publish(record: ProducerRecord<Key, Value>): RecordMetadata
public suspend fun publish(record: ProducerRecord<Key, Value>): RecordMetadata

/**
* Same as [offer], but for an [Iterable] of [ProducerRecord].
* @see offer
*/
suspend fun offer(records: Iterable<ProducerRecord<Key, Value>>) =
public suspend fun offer(records: Iterable<ProducerRecord<Key, Value>>) {
records.map { offer(it) }
}

/**
* Same as [publish], but for an [Iterable] of [ProducerRecord].
* @see publish
*/
suspend fun publish(record: Iterable<ProducerRecord<Key, Value>>): List<RecordMetadata> = coroutineScope {
public suspend fun publish(record: Iterable<ProducerRecord<Key, Value>>): List<RecordMetadata> = coroutineScope {
record.map { async { publish(it) } }.awaitAll()
}

/** Alias for `runCatching` and `publish`, except it rethrows fatal exceptions */
suspend fun publishCatching(record: ProducerRecord<Key, Value>): Result<RecordMetadata>
public suspend fun publishCatching(record: ProducerRecord<Key, Value>): Result<RecordMetadata>

/**
* Catches the first failure of [publish], except fatal exceptions.
* Alias for `runCatching`, `publish`, and `awaitAll`, except it rethrows fatal exceptions
*/
suspend fun publishCatching(record: Iterable<ProducerRecord<Key, Value>>): Result<List<RecordMetadata>>
public suspend fun publishCatching(record: Iterable<ProducerRecord<Key, Value>>): Result<List<RecordMetadata>>
}

/**
@@ -89,7 +90,7 @@ interface PublishScope<Key, Value> : CoroutineScope {
* then it'll be rethrown from this block and the transaction will be aborted.
*/
@PublisherDSL
interface TransactionalScope<Key, Value> : PublishScope<Key, Value> {
public interface TransactionalScope<Key, Value> : PublishScope<Key, Value> {

/**
* Create and run a [transaction], which can [PublishScope.offer] and [PublishScope.publish] records to Kafka.
@@ -115,5 +116,5 @@ interface TransactionalScope<Key, Value> : PublishScope<Key, Value> {
* }
* ```
*/
suspend fun <A> transaction(block: suspend PublishScope<Key, Value>.() -> A): A
public suspend fun <A> transaction(block: suspend PublishScope<Key, Value>.() -> A): A
}
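
Taken together, the scope offers three delivery flavours (`offer` for fire-and-forget within the scope, `publish` to await the acknowledgement, and `publishCatching` to get the acknowledgement as a `Result`) plus `transaction` for atomic batches. A hedged usage sketch: the topics and records are illustrative, the import path is assumed, and the publisher is assumed to come from transactional `PublisherSettings` so that `transaction { }` can be used:

```kotlin
import io.github.nomisRev.kafka.publisher.KafkaPublisher // assumed import path
import org.apache.kafka.clients.producer.ProducerRecord

suspend fun publishBatch(publisher: KafkaPublisher<String, String>) {
    publisher.publishScope {
        // Hand the record to the producer and return immediately;
        // the scope is assumed to await outstanding offers before it completes.
        offer(ProducerRecord("example-topic", "k1", "v1"))

        // Suspend until Kafka acknowledges the record.
        val metadata = publish(ProducerRecord("example-topic", "k2", "v2"))
        println("published to offset ${metadata.offset()}")

        // Non-fatal send failures come back as a failed Result instead of throwing.
        val result = publishCatching(ProducerRecord("example-topic", "k3", "v3"))
        println("third record succeeded: ${result.isSuccess}")

        // Records offered inside the block are committed together, or aborted if it throws.
        transaction {
            offer(ProducerRecord("example-topic", "k4", "v4"))
            offer(ProducerRecord("example-topic", "k5", "v5"))
        }
    }
}
```
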
@@ -27,7 +27,7 @@ import kotlin.time.Duration
* @param producerListener listener that is called whenever a [Producer] is added or removed.
* @see http://kafka.apache.org/documentation.html#producerconfigs
*/
data class PublisherSettings<Key, Value>(
public data class PublisherSettings<Key, Value>(
val bootstrapServers: String,
val keySerializer: Serializer<Key>,
val valueSerializer: Serializer<Value>,
@@ -55,7 +55,7 @@ data class PublisherSettings<Key, Value>(
putAll(properties)
}

fun transactionalId(): String? =
public fun transactionalId(): String? =
properties[ProducerConfig.TRANSACTIONAL_ID_CONFIG] as? String

/**
@@ -64,24 +64,24 @@ data class PublisherSettings<Key, Value>(
* [Producer.initTransactions] is invoked on the producer to initialize
* transactions before any operations are performed on the publisher.
*/
fun isTransactional(): Boolean =
public fun isTransactional(): Boolean =
!transactionalId().isNullOrBlank()

/** Called whenever a [Producer] is added or removed. */
interface ProducerListener {
public interface ProducerListener {
/**
* A new producer was created.
* @param id the producer id (factory name and client.id separated by a period).
* @param producer the producer.
*/
fun producerAdded(id: String, producer: Producer<*, *>) {}
public fun producerAdded(id: String, producer: Producer<*, *>) {}

/**
* An existing producer was removed.
* @param id the producer id (factory bean name and client.id separated by a period).
* @param producer the producer.
*/
fun producerRemoved(id: String, producer: Producer<*, *>) {}
public fun producerRemoved(id: String, producer: Producer<*, *>) {}
}
}

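
`isTransactional()` above simply checks that a non-blank `transactional.id` reached the producer properties. A sketch of that check and of the `ProducerListener` hook; the `properties` parameter name, the defaults for the trailing constructor parameters, and the nesting of `ProducerListener` inside `PublisherSettings` (suggested by the closing braces above) are assumptions:

```kotlin
import io.github.nomisRev.kafka.publisher.PublisherSettings
import java.util.Properties
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

val transactionalSettings = PublisherSettings(
    bootstrapServers = "localhost:9092",
    keySerializer = StringSerializer(),
    valueSerializer = StringSerializer(),
    // Assumed parameter name; the diff only shows that a Properties bag is
    // merged into the final producer configuration.
    properties = Properties().apply {
        put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-publisher-1")
    },
)

// transactional.id is set and non-blank, so transactions will be initialised.
val usesTransactions: Boolean = transactionalSettings.isTransactional()

// A listener that just logs producer lifecycle events.
val loggingListener = object : PublisherSettings.ProducerListener {
    override fun producerAdded(id: String, producer: Producer<*, *>) = println("producer added: $id")
    override fun producerRemoved(id: String, producer: Producer<*, *>) = println("producer removed: $id")
}
```
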
@@ -1,5 +1,6 @@
package io.github.nomisRev.kafka.receiver.internals

import io.github.nomisRev.kafka.receiver.CommitStrategy
import io.github.nomisRev.kafka.receiver.Offset
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.github.nomisRev.kafka.receiver.internals.AckMode.ATMOST_ONCE
@@ -19,6 +20,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.LAZY
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.Dispatchers.Default
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
@@ -32,6 +34,8 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.handleCoroutineException
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.whileSelect
import kotlinx.coroutines.withContext
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition
@@ -72,31 +76,17 @@ internal class EventLoop<K, V>(
private val channel = Channel<ConsumerRecords<K, V>>()
private val pollTimeout = settings.pollTimeout.toJavaDuration()
private val pausedPartitionsByUser: MutableSet<TopicPartition> = HashSet()
private val reachedMaxCommitBatchSize = Channel<Unit>(Channel.RENDEZVOUS)
private val commitBatchSignal = Channel<Unit>(Channel.RENDEZVOUS)
private val utmostOnceOffsets = UtmostOnceOffsets()

/* Takes care of scheduling our commits to Kafka.
* It will schedule a commit after the `reachedMaxCommitBatchSize` channel signals it has reached the batch size,
* or when it times out after the given `commitInterval`.
* This way it optimises how commits are sent to Kafka.
* Either every `Duration` or `x` elements, whichever comes first. */
private val commitManager = scope.launch(context = Default, start = LAZY) {
offsetCommitWorker(
ackMode = ackMode,
strategy = settings.commitStrategy,
commitSignal = reachedMaxCommitBatchSize,
commit = ::scheduleCommitIfRequired
)
}

internal fun receive(): Flow<ConsumerRecords<K, V>> =
channel.consumeAsFlow()
.onStart {
if (topicNames.isNotEmpty()) subscribe(topicNames)
withContext(scope.coroutineContext) { poll() }
commitManager.start()
}.onCompletion {
reachedMaxCommitBatchSize.close()
commitBatchSignal.close()
withContext(scope.coroutineContext) {
commitManager.cancelAndJoin()
consumer.wakeup()
@@ -485,7 +475,7 @@ TopicPartition(record.topic(), record.partition()),
TopicPartition(record.topic(), record.partition()),
record.offset(),
settings.commitStrategy.size(),
reachedMaxCommitBatchSize
commitBatchSignal
)

private inner class CommitOffset(
@@ -527,6 +517,46 @@ internal class EventLoop<K, V>(
handleCoroutineException(outerContext, e)
}
}

/**
* Schedules [commit] based on the [AckMode], [CommitStrategy] and [commitBatchSignal].
* It runs as a parallel Job alongside the [EventLoop],
* so a separate process manages and schedules the commits.
*
* KotlinX Coroutines exposes a powerful experimental API where we can listen to our [commitBatchSignal]
* while racing against an [onTimeout]. This allows us to commit on whichever event arrives first.
*/
@OptIn(ExperimentalCoroutinesApi::class)
private val commitManager = scope.launch(start = LAZY) {
if (ackMode == MANUAL_ACK || ackMode == AUTO_ACK) {
whileSelect {
when (settings.commitStrategy) {
is CommitStrategy.BySizeOrTime -> {
commitBatchSignal.onReceiveCatching {
scheduleCommitIfRequired()
!it.isClosed
}
onTimeout(settings.commitStrategy.interval) {
scheduleCommitIfRequired()
true
}
}

is CommitStrategy.BySize ->
commitBatchSignal.onReceiveCatching {
scheduleCommitIfRequired()
!it.isClosed
}

is CommitStrategy.ByTime ->
onTimeout(settings.commitStrategy.interval) {
scheduleCommitIfRequired()
true
}
}
}
}
}
}

/* Marker for functions that are only called from the KafkaConsumer thread */
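
The new `commitManager` above replaces the old `offsetCommitWorker` helper with an inline `whileSelect` loop: the rendezvous `commitBatchSignal` channel reports that the batch size was reached, `onTimeout` fires when the commit interval elapses, and whichever clause wins schedules a commit. A self-contained sketch of the same racing pattern outside the `EventLoop`; the channel name, the 100 ms interval, and the every-fifth-record batch size are illustrative:

```kotlin
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.whileSelect

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val batchSignal = Channel<Unit>(Channel.RENDEZVOUS)

    // The "commit manager": commits whenever the batch signal or the timeout wins the race,
    // and stops looping once the signal channel is closed.
    val manager = launch {
        whileSelect {
            batchSignal.onReceiveCatching { result ->
                println("commit: batch size reached")
                !result.isClosed
            }
            onTimeout(100.milliseconds) {
                println("commit: interval elapsed")
                true
            }
        }
    }

    // The "event loop": every fifth record signals that the batch size was reached.
    repeat(12) { i ->
        delay(30)
        if ((i + 1) % 5 == 0) batchSignal.send(Unit)
    }
    batchSignal.close()
    manager.join()
}
```
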
