Skip to content

Commit

Permalink
Redis PubSub (#1151)
Browse files Browse the repository at this point in the history
* Redis PubSub

* Redis PubSub

* Redis PubSub - review comments incorporated

* Redis PubSub - review comments incorporated

* Redis PubSub - review comments incorporated

* Redis PubSub - review comments incorporated

* Redis PubSub - review comments incorporated
  • Loading branch information
abh1navv authored Dec 27, 2024
1 parent 34a1061 commit dfb6940
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 0 deletions.
13 changes: 13 additions & 0 deletions kotlin-libraries-data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@
<version>${moshi.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.codemonstur</groupId>
<artifactId>embedded-redis</artifactId>
<version>${embedded-redis.version}</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>${lettuce-core.version}</version>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -161,6 +171,9 @@
<retrofit.version>2.9.0</retrofit.version>
<wiremock-jre8.version>2.35.1</wiremock-jre8.version>
<moshi.version>1.15.1</moshi.version>
<kotlinx-coroutines.version>1.7.1</kotlinx-coroutines.version>
<embedded-redis.version>1.4.3</embedded-redis.version>
<lettuce-core.version>6.5.0.RELEASE</lettuce-core.version>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.baeldung.redispubsub

data class Message(val content: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.baeldung.redispubsub

import io.lettuce.core.pubsub.RedisPubSubAdapter
import java.util.concurrent.CountDownLatch

class MessageListener : RedisPubSubAdapter<String, String>() {

var latch: CountDownLatch = CountDownLatch(1)

var messagesReceived: List<String> = emptyList()
override fun message(channel: String?, message: String?) {
println("Received message: $message from channel: $channel")
messagesReceived = messagesReceived.plus(message!!)
latch.countDown()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.baeldung.redispubsub

import io.lettuce.core.RedisClient
import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.sync.RedisCommands
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands

object RedisConnectionManager: AutoCloseable {
private val redisClient: RedisClient = RedisClient.create("redis://localhost:6379")
private val connection: StatefulRedisConnection<String, String> = redisClient.connect()

override fun close() {
connection.close()
redisClient.shutdown()
}

fun redisSyncCommands(): RedisCommands<String, String>? {
return connection.sync()
}

fun redisPubSubAsyncCommands(messageListener: MessageListener): RedisPubSubAsyncCommands<String, String> {
val pubSubConnection = redisClient.connectPubSub()
pubSubConnection.addListener(messageListener)
return pubSubConnection.async()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.baeldung.redispubsub

class RedisPublisher {

fun publishMessage(channel: String, message: Message) {
RedisConnectionManager.redisSyncCommands()?.publish(channel, message.content)
println("Message published: $message")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.baeldung.redispubsub

class RedisSubscriber(private val messageListener: MessageListener) {

fun subscribeToChannel(channel: String) {
RedisConnectionManager.redisPubSubAsyncCommands(messageListener).subscribe(channel)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.baeldung.redispubsub

import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import redis.embedded.RedisServer
import java.util.concurrent.TimeUnit

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class RedisSubscriberUnitTest {

val messageListener = MessageListener()
val redisSubscriber = RedisSubscriber(messageListener)
val redisPublisher = RedisPublisher()
val channel = "channel"
val message = Message("Hello, Redis!")

val redisServer = RedisServer(6379)

@BeforeAll
fun setUp() {
redisServer.start()
}

@AfterAll
fun tearDown() {
RedisConnectionManager.close()
redisServer.stop()
}

@Test
fun givenMessageListener_whenMessagePublished_thenMessageReceived() {
redisSubscriber.subscribeToChannel(channel)
redisPublisher.publishMessage(channel, message)
messageListener.latch.await(500, TimeUnit.MILLISECONDS)
assertEquals(message.content, messageListener.messagesReceived.get(0))
}

}

0 comments on commit dfb6940

Please sign in to comment.