Skip to content

Commit

Permalink
Merge pull request #187 from nomisRev/stressTest
Browse files Browse the repository at this point in the history
Stress testing
  • Loading branch information
nomisRev authored Apr 7, 2024
2 parents 3b90b60 + fceef81 commit ccafffa
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
check:
runs-on: ubuntu-latest
timeout-minutes: 30
timeout-minutes: 120

steps:
- uses: actions/checkout@v4
Expand All @@ -23,7 +23,7 @@ jobs:

- uses: gradle/gradle-build-action@v2
with:
arguments: build --scan --full-stacktrace
arguments: build --scan --full-stacktrace -PstressTest=100

- name: Bundle the build report
if: failure()
Expand Down
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ tasks {

withType<Test>().configureEach {
useJUnitPlatform()
maxParallelForks = (2 * Runtime.getRuntime().availableProcessors())
if (project.hasProperty("stressTest")) {
systemProperty("io.github.nomisrev.kafka.TEST_ITERATIONS", project.properties["stressTest"] ?: 100)
}
testLogging {
exceptionFormat = FULL
events = setOf(SKIPPED, FAILED, STANDARD_ERROR)
Expand Down
29 changes: 17 additions & 12 deletions src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.seconds

private val testIterations: Int =
System.getProperties().getProperty("io.github.nomisrev.kafka.TEST_ITERATIONS")?.toIntOrNull() ?: 1

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
abstract class KafkaSpec {
Expand All @@ -63,7 +65,7 @@ abstract class KafkaSpec {
fun destroy() {
kafka.stop()
}

@BeforeAll
@JvmStatic
fun setup() {
Expand All @@ -86,10 +88,10 @@ abstract class KafkaSpec {
withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "true")
withReuse(true)
}

fun KafkaReceiver(): KafkaReceiver<String, String> =
KafkaReceiver(receiverSetting())

fun receiverSetting(): ReceiverSettings<String, String> =
ReceiverSettings(
bootstrapServers = kafka.bootstrapServers,
Expand Down Expand Up @@ -169,15 +171,17 @@ abstract class KafkaSpec {
partitions: Int = 4,
replicationFactor: Short = 1,
test: suspend TopicTestScope.(NewTopic) -> Unit
): Unit = runTest {
val topic = NewTopic(nextTopicName(), partitions, replicationFactor).configs(topicConfig)
admin {
createTopic(topic)
try {
TopicTestScope(topic, this@runTest).test(topic)
} finally {
topic.shouldBeEmpty()
deleteTopic(topic.name())
): Unit = repeat(testIterations) {
runTest {
val topic = NewTopic(nextTopicName(), partitions, replicationFactor).configs(topicConfig)
admin {
createTopic(topic)
try {
TopicTestScope(topic, this@runTest).test(topic)
} finally {
topic.shouldBeEmpty()
deleteTopic(topic.name())
}
}
}
}
Expand Down Expand Up @@ -298,6 +302,7 @@ abstract class KafkaSpec {
object : Producer<String, String> {
override fun clientInstanceId(p0: Duration?): Uuid =
producer.clientInstanceId(p0)

override fun close() {}

override fun close(timeout: Duration?) {}
Expand Down

0 comments on commit ccafffa

Please sign in to comment.