Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement configurable reconnect duration #28

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions src/main/scala/redis/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import redis.actors.{RedisSubscriberActorWithCallback, RedisClientActor}
import redis.api.pubsub._
import java.util.concurrent.atomic.AtomicLong
import akka.event.Logging
import scala.concurrent.duration.{FiniteDuration, DurationInt}

trait RedisCommands
extends Keys
Expand All @@ -28,11 +29,14 @@ abstract class RedisClientActorLike(system: ActorSystem) extends ActorRequest {
val name: String
val password: Option[String] = None
val db: Option[Int] = None
val reconnectDuration: FiniteDuration
implicit val executionContext = system.dispatcher

val redisConnection: ActorRef = system.actorOf(
Props(classOf[RedisClientActor], new InetSocketAddress(host, port), getConnectOperations)
.withDispatcher(Redis.dispatcher),
Props(classOf[RedisClientActor],
new InetSocketAddress(host, port),
reconnectDuration,
getConnectOperations).withDispatcher(Redis.dispatcher),
name + '-' + Redis.tempName()
)

Expand Down Expand Up @@ -70,6 +74,7 @@ case class RedisClient(var host: String = "localhost",
var port: Int = 6379,
override val password: Option[String] = None,
override val db: Option[Int] = None,
override val reconnectDuration: FiniteDuration = 2 seconds,
name: String = "RedisClient")
(implicit _system: ActorSystem) extends RedisClientActorLike(_system) with RedisCommands with Transactions {

Expand All @@ -79,13 +84,15 @@ case class RedisBlockingClient(var host: String = "localhost",
var port: Int = 6379,
override val password: Option[String] = None,
override val db: Option[Int] = None,
override val reconnectDuration: FiniteDuration = 2 seconds,
name: String = "RedisBlockingClient")
(implicit _system: ActorSystem) extends RedisClientActorLike(_system) with BLists {
}

case class RedisPubSub(
host: String = "localhost",
port: Int = 6379,
reconnectDuration: FiniteDuration = 2 seconds,
channels: Seq[String],
patterns: Seq[String],
onMessage: Message => Unit = _ => {},
Expand All @@ -96,7 +103,7 @@ case class RedisPubSub(

val redisConnection: ActorRef = system.actorOf(
Props(classOf[RedisSubscriberActorWithCallback],
new InetSocketAddress(host, port), channels, patterns, onMessage, onPMessage, authPassword)
new InetSocketAddress(host, port), channels, patterns, onMessage, onPMessage, reconnectDuration, authPassword)
.withDispatcher(Redis.dispatcher),
name + '-' + Redis.tempName()
)
Expand Down Expand Up @@ -130,6 +137,7 @@ trait SentinelCommands

case class SentinelClient(var host: String = "localhost",
var port: Int = 26379,
val reconnectDuration: FiniteDuration = 2 seconds,
onMasterChange: (String, String, Int) => Unit = (masterName: String, ip: String, port: Int) => {},
onNewSentinel: (String, String, Int) => Unit = (masterName: String, sentinelip: String, sentinelport: Int) => {},
onSentinelDown: (String, String, Int) => Unit = (masterName: String, sentinelip: String, sentinelport: Int) => {},
Expand Down Expand Up @@ -182,7 +190,7 @@ case class SentinelClient(var host: String = "localhost",

val redisPubSubConnection: ActorRef = system.actorOf(
Props(classOf[RedisSubscriberActorWithCallback],
new InetSocketAddress(host, port), channels, Seq(), onMessage, (pmessage: PMessage) => {}, None)
new InetSocketAddress(host, port), channels, Seq(), onMessage, (pmessage: PMessage) => {}, reconnectDuration, None)
.withDispatcher(Redis.dispatcher),
name + '-' + Redis.tempName()
)
Expand All @@ -201,20 +209,21 @@ abstract class SentinelMonitored(system: ActorSystem) {
val sentinels: Seq[(String, Int)]
val master: String
val onMasterChange: (String, Int) => Unit
val reconnectDuration: FiniteDuration

implicit val executionContext = system.dispatcher

val sentinelClients =
collection.mutable.Map(
sentinels.map(hp =>
(makeSentinelClientKey(hp._1, hp._2), makeSentinelClient(hp._1, hp._2))
(makeSentinelClientKey(hp._1, hp._2), makeSentinelClient(hp._1, hp._2, reconnectDuration))
):_*
)

def makeSentinelClientKey(host: String, port: Int) = s"$host:$port"

def makeSentinelClient(host: String, port: Int): SentinelClient = {
new SentinelClient(host, port, onSwitchMaster, onNewSentinel, onSentinelDown, "SMSentinelClient")(system)
def makeSentinelClient(host: String, port: Int, reconnectDuration: FiniteDuration): SentinelClient = {
new SentinelClient(host, port, reconnectDuration, onSwitchMaster, onNewSentinel, onSentinelDown, "SMSentinelClient")(system)
}


Expand All @@ -228,7 +237,7 @@ abstract class SentinelMonitored(system: ActorSystem) {
if (master == masterName && !sentinelClients.contains(k)) {
sentinelClients.synchronized {
if (!sentinelClients.contains(k))
sentinelClients += k -> makeSentinelClient(sentinelip, sentinelport)
sentinelClients += k -> makeSentinelClient(sentinelip, sentinelport, reconnectDuration)
}
}
}
Expand Down Expand Up @@ -276,7 +285,8 @@ abstract class SentinelMonitoredRedisClientLike(system: ActorSystem) extends Sen
}

case class SentinelMonitoredRedisClient( sentinels: Seq[(String, Int)] = Seq(("localhost", 26379)),
master: String)
master: String,
reconnectDuration: FiniteDuration = 2 seconds)
(implicit system: ActorSystem) extends SentinelMonitoredRedisClientLike(system) with RedisCommands with Transactions {

val redisClient: RedisClient = withMasterAddr((ip, port) => {
Expand All @@ -287,7 +297,8 @@ case class SentinelMonitoredRedisClient( sentinels: Seq[(String, Int)] = Seq(("l


case class SentinelMonitoredRedisBlockingClient( sentinels: Seq[(String, Int)] = Seq(("localhost", 26379)),
master: String)
master: String,
reconnectDuration: FiniteDuration = 2 seconds)
(implicit system: ActorSystem) extends SentinelMonitoredRedisClientLike(system) with BLists {
val redisClient: RedisBlockingClient = withMasterAddr((ip, port) => {
new RedisBlockingClient(ip, port, name = "SMRedisBlockingClient")
Expand Down
12 changes: 8 additions & 4 deletions src/main/scala/redis/RedisPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.net.InetSocketAddress
import scala.concurrent.{Future, ExecutionContext}
import redis.protocol.RedisReply
import redis.commands.Transactions

import scala.concurrent.duration.{DurationInt, FiniteDuration}

case class RedisServer(host: String = "localhost",
port: Int = 6379,
Expand All @@ -16,12 +16,15 @@ case class RedisServer(host: String = "localhost",
abstract class RedisClientPoolLike(system: ActorSystem) extends RoundRobinPoolRequest {
val redisServers: Seq[RedisServer]
val name: String
val reconnectDuration: FiniteDuration
implicit val executionContext = system.dispatcher

val redisConnectionPool: Seq[ActorRef] = redisServers.map(server => {
system.actorOf(
Props(classOf[RedisClientActor], new InetSocketAddress(server.host, server.port), getConnectOperations(server))
.withDispatcher(Redis.dispatcher),
Props(classOf[RedisClientActor],
new InetSocketAddress(server.host, server.port),
reconnectDuration,
getConnectOperations(server)).withDispatcher(Redis.dispatcher),
name + '-' + Redis.tempName()
)
})
Expand Down Expand Up @@ -51,7 +54,8 @@ abstract class RedisClientPoolLike(system: ActorSystem) extends RoundRobinPoolRe
}

case class RedisClientPool(redisServers: Seq[RedisServer],
name: String = "RedisClientPool")
name: String = "RedisClientPool",
val reconnectDuration: FiniteDuration = 2 seconds)
(implicit _system: ActorSystem) extends RedisClientPoolLike(_system) with RedisCommands

case class RedisClientMasterSlaves(master: RedisServer,
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/redis/actors/RedisClientActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import redis.{Redis, Operation, Transaction}
import akka.actor._
import scala.collection.mutable
import akka.actor.SupervisorStrategy.Stop
import scala.concurrent.duration.FiniteDuration

class RedisClientActor(override val address: InetSocketAddress, getConnectOperations: () => Seq[Operation[_, _]]) extends RedisWorkerIO(address) {
class RedisClientActor(override val address: InetSocketAddress,
reconnectDuration: FiniteDuration,
getConnectOperations: () => Seq[Operation[_, _]]) extends RedisWorkerIO(address, reconnectDuration) {


import context._
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/redis/actors/RedisSubscriberActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import redis.protocol.{MultiBulk, RedisReply}
import redis.api.pubsub._
import java.net.InetSocketAddress
import redis.api.connection.Auth
import scala.concurrent.duration.FiniteDuration

class RedisSubscriberActorWithCallback(
address: InetSocketAddress,
channels: Seq[String],
patterns: Seq[String],
messageCallback: Message => Unit,
pmessageCallback: PMessage => Unit,
reconnectDuration: FiniteDuration,
authPassword: Option[String] = None
) extends RedisSubscriberActor(address, channels, patterns, authPassword) {
) extends RedisSubscriberActor(address, channels, patterns, reconnectDuration, authPassword) {
def onMessage(m: Message) = messageCallback(m)

def onPMessage(pm: PMessage) = pmessageCallback(pm)
Expand All @@ -23,8 +25,9 @@ abstract class RedisSubscriberActor(
address: InetSocketAddress,
channels: Seq[String],
patterns: Seq[String],
reconnectDuration: FiniteDuration,
authPassword: Option[String] = None
) extends RedisWorkerIO(address) with DecodeReplies {
) extends RedisWorkerIO(address, reconnectDuration) with DecodeReplies {
def onConnectWrite(): ByteString = {
authPassword.map(Auth(_).encodedRequest).getOrElse(ByteString.empty)
}
Expand Down
8 changes: 3 additions & 5 deletions src/main/scala/redis/actors/RedisWorkerIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import akka.io.Tcp.Register
import akka.io.Tcp.Connect
import akka.io.Tcp.CommandFailed
import akka.io.Tcp.Received
import scala.concurrent.duration.FiniteDuration

abstract class RedisWorkerIO(val address: InetSocketAddress) extends Actor with ActorLogging {
abstract class RedisWorkerIO(val address: InetSocketAddress,
reconnectDuration: FiniteDuration) extends Actor with ActorLogging {

private var currAddress = address

Expand Down Expand Up @@ -168,10 +170,6 @@ abstract class RedisWorkerIO(val address: InetSocketAddress) extends Actor with
}
}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

def reconnectDuration: FiniteDuration = 2 seconds

private def writeWorker(byteString: ByteString) {
onWriteSent()
tcpWorker ! Write(byteString, WriteAck)
Expand Down
6 changes: 4 additions & 2 deletions src/test/scala/redis/RedisPubSubSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import redis.actors.RedisSubscriberActor
import java.net.InetSocketAddress
import akka.actor.{Props, ActorRef}
import akka.testkit.{TestActorRef, TestProbe}
import scala.concurrent.duration.{DurationInt, FiniteDuration}

class RedisPubSubSpec extends RedisSpec {

Expand Down Expand Up @@ -47,7 +48,7 @@ class RedisPubSubSpec extends RedisSpec {
val patterns = Seq("pattern.*")

val subscriberActor = TestActorRef[SubscriberActor](
Props(classOf[SubscriberActor], new InetSocketAddress("localhost", 6379),
Props(classOf[SubscriberActor], new InetSocketAddress("localhost", 6379), 2 seconds,
channels, patterns, probeMock.ref)
.withDispatcher(Redis.dispatcher),
"SubscriberActor"
Expand Down Expand Up @@ -109,10 +110,11 @@ class RedisPubSubSpec extends RedisSpec {
}

class SubscriberActor(address: InetSocketAddress,
reconnectDuration: FiniteDuration,
channels: Seq[String],
patterns: Seq[String],
probeMock: ActorRef
) extends RedisSubscriberActor(address, channels, patterns) {
) extends RedisSubscriberActor(address, channels, patterns, reconnectDuration) {

override def onMessage(m: Message) = {
probeMock ! m
Expand Down
10 changes: 6 additions & 4 deletions src/test/scala/redis/actors/RedisClientActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import java.net.InetSocketAddress
import akka.util.ByteString
import scala.concurrent.{Await, Promise}
import scala.collection.mutable
import redis.{RedisCommand, Redis, Operation}
import redis.{Redis, Operation}
import redis.api.connection.Ping
import redis.api.strings.Get
import redis.protocol.Bulk
import scala.concurrent.duration.DurationInt

class RedisClientActorSpec extends TestKit(ActorSystem()) with SpecificationLike with Tags with NoTimeConversions with ImplicitSender {

Expand Down Expand Up @@ -127,8 +127,10 @@ class RedisClientActorSpec extends TestKit(ActorSystem()) with SpecificationLike
}
}

class RedisClientActorMock(probeReplyDecoder: ActorRef, probeMock: ActorRef, getConnectOperations: () => Seq[Operation[_, _]])
extends RedisClientActor(new InetSocketAddress("localhost", 6379), getConnectOperations) {
class RedisClientActorMock(probeReplyDecoder: ActorRef,
probeMock: ActorRef,
getConnectOperations: () => Seq[Operation[_, _]])
extends RedisClientActor(new InetSocketAddress("localhost", 6379), 2 seconds, getConnectOperations) {
override def initRepliesDecoder() = probeReplyDecoder

override def preStart() {
Expand Down
3 changes: 2 additions & 1 deletion src/test/scala/redis/actors/RedisReplyDecoderSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import java.net.InetSocketAddress
import com.typesafe.config.ConfigFactory
import redis.{Redis, Operation}
import redis.api.connection.Ping
import scala.concurrent.duration.DurationInt

class RedisReplyDecoderSpec
extends TestKit(ActorSystem("testsystem", ConfigFactory.parseString( """akka.loggers = ["akka.testkit.TestEventListener"]""")))
Expand Down Expand Up @@ -173,7 +174,7 @@ class RedisReplyDecoderSpec
}

class RedisClientActorMock2(probeMock: ActorRef)
extends RedisClientActor(new InetSocketAddress("localhost", 6379), () => {Seq()}) {
extends RedisClientActor(new InetSocketAddress("localhost", 6379), 2 seconds, () => {Seq()}) {
override def preStart() {
// disable preStart of RedisWorkerIO
}
Expand Down
8 changes: 5 additions & 3 deletions src/test/scala/redis/actors/RedisSubscriberActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import redis.Redis
import akka.io.Tcp._
import redis.api.pubsub.Message
import redis.api.pubsub.PMessage
import scala.concurrent.duration.{DurationInt, FiniteDuration}

class RedisSubscriberActorSpec extends TestKit(ActorSystem()) with SpecificationLike with Tags with NoTimeConversions with ImplicitSender {

Expand All @@ -23,7 +24,7 @@ class RedisSubscriberActorSpec extends TestKit(ActorSystem()) with Specification
val patterns = Seq("pattern.*")

val subscriberActor = TestActorRef[SubscriberActor](Props(classOf[SubscriberActor],
new InetSocketAddress("localhost", 6379), channels, patterns, probeMock.ref)
new InetSocketAddress("localhost", 6379), channels, patterns, probeMock.ref, 2 seconds)
.withDispatcher(Redis.dispatcher))

val connectMsg = probeMock.expectMsgType[Connect]
Expand Down Expand Up @@ -62,8 +63,9 @@ class RedisSubscriberActorSpec extends TestKit(ActorSystem()) with Specification
class SubscriberActor(address: InetSocketAddress,
channels: Seq[String],
patterns: Seq[String],
probeMock: ActorRef
) extends RedisSubscriberActor(address, channels, patterns) {
probeMock: ActorRef,
reconnectDuration: FiniteDuration
) extends RedisSubscriberActor(address, channels, patterns, reconnectDuration) {

override val tcp = probeMock

Expand Down
Loading