Skip to content

Commit

Permalink
add test for simulating the problem for continuous receive
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed May 17, 2024
1 parent c0a91d9 commit e858cca
Show file tree
Hide file tree
Showing 13 changed files with 445 additions and 9 deletions.
60 changes: 51 additions & 9 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import com.bnorm.power.PowerAssertGradleExtension
import kotlinx.knit.KnitPluginExtension
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.*
import org.gradle.api.tasks.testing.logging.TestLogEvent
import org.gradle.api.tasks.testing.logging.TestLogEvent.FAILED
import org.gradle.api.tasks.testing.logging.TestLogEvent.PASSED
import org.gradle.api.tasks.testing.logging.TestLogEvent.SKIPPED
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_ERROR
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_OUT
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
import org.gradle.api.tasks.testing.logging.TestLogEvent.*
import org.jetbrains.dokka.gradle.DokkaTask

plugins {
Expand All @@ -17,10 +11,12 @@ plugins {
alias(libs.plugins.knit)
alias(libs.plugins.publish)
alias(libs.plugins.power.assert)
idea
}

repositories {
mavenCentral()
maven("https://oss.sonatype.org/content/repositories/snapshots")
}

group = "io.github.nomisrev"
Expand All @@ -34,8 +30,15 @@ dependencies {

testImplementation(kotlin("test"))
testImplementation(libs.testcontainers.kafka)
testImplementation(libs.slf4j.simple)
testImplementation(libs.kotlinx.coroutines.test)
testImplementation(libs.jackson.kotlin)
testImplementation(libs.jackson.databind)
testImplementation(libs.kotest.framework.api)
testImplementation(libs.kotest.runner.junit5)
testImplementation(libs.kotest.property)
testImplementation(libs.stove.testing)
testImplementation(libs.stove.testing.kafka)
testImplementation(libs.logback.classic)
}

configure<PowerAssertGradleExtension> {
Expand All @@ -52,10 +55,34 @@ configure<JavaPluginExtension> {
}
}

sourceSets {
@Suppress("LocalVariableName", "ktlint:standard:property-naming")
val `test-e2e` by creating {
compileClasspath += sourceSets.main.get().output
runtimeClasspath += sourceSets.main.get().output
}

val testE2eImplementation by configurations.getting {
extendsFrom(configurations.testImplementation.get())
}
configurations["testE2eRuntimeOnly"].extendsFrom(configurations.runtimeOnly.get())
}

idea {
module {
testSources.from(sourceSets["test-e2e"].allSource.sourceDirectories)
testResources.from(sourceSets["test-e2e"].resources.sourceDirectories)
isDownloadJavadoc = true
isDownloadSources = true
}
}

kotlin {
explicitApi()
jvmToolchain(17)
}


tasks {
withType<DokkaTask>().configureEach {
outputDirectory.set(rootDir.resolve("docs"))
Expand Down Expand Up @@ -88,5 +115,20 @@ tasks {
exceptionFormat = FULL
events = setOf(SKIPPED, FAILED, STANDARD_ERROR)
}
jvmArgs("--add-opens", "java.base/java.util=ALL-UNNAMED")
}

task<Test>("e2eTest") {
description = "Runs e2e tests."
group = "verification"
testClassesDirs = sourceSets["test-e2e"].output.classesDirs
classpath = sourceSets["test-e2e"].runtimeClasspath

useJUnitPlatform()
reports {
junitXml.required.set(true)
html.required.set(true)
}
jvmArgs("--add-opens", "java.base/java.util=ALL-UNNAMED")
}
}
14 changes: 14 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ slf4j = "2.0.12"
spotless="6.25.0"
publish="0.28.0"
power-assert="0.13.0"
stove = "1.0.0-SNAPSHOT"
jackson = "2.17.1"

[libraries]
kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest" }
Expand All @@ -26,6 +28,18 @@ testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "tes
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }

jackson-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
logback-classic = { module = "ch.qos.logback:logback-classic", version = "1.5.6" }

# Testing
stove-testing = { module = "com.trendyol:stove-testing-e2e", version.ref = "stove" }
stove-ktor-testing = { module = "com.trendyol:stove-ktor-testing-e2e", version.ref = "stove" }
stove-testing-kafka = { module = "com.trendyol:stove-testing-e2e-kafka", version.ref = "stove" }
kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" }
kotest-framework-api = { module = "io.kotest:kotest-framework-api", version.ref = "kotest" }


[plugins]
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
dokka = { id = "org.jetbrains.dokka", version.ref = "dokka" }
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ rootProject.name = "kotlin-kafka"
dependencyResolutionManagement {
repositories {
mavenCentral()
maven("https://oss.sonatype.org/content/repositories/snapshots")
}
}


include(":guide")
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.github.nomisRev.kafka.e2e.setup

import com.trendyol.stove.testing.e2e.system.abstractions.ApplicationUnderTest
import io.github.nomisRev.kafka.e2e.setup.example.KafkaTestShared
import io.github.nomisRev.kafka.e2e.setup.example.ReceiveMethod
import io.github.nomisRev.kafka.e2e.setup.example.StoveKafkaValueDeserializer
import io.github.nomisRev.kafka.e2e.setup.example.StoveKafkaValueSerializer
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.publisher.PublisherSettings
import io.github.nomisRev.kafka.receiver.CommitStrategy
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.*
import kotlin.time.Duration.Companion.seconds

/**
* Stove's Kafka application under test implementation
*/
class KafkaApplicationUnderTest : ApplicationUnderTest<Unit> {
private lateinit var client: AdminClient
private val consumers: MutableList<AutoCloseable> = mutableListOf()

override suspend fun start(configurations: List<String>) {
val bootstrapServers = configurations.first { it.contains("kafka.servers", true) }.split('=')[1]
val interceptorClass = configurations.first { it.contains("kafka.interceptor-classes", true) }.split('=')[1]
val receiveMethod = configurations.first { it.contains("kafka.receive-method", true) }.split('=')[1]
client = createAdminClient(bootstrapServers)
createTopics(client)
startConsumers(bootstrapServers, interceptorClass, ReceiveMethod.from(receiveMethod))
}

private fun createAdminClient(bootstrapServers: String): AdminClient {
return mapOf<String, Any>(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers
).let { AdminClient.create(it) }
}

private fun createTopics(client: AdminClient) {
val newTopics = KafkaTestShared.topics.flatMap {
listOf(it.topic, it.retryTopic, it.deadLetterTopic)
}.map { NewTopic(it, 1, 1) }
client.createTopics(newTopics).all().get()
}

private fun startConsumers(bootStrapServers: String, interceptorClass: String, receiveMethod: ReceiveMethod) {
val (publisher, receiver) = createPublisherAndReceiver(interceptorClass, bootStrapServers)
val configuredConsumers = KafkaTestShared.consumers(receiver, publisher, receiveMethod)
configuredConsumers.forEach { it.start() }
consumers.addAll(configuredConsumers)
}

private fun createPublisherAndReceiver(
interceptorClass: String, bootStrapServers: String
): Pair<KafkaPublisher<String, Any>, KafkaReceiver<String, Any>> {
val consumerSettings = mapOf(
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to "2000",
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG to "true", // Expected to be created by the client
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(interceptorClass)
)

val receiverSettings = ReceiverSettings(bootstrapServers = bootStrapServers,
valueDeserializer = StoveKafkaValueDeserializer(),
keyDeserializer = StringDeserializer(),
groupId = "stove-application-consumers",
commitStrategy = CommitStrategy.ByTime(2.seconds),
pollTimeout = 1.seconds,
properties = Properties().apply {
putAll(consumerSettings)
})

val producerSettings = PublisherSettings<String, Any>(bootStrapServers,
StringSerializer(),
StoveKafkaValueSerializer(),
properties = Properties().apply {
put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, listOf(interceptorClass)
)
})

val publisher = KafkaPublisher(producerSettings)
val receiver = KafkaReceiver(receiverSettings)
return Pair(publisher, receiver)
}

override suspend fun stop() {
client.close()
consumers.forEach { it.close() }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.github.nomisRev.kafka.e2e.setup

import com.trendyol.stove.testing.e2e.standalone.kafka.KafkaSystemOptions
import com.trendyol.stove.testing.e2e.standalone.kafka.kafka
import com.trendyol.stove.testing.e2e.system.TestSystem
import io.kotest.core.config.AbstractProjectConfig

class ProjectConfig : AbstractProjectConfig() {
override suspend fun beforeProject(): Unit = TestSystem()
.with {
kafka {
KafkaSystemOptions(
configureExposedConfiguration = { cfg ->
listOf(
"kafka.servers=${cfg.bootstrapServers}",
"kafka.interceptor-classes=${cfg.interceptorClass}",
"kafka.receive-method=kotlin-kafka" // here we can change to: 'kotlin-kafka' or 'traditional'
)
}
)
}
applicationUnderTest(KafkaApplicationUnderTest())
}.run()

override suspend fun afterProject(): Unit = TestSystem.stop()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.github.nomisRev.kafka.e2e.setup.example

import io.github.nomisRev.kafka.e2e.setup.example.KafkaTestShared.TopicDefinition
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.flattenConcat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration

/**
* Supervisor that uses KafkaReceiver to retrieve messages from Kafka and handle them accordingly
*/
abstract class ConsumerSupervisor<K, V>(
private val receiver: KafkaReceiver<K, V>,
private val publisher: KafkaPublisher<K, V>,
private val receiveMethod: ReceiveMethod
) : AutoCloseable {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val logger = org.slf4j.LoggerFactory.getLogger(javaClass)

abstract val topicDefinition: TopicDefinition

/**
* Here we start the consumer
* We can use either KotlinKafka receiver or traditional while(true) loop to receive messages
* Traditional while(true) loop is successful in receiving messages continuously
* KotlinKafka receiver, continuously receives messages
*/
fun start() = when (receiveMethod) {
ReceiveMethod.KOTLIN_KAFKA_RECEIVE -> kotlinKafkaReceive()
ReceiveMethod.TRADITIONAL_RECEIVE -> traditionalReceive()
}

@OptIn(ExperimentalCoroutinesApi::class)
private fun kotlinKafkaReceive() {
scope.launch {
receiver.receiveAutoAck(
listOf(
topicDefinition.topic,
topicDefinition.retryTopic,
topicDefinition.deadLetterTopic
)
).flattenConcat()
.collect { message ->
logger.info("Message RECEIVED on the application side with KotlinKafka receiver: ${message.value()}")
received(message) // expected to receive the messages continuously?
}
}
}

private fun traditionalReceive() {
scope.launch {
receiver.withConsumer { consumer ->
consumer.subscribe(
listOf(
topicDefinition.topic,
topicDefinition.retryTopic,
topicDefinition.deadLetterTopic
)
)
while (isActive) {
val records = consumer.poll(Duration.ofMillis(500))
records.forEach { record ->
logger.info("Message RECEIVED on the application side with traditional while(true) loop: ${record.value()}")
received(record) {
consumer.commitAsync()
}
}
}
}
}
}


abstract suspend fun consume(record: ConsumerRecord<K, V>)

protected open suspend fun handleError(message: ConsumerRecord<K, V>, e: Exception) {
logger.error("Failed to process message: $message", e)
}

private suspend fun received(message: ConsumerRecord<K, V>, onSuccess: (ConsumerRecord<K, V>) -> Unit = { }) {
try {
consume(message)
onSuccess(message)
logger.info("Message COMMITTED on the application side: ${message.value()}")
} catch (e: Exception) {
handleError(message, e)
logger.warn("CONSUMER GOT an ERROR on the application side, exception: $e")
val record = ProducerRecord<K, V>(
topicDefinition.deadLetterTopic,
message.partition(),
message.key(),
message.value(),
message.headers()
)
try {
publisher.publishScope { offer(record) }
} catch (e: Exception) {
logger.error("Failed to publish message to dead letter topic: $message", e)
}
}
}

override fun close(): Unit = runBlocking {
try {
scope.cancel()
} catch (e: Exception) {
logger.error("Failed to stop consuming", e)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.github.nomisRev.kafka.e2e.setup.example

object DomainEvents {
data class ProductCreated(val productId: String)
}
Loading

0 comments on commit e858cca

Please sign in to comment.