Commit

release 0.2.0
nomisRev committed Jul 2, 2022
1 parent f7598eb commit c0c8f23
Showing 5 changed files with 92 additions and 51 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
@@ -2,7 +2,7 @@ kotlin.code.style=official

# Package definitions
projects.group=io.github.nomisrev
projects.version=0.1.1-SNAPSHOT
projects.version=0.2.0

# Project definitions
pom.name=kotlin-kafka
3 changes: 1 addition & 2 deletions src/main/kotlin/io/github/nomisRev/kafka/KafkaFuture.kt
@@ -1,8 +1,8 @@
@file:JvmName("KafkaFutureExt")

package io.github.nomisRev.kafka

import kotlinx.coroutines.Deferred
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.await
import org.apache.kafka.clients.admin.CreateTopicsResult
@@ -31,7 +31,6 @@ public suspend fun <T> KafkaFuture<T>.await(): T =
*
* The [KafkaFuture] is cancelled when the resulting deferred is cancelled.
*/
@OptIn(InternalCoroutinesApi::class)
@Suppress("DeferredIsResult")
public fun <T> KafkaFuture<T>.asDeferred(): Deferred<T> =
toCompletionStage().asDeferred()
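
A minimal sketch of awaiting a `KafkaFuture` from the admin client with the `await` extension above; the broker address and topic name are placeholder assumptions:

```kotlin
import io.github.nomisRev.kafka.await
import java.util.Properties
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic

// Suspend until the broker has created the topic, by awaiting the KafkaFuture
// returned by the admin client. "localhost:9092" and "example-topic" are placeholders.
suspend fun createTopic() {
  val props = Properties().apply {
    put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  }
  val admin = Admin.create(props)
  try {
    admin.createTopics(listOf(NewTopic("example-topic", 1, 1.toShort()))).all().await()
  } finally {
    admin.close()
  }
}
```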
95 changes: 47 additions & 48 deletions src/main/kotlin/io/github/nomisRev/kafka/Producer.kt
@@ -50,9 +50,9 @@ import org.apache.kafka.common.serialization.Serializer
*/
@FlowPreview
public suspend fun <A, B> Flow<ProducerRecord<A, B>>.produce(
settings: ProducerSettings<A, B>
settings: ProducerSettings<A, B>,
): Flow<RecordMetadata> =
settings.kafkaProducer().flatMapConcat { producer ->
kafkaProducer(settings).flatMapConcat { producer ->
this@produce.map { record -> producer.sendAwait(record) }
}
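
A minimal sketch of the settings-based `produce` operator; the bootstrap server and topic name below are placeholder assumptions:

```kotlin
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.produce
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

// Map a range of ints to ProducerRecords and send them through produce(settings).
// "localhost:9092" and "example-topic" are placeholders.
@OptIn(FlowPreview::class)
suspend fun produceTen() {
  val settings = ProducerSettings("localhost:9092", StringSerializer(), StringSerializer())
  (1..10).asFlow()
    .map { i -> ProducerRecord<String, String>("example-topic", "key-$i", "value-$i") }
    .produce(settings)
    .collect { metadata -> println("${metadata.topic()}-${metadata.partition()}@${metadata.offset()}") }
}
```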

@@ -75,7 +75,7 @@ public suspend fun <A, B> Flow<ProducerRecord<A, B>>.produce(
* <!--- KNIT example-producer-02.kt -->
*/
public suspend fun <A, B> KafkaProducer<A, B>.sendAwait(
record: ProducerRecord<A, B>
record: ProducerRecord<A, B>,
): RecordMetadata =
suspendCoroutine { cont ->
// Those can be a SerializationException when it fails to serialize the message,
@@ -87,23 +87,49 @@ public suspend fun <A, B> KafkaProducer<A, B>.sendAwait(
}
}
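
A minimal sketch of `sendAwait` against a plain `KafkaProducer`; the broker address and topic name are placeholder assumptions:

```kotlin
import io.github.nomisRev.kafka.sendAwait
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

// Send one record and suspend until the broker acknowledges it, without blocking a thread.
// "localhost:9092" and "example-topic" are placeholders.
suspend fun sendOne() {
  val props = Properties().apply {
    put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  }
  KafkaProducer(props, StringSerializer(), StringSerializer()).use { producer ->
    val metadata = producer.sendAwait(ProducerRecord("example-topic", "hello world"))
    println("acknowledged at offset ${metadata.offset()}")
  }
}
```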

public fun <K, V> KafkaProducer(
props: Properties,
keyDeserializer: Serializer<K>,
valueDeserializer: Serializer<V>
): KafkaProducer<K, V> =
KafkaProducer(props, keyDeserializer, valueDeserializer)
/**
 * Constructs a [KafkaProducer] for [K] and [V] from the given [ProducerSettings].
 */
@Suppress("FunctionName")
public fun <K, V> KafkaProducer(setting: ProducerSettings<K, V>): KafkaProducer<K, V> =
KafkaProducer(setting.properties(), setting.keyDeserializer, setting.valueDeserializer)

/**
 * Will automatically flush and close the [KafkaProducer] when finished streaming.
 * The [KafkaProducer] is closed once it has been consumed from the [Flow].
 *
 * This means that the [KafkaProducer] will not be closed for a synchronously running stream,
 * but when the [Flow] is offloaded to a separate coroutine it is prone to being collected,
 * closed and flushed too early. In the example below we construct a producer stream that
 * produces 100 indexed messages.
*
* ```kotlin
 * fun <Key, Value> KafkaProducer<Key, Value>.produce(topicName: String, count: Int): Flow<RecordMetadata> =
 *   (1..count).asFlow().map { sendAwait(ProducerRecord(topicName, "message #$it")) }
*
 * val producerStream = kafkaProducer(ProducerSettings("localhost:9092", StringSerializer(), StringSerializer()))
 *   .flatMapConcat { producer -> producer.produce("topic-name", 100) }
* ```
*
 * Here the `KafkaProducer` will only be collected (and closed/flushed) after all 100 messages
 * have been produced.
*
 * **DO NOT** offload the producer into a buffer as in the following example: the
 * `KafkaProducer` gets collected into the buffer and is thus closed/flushed prematurely.
*
* ```kotlin
 * kafkaProducer(ProducerSettings("localhost:9092", StringSerializer(), StringSerializer())).buffer(10)
* ```
*/
public fun <K, V> kafkaProducer(
props: Properties,
keyDeserializer: Serializer<K>,
valueDeserializer: Serializer<V>
setting: ProducerSettings<K, V>,
): Flow<KafkaProducer<K, V>> = flow {
val producer = KafkaProducer(props, keyDeserializer, valueDeserializer)
try {
producer.use { emit(it) }
} finally {
producer.flush()
KafkaProducer(setting).use { producer ->
try {
emit(producer)
} finally {
producer.flush()
}
}
}
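
A minimal sketch of consuming the producer `Flow` with the new `ProducerSettings`-based signature; the broker address, topic name, and message count are placeholder assumptions:

```kotlin
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.kafkaProducer
import io.github.nomisRev.kafka.sendAwait
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.map
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

// The producer is emitted by the Flow and only flushed/closed once the inner stream completes.
// "localhost:9092" and "example-topic" are placeholders.
@OptIn(FlowPreview::class)
suspend fun produceWithManagedProducer() {
  val settings = ProducerSettings("localhost:9092", StringSerializer(), StringSerializer())
  kafkaProducer(settings)
    .flatMapConcat { producer ->
      (1..100).asFlow().map { i ->
        producer.sendAwait(ProducerRecord("example-topic", "key-$i", "message #$i"))
      }
    }
    .collect()
}
```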

@@ -115,6 +141,9 @@ public enum class Acks(public val value: String) {
}

/**
* A type-safe constructor for [KafkaProducer] settings.
 * It forces you to specify the bootstrap servers, and the serializers for [K] and [V].
* These are the minimum requirements for constructing a valid [KafkaProducer].
*
* @see http://kafka.apache.org/documentation.html#producerconfigs
*/
@@ -123,7 +152,7 @@ public data class ProducerSettings<K, V>(
val keyDeserializer: Serializer<K>,
val valueDeserializer: Serializer<V>,
val acks: Acks = Acks.One,
val other: Properties? = null
val other: Properties? = null,
) {
public fun properties(): Properties =
Properties().apply {
@@ -133,34 +162,4 @@ put(ProducerConfig.ACKS_CONFIG, acks.value)
put(ProducerConfig.ACKS_CONFIG, acks.value)
other?.let { putAll(other) }
}

/**
* Will automatically close, and flush when finished streaming.
* The [Flow] will close when the [KafkaProducer] is consumed from the [Flow].
*
* This means that the [KafkaProducer] will not be closed for a synchronous running stream, but
* when running the [Flow] is offloaded in a separate Coroutine it's prone to be collected, closed
* and flushed. In the example below we construct a producer stream that produces 100 indexed
* messages.
*
* ```kotlin
* fun <Key, Value> KafkaProducer<Key, Value>.produce(topicName: String, count: Int): Flow<Unit> =
* (0..count).asFlow().map { sendAwait(ProducerRecord(topicName, "message #it")) }
*
* val producerStream = kafkaProducer(Properties(), StringSerializer(), StringSerializer())
* .flatMapConcat { producer -> producer.produce("topic-name", 100) }
* ```
*
* Here the `KafkaProducer` will only get collected (and closed/flushed) when all 100 messages
* were produced.
*
* **DO NOT** If instead we'd do something like the following, where we offload in a buffer then
* the `KafkaProducer` gets collected into the buffer and thus closed/flushed.
*
* ```kotlin
* kafkaProducer(Properties(), StringSerializer(), StringSerializer()).buffer(10)
* ```
*/
public fun kafkaProducer(): Flow<KafkaProducer<K, V>> =
kafkaProducer(properties(), keyDeserializer, valueDeserializer)
}
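
A minimal sketch of building `ProducerSettings` and reducing them to `Properties`; the broker address and the extra `linger.ms` value are placeholder assumptions:

```kotlin
import io.github.nomisRev.kafka.Acks
import io.github.nomisRev.kafka.ProducerSettings
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// The minimal required settings plus optional extra configuration via `other`.
// "localhost:9092" and the linger.ms value are placeholders.
fun exampleProducerProperties(): Properties {
  val settings = ProducerSettings(
    "localhost:9092",
    StringSerializer(),
    StringSerializer(),
    acks = Acks.One,
    other = Properties().apply { put(ProducerConfig.LINGER_MS_CONFIG, "50") },
  )
  // properties() merges the bootstrap servers, serializers, acks, and the extra entries.
  return settings.properties()
}
```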
43 changes: 43 additions & 0 deletions src/test/kotlin/io/github/nomisrev/kafka/ProducerSettingSpec.kt
@@ -0,0 +1,43 @@
package io.github.nomisrev.kafka

import io.github.nomisRev.kafka.Acks
import io.github.nomisRev.kafka.ProducerSettings
import io.kotest.assertions.assertSoftly
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.maps.shouldContainAll
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.enum
import io.kotest.property.arbitrary.map
import io.kotest.property.arbitrary.string
import io.kotest.property.checkAll
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

class ProducerSettingSpec : StringSpec({

"ProducerSettings Ack" {
checkAll(
Arb.string(),
Arb.enum<Acks>(),
Arb.map(Arb.string(), Arb.string())
) { bootstrapServers, acks, map ->
val settings = ProducerSettings<String, String>(
bootstrapServers,
StringSerializer(),
StringSerializer(),
acks = acks,
other = Properties().apply {
putAll(map)
}
)

assertSoftly(settings.properties()) {
toMap().shouldContainAll(map as Map<Any, Any>)
getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) shouldBe bootstrapServers
getProperty(ProducerConfig.ACKS_CONFIG) shouldBe acks.value
}
}
}
})
