From ceec9c30c483670d48aec3012cc73e6d2b23ee3f Mon Sep 17 00:00:00 2001 From: Alexander Myltsev Date: Mon, 3 Feb 2014 22:05:53 +0400 Subject: [PATCH] Implement configurable reconnect duration --- src/main/scala/redis/Redis.scala | 31 +++++++++++++------ src/main/scala/redis/RedisPool.scala | 12 ++++--- .../scala/redis/actors/RedisClientActor.scala | 5 ++- .../redis/actors/RedisSubscriberActor.scala | 7 +++-- .../scala/redis/actors/RedisWorkerIO.scala | 8 ++--- src/test/scala/redis/RedisPubSubSpec.scala | 6 ++-- .../redis/actors/RedisClientActorSpec.scala | 10 +++--- .../redis/actors/RedisReplyDecoderSpec.scala | 3 +- .../actors/RedisSubscriberActorSpec.scala | 8 +++-- .../redis/actors/RedisWorkerIOSpec.scala | 21 ++++++++----- 10 files changed, 71 insertions(+), 40 deletions(-) diff --git a/src/main/scala/redis/Redis.scala b/src/main/scala/redis/Redis.scala index 512a2ed4..eda39d2a 100644 --- a/src/main/scala/redis/Redis.scala +++ b/src/main/scala/redis/Redis.scala @@ -9,6 +9,7 @@ import redis.actors.{RedisSubscriberActorWithCallback, RedisClientActor} import redis.api.pubsub._ import java.util.concurrent.atomic.AtomicLong import akka.event.Logging +import scala.concurrent.duration.{FiniteDuration, DurationInt} trait RedisCommands extends Keys @@ -28,11 +29,14 @@ abstract class RedisClientActorLike(system: ActorSystem) extends ActorRequest { val name: String val password: Option[String] = None val db: Option[Int] = None + val reconnectDuration: FiniteDuration implicit val executionContext = system.dispatcher val redisConnection: ActorRef = system.actorOf( - Props(classOf[RedisClientActor], new InetSocketAddress(host, port), getConnectOperations) - .withDispatcher(Redis.dispatcher), + Props(classOf[RedisClientActor], + new InetSocketAddress(host, port), + reconnectDuration, + getConnectOperations).withDispatcher(Redis.dispatcher), name + '-' + Redis.tempName() ) @@ -70,6 +74,7 @@ case class RedisClient(var host: String = "localhost", var port: Int = 6379, override val password: Option[String] = None, override val db: Option[Int] = None, + override val reconnectDuration: FiniteDuration = 2 seconds, name: String = "RedisClient") (implicit _system: ActorSystem) extends RedisClientActorLike(_system) with RedisCommands with Transactions { @@ -79,6 +84,7 @@ case class RedisBlockingClient(var host: String = "localhost", var port: Int = 6379, override val password: Option[String] = None, override val db: Option[Int] = None, + override val reconnectDuration: FiniteDuration = 2 seconds, name: String = "RedisBlockingClient") (implicit _system: ActorSystem) extends RedisClientActorLike(_system) with BLists { } @@ -86,6 +92,7 @@ case class RedisBlockingClient(var host: String = "localhost", case class RedisPubSub( host: String = "localhost", port: Int = 6379, + reconnectDuration: FiniteDuration = 2 seconds, channels: Seq[String], patterns: Seq[String], onMessage: Message => Unit = _ => {}, @@ -96,7 +103,7 @@ case class RedisPubSub( val redisConnection: ActorRef = system.actorOf( Props(classOf[RedisSubscriberActorWithCallback], - new InetSocketAddress(host, port), channels, patterns, onMessage, onPMessage, authPassword) + new InetSocketAddress(host, port), channels, patterns, onMessage, onPMessage, reconnectDuration, authPassword) .withDispatcher(Redis.dispatcher), name + '-' + Redis.tempName() ) @@ -130,6 +137,7 @@ trait SentinelCommands case class SentinelClient(var host: String = "localhost", var port: Int = 26379, + val reconnectDuration: FiniteDuration = 2 seconds, onMasterChange: (String, String, Int) => Unit = (masterName: String, ip: String, port: Int) => {}, onNewSentinel: (String, String, Int) => Unit = (masterName: String, sentinelip: String, sentinelport: Int) => {}, onSentinelDown: (String, String, Int) => Unit = (masterName: String, sentinelip: String, sentinelport: Int) => {}, @@ -182,7 +190,7 @@ case class SentinelClient(var host: String = "localhost", val redisPubSubConnection: ActorRef = system.actorOf( Props(classOf[RedisSubscriberActorWithCallback], - new InetSocketAddress(host, port), channels, Seq(), onMessage, (pmessage: PMessage) => {}, None) + new InetSocketAddress(host, port), channels, Seq(), onMessage, (pmessage: PMessage) => {}, reconnectDuration, None) .withDispatcher(Redis.dispatcher), name + '-' + Redis.tempName() ) @@ -201,20 +209,21 @@ abstract class SentinelMonitored(system: ActorSystem) { val sentinels: Seq[(String, Int)] val master: String val onMasterChange: (String, Int) => Unit + val reconnectDuration: FiniteDuration implicit val executionContext = system.dispatcher val sentinelClients = collection.mutable.Map( sentinels.map(hp => - (makeSentinelClientKey(hp._1, hp._2), makeSentinelClient(hp._1, hp._2)) + (makeSentinelClientKey(hp._1, hp._2), makeSentinelClient(hp._1, hp._2, reconnectDuration)) ):_* ) def makeSentinelClientKey(host: String, port: Int) = s"$host:$port" - def makeSentinelClient(host: String, port: Int): SentinelClient = { - new SentinelClient(host, port, onSwitchMaster, onNewSentinel, onSentinelDown, "SMSentinelClient")(system) + def makeSentinelClient(host: String, port: Int, reconnectDuration: FiniteDuration): SentinelClient = { + new SentinelClient(host, port, reconnectDuration, onSwitchMaster, onNewSentinel, onSentinelDown, "SMSentinelClient")(system) } @@ -228,7 +237,7 @@ abstract class SentinelMonitored(system: ActorSystem) { if (master == masterName && !sentinelClients.contains(k)) { sentinelClients.synchronized { if (!sentinelClients.contains(k)) - sentinelClients += k -> makeSentinelClient(sentinelip, sentinelport) + sentinelClients += k -> makeSentinelClient(sentinelip, sentinelport, reconnectDuration) } } } @@ -276,7 +285,8 @@ abstract class SentinelMonitoredRedisClientLike(system: ActorSystem) extends Sen } case class SentinelMonitoredRedisClient( sentinels: Seq[(String, Int)] = Seq(("localhost", 26379)), - master: String) + master: String, + reconnectDuration: FiniteDuration = 2 seconds) (implicit system: ActorSystem) extends SentinelMonitoredRedisClientLike(system) with RedisCommands with Transactions { val redisClient: RedisClient = withMasterAddr((ip, port) => { @@ -287,7 +297,8 @@ case class SentinelMonitoredRedisClient( sentinels: Seq[(String, Int)] = Seq(("l case class SentinelMonitoredRedisBlockingClient( sentinels: Seq[(String, Int)] = Seq(("localhost", 26379)), - master: String) + master: String, + reconnectDuration: FiniteDuration = 2 seconds) (implicit system: ActorSystem) extends SentinelMonitoredRedisClientLike(system) with BLists { val redisClient: RedisBlockingClient = withMasterAddr((ip, port) => { new RedisBlockingClient(ip, port, name = "SMRedisBlockingClient") diff --git a/src/main/scala/redis/RedisPool.scala b/src/main/scala/redis/RedisPool.scala index c45cbd2e..303e4be4 100644 --- a/src/main/scala/redis/RedisPool.scala +++ b/src/main/scala/redis/RedisPool.scala @@ -6,7 +6,7 @@ import java.net.InetSocketAddress import scala.concurrent.{Future, ExecutionContext} import redis.protocol.RedisReply import redis.commands.Transactions - +import scala.concurrent.duration.{DurationInt, FiniteDuration} case class RedisServer(host: String = "localhost", port: Int = 6379, @@ -16,12 +16,15 @@ case class RedisServer(host: String = "localhost", abstract class RedisClientPoolLike(system: ActorSystem) extends RoundRobinPoolRequest { val redisServers: Seq[RedisServer] val name: String + val reconnectDuration: FiniteDuration implicit val executionContext = system.dispatcher val redisConnectionPool: Seq[ActorRef] = redisServers.map(server => { system.actorOf( - Props(classOf[RedisClientActor], new InetSocketAddress(server.host, server.port), getConnectOperations(server)) - .withDispatcher(Redis.dispatcher), + Props(classOf[RedisClientActor], + new InetSocketAddress(server.host, server.port), + reconnectDuration, + getConnectOperations(server)).withDispatcher(Redis.dispatcher), name + '-' + Redis.tempName() ) }) @@ -51,7 +54,8 @@ abstract class RedisClientPoolLike(system: ActorSystem) extends RoundRobinPoolRe } case class RedisClientPool(redisServers: Seq[RedisServer], - name: String = "RedisClientPool") + name: String = "RedisClientPool", + val reconnectDuration: FiniteDuration = 2 seconds) (implicit _system: ActorSystem) extends RedisClientPoolLike(_system) with RedisCommands case class RedisClientMasterSlaves(master: RedisServer, diff --git a/src/main/scala/redis/actors/RedisClientActor.scala b/src/main/scala/redis/actors/RedisClientActor.scala index ca627b3a..4cf5b617 100644 --- a/src/main/scala/redis/actors/RedisClientActor.scala +++ b/src/main/scala/redis/actors/RedisClientActor.scala @@ -6,8 +6,11 @@ import redis.{Redis, Operation, Transaction} import akka.actor._ import scala.collection.mutable import akka.actor.SupervisorStrategy.Stop +import scala.concurrent.duration.FiniteDuration -class RedisClientActor(override val address: InetSocketAddress, getConnectOperations: () => Seq[Operation[_, _]]) extends RedisWorkerIO(address) { +class RedisClientActor(override val address: InetSocketAddress, + reconnectDuration: FiniteDuration, + getConnectOperations: () => Seq[Operation[_, _]]) extends RedisWorkerIO(address, reconnectDuration) { import context._ diff --git a/src/main/scala/redis/actors/RedisSubscriberActor.scala b/src/main/scala/redis/actors/RedisSubscriberActor.scala index 11f36fb7..3c726f3b 100644 --- a/src/main/scala/redis/actors/RedisSubscriberActor.scala +++ b/src/main/scala/redis/actors/RedisSubscriberActor.scala @@ -5,6 +5,7 @@ import redis.protocol.{MultiBulk, RedisReply} import redis.api.pubsub._ import java.net.InetSocketAddress import redis.api.connection.Auth +import scala.concurrent.duration.FiniteDuration class RedisSubscriberActorWithCallback( address: InetSocketAddress, @@ -12,8 +13,9 @@ class RedisSubscriberActorWithCallback( patterns: Seq[String], messageCallback: Message => Unit, pmessageCallback: PMessage => Unit, + reconnectDuration: FiniteDuration, authPassword: Option[String] = None - ) extends RedisSubscriberActor(address, channels, patterns, authPassword) { + ) extends RedisSubscriberActor(address, channels, patterns, reconnectDuration, authPassword) { def onMessage(m: Message) = messageCallback(m) def onPMessage(pm: PMessage) = pmessageCallback(pm) @@ -23,8 +25,9 @@ abstract class RedisSubscriberActor( address: InetSocketAddress, channels: Seq[String], patterns: Seq[String], + reconnectDuration: FiniteDuration, authPassword: Option[String] = None - ) extends RedisWorkerIO(address) with DecodeReplies { + ) extends RedisWorkerIO(address, reconnectDuration) with DecodeReplies { def onConnectWrite(): ByteString = { authPassword.map(Auth(_).encodedRequest).getOrElse(ByteString.empty) } diff --git a/src/main/scala/redis/actors/RedisWorkerIO.scala b/src/main/scala/redis/actors/RedisWorkerIO.scala index b9334544..56e804fb 100644 --- a/src/main/scala/redis/actors/RedisWorkerIO.scala +++ b/src/main/scala/redis/actors/RedisWorkerIO.scala @@ -10,8 +10,10 @@ import akka.io.Tcp.Register import akka.io.Tcp.Connect import akka.io.Tcp.CommandFailed import akka.io.Tcp.Received +import scala.concurrent.duration.FiniteDuration -abstract class RedisWorkerIO(val address: InetSocketAddress) extends Actor with ActorLogging { +abstract class RedisWorkerIO(val address: InetSocketAddress, + reconnectDuration: FiniteDuration) extends Actor with ActorLogging { private var currAddress = address @@ -168,10 +170,6 @@ abstract class RedisWorkerIO(val address: InetSocketAddress) extends Actor with } } - import scala.concurrent.duration.{DurationInt, FiniteDuration} - - def reconnectDuration: FiniteDuration = 2 seconds - private def writeWorker(byteString: ByteString) { onWriteSent() tcpWorker ! Write(byteString, WriteAck) diff --git a/src/test/scala/redis/RedisPubSubSpec.scala b/src/test/scala/redis/RedisPubSubSpec.scala index 825bb9fe..5f61a471 100644 --- a/src/test/scala/redis/RedisPubSubSpec.scala +++ b/src/test/scala/redis/RedisPubSubSpec.scala @@ -6,6 +6,7 @@ import redis.actors.RedisSubscriberActor import java.net.InetSocketAddress import akka.actor.{Props, ActorRef} import akka.testkit.{TestActorRef, TestProbe} +import scala.concurrent.duration.{DurationInt, FiniteDuration} class RedisPubSubSpec extends RedisSpec { @@ -47,7 +48,7 @@ class RedisPubSubSpec extends RedisSpec { val patterns = Seq("pattern.*") val subscriberActor = TestActorRef[SubscriberActor]( - Props(classOf[SubscriberActor], new InetSocketAddress("localhost", 6379), + Props(classOf[SubscriberActor], new InetSocketAddress("localhost", 6379), 2 seconds, channels, patterns, probeMock.ref) .withDispatcher(Redis.dispatcher), "SubscriberActor" @@ -109,10 +110,11 @@ class RedisPubSubSpec extends RedisSpec { } class SubscriberActor(address: InetSocketAddress, + reconnectDuration: FiniteDuration, channels: Seq[String], patterns: Seq[String], probeMock: ActorRef - ) extends RedisSubscriberActor(address, channels, patterns) { + ) extends RedisSubscriberActor(address, channels, patterns, reconnectDuration) { override def onMessage(m: Message) = { probeMock ! m diff --git a/src/test/scala/redis/actors/RedisClientActorSpec.scala b/src/test/scala/redis/actors/RedisClientActorSpec.scala index 13ee470c..836a84a4 100644 --- a/src/test/scala/redis/actors/RedisClientActorSpec.scala +++ b/src/test/scala/redis/actors/RedisClientActorSpec.scala @@ -8,10 +8,10 @@ import java.net.InetSocketAddress import akka.util.ByteString import scala.concurrent.{Await, Promise} import scala.collection.mutable -import redis.{RedisCommand, Redis, Operation} +import redis.{Redis, Operation} import redis.api.connection.Ping import redis.api.strings.Get -import redis.protocol.Bulk +import scala.concurrent.duration.DurationInt class RedisClientActorSpec extends TestKit(ActorSystem()) with SpecificationLike with Tags with NoTimeConversions with ImplicitSender { @@ -127,8 +127,10 @@ class RedisClientActorSpec extends TestKit(ActorSystem()) with SpecificationLike } } -class RedisClientActorMock(probeReplyDecoder: ActorRef, probeMock: ActorRef, getConnectOperations: () => Seq[Operation[_, _]]) - extends RedisClientActor(new InetSocketAddress("localhost", 6379), getConnectOperations) { +class RedisClientActorMock(probeReplyDecoder: ActorRef, + probeMock: ActorRef, + getConnectOperations: () => Seq[Operation[_, _]]) + extends RedisClientActor(new InetSocketAddress("localhost", 6379), 2 seconds, getConnectOperations) { override def initRepliesDecoder() = probeReplyDecoder override def preStart() { diff --git a/src/test/scala/redis/actors/RedisReplyDecoderSpec.scala b/src/test/scala/redis/actors/RedisReplyDecoderSpec.scala index c5ed06d8..d06f6bde 100644 --- a/src/test/scala/redis/actors/RedisReplyDecoderSpec.scala +++ b/src/test/scala/redis/actors/RedisReplyDecoderSpec.scala @@ -11,6 +11,7 @@ import java.net.InetSocketAddress import com.typesafe.config.ConfigFactory import redis.{Redis, Operation} import redis.api.connection.Ping +import scala.concurrent.duration.DurationInt class RedisReplyDecoderSpec extends TestKit(ActorSystem("testsystem", ConfigFactory.parseString( """akka.loggers = ["akka.testkit.TestEventListener"]"""))) @@ -173,7 +174,7 @@ class RedisReplyDecoderSpec } class RedisClientActorMock2(probeMock: ActorRef) - extends RedisClientActor(new InetSocketAddress("localhost", 6379), () => {Seq()}) { + extends RedisClientActor(new InetSocketAddress("localhost", 6379), 2 seconds, () => {Seq()}) { override def preStart() { // disable preStart of RedisWorkerIO } diff --git a/src/test/scala/redis/actors/RedisSubscriberActorSpec.scala b/src/test/scala/redis/actors/RedisSubscriberActorSpec.scala index 12a8c2c6..4fc1e609 100644 --- a/src/test/scala/redis/actors/RedisSubscriberActorSpec.scala +++ b/src/test/scala/redis/actors/RedisSubscriberActorSpec.scala @@ -11,6 +11,7 @@ import redis.Redis import akka.io.Tcp._ import redis.api.pubsub.Message import redis.api.pubsub.PMessage +import scala.concurrent.duration.{DurationInt, FiniteDuration} class RedisSubscriberActorSpec extends TestKit(ActorSystem()) with SpecificationLike with Tags with NoTimeConversions with ImplicitSender { @@ -23,7 +24,7 @@ class RedisSubscriberActorSpec extends TestKit(ActorSystem()) with Specification val patterns = Seq("pattern.*") val subscriberActor = TestActorRef[SubscriberActor](Props(classOf[SubscriberActor], - new InetSocketAddress("localhost", 6379), channels, patterns, probeMock.ref) + new InetSocketAddress("localhost", 6379), channels, patterns, probeMock.ref, 2 seconds) .withDispatcher(Redis.dispatcher)) val connectMsg = probeMock.expectMsgType[Connect] @@ -62,8 +63,9 @@ class RedisSubscriberActorSpec extends TestKit(ActorSystem()) with Specification class SubscriberActor(address: InetSocketAddress, channels: Seq[String], patterns: Seq[String], - probeMock: ActorRef - ) extends RedisSubscriberActor(address, channels, patterns) { + probeMock: ActorRef, + reconnectDuration: FiniteDuration + ) extends RedisSubscriberActor(address, channels, patterns, reconnectDuration) { override val tcp = probeMock diff --git a/src/test/scala/redis/actors/RedisWorkerIOSpec.scala b/src/test/scala/redis/actors/RedisWorkerIOSpec.scala index d77f3833..62fa96ff 100644 --- a/src/test/scala/redis/actors/RedisWorkerIOSpec.scala +++ b/src/test/scala/redis/actors/RedisWorkerIOSpec.scala @@ -13,19 +13,23 @@ import akka.io.Tcp.Register import akka.io.Tcp.Connect import akka.io.Tcp.CommandFailed import redis.Redis +import scala.concurrent.duration.FiniteDuration class RedisWorkerIOSpec extends TestKit(ActorSystem()) with SpecificationLike with Tags with NoTimeConversions with ImplicitSender { import scala.concurrent.duration._ + "RedisWorkerIO" should { val address = new InetSocketAddress("localhost", 6379) + val reconnectDuration = 2.seconds + "connect CommandFailed then reconnect" in { val probeTcp = TestProbe() val probeMock = TestProbe() - val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty).withDispatcher(Redis.dispatcher)) + val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty, reconnectDuration).withDispatcher(Redis.dispatcher)) val connectMsg = probeTcp.expectMsgType[Connect] connectMsg mustEqual Connect(address) @@ -48,7 +52,7 @@ class RedisWorkerIOSpec extends TestKit(ActorSystem()) with SpecificationLike wi val probeTcp = TestProbe() val probeMock = TestProbe() - val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty).withDispatcher(Redis.dispatcher)) + val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty, reconnectDuration).withDispatcher(Redis.dispatcher)) redisWorkerIO ! "PING1" @@ -79,7 +83,7 @@ class RedisWorkerIOSpec extends TestKit(ActorSystem()) with SpecificationLike wi val probeTcp = TestProbe() val probeMock = TestProbe() - val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty).withDispatcher(Redis.dispatcher)) + val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty, reconnectDuration).withDispatcher(Redis.dispatcher)) redisWorkerIO ! "PING1" @@ -120,7 +124,7 @@ class RedisWorkerIOSpec extends TestKit(ActorSystem()) with SpecificationLike wi val probeTcp = TestProbe() val probeMock = TestProbe() - val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty).withDispatcher(Redis.dispatcher)) + val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty, reconnectDuration).withDispatcher(Redis.dispatcher)) redisWorkerIO ! "PING1" @@ -143,7 +147,7 @@ class RedisWorkerIOSpec extends TestKit(ActorSystem()) with SpecificationLike wi val probeTcp = TestProbe() val probeMock = TestProbe() - val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty).withDispatcher(Redis.dispatcher)) + val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty, reconnectDuration).withDispatcher(Redis.dispatcher)) redisWorkerIO ! "PING1" @@ -165,7 +169,7 @@ class RedisWorkerIOSpec extends TestKit(ActorSystem()) with SpecificationLike wi val probeTcp = TestProbe() val probeMock = TestProbe() - val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty).withDispatcher(Redis.dispatcher)) + val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, ByteString.empty, reconnectDuration).withDispatcher(Redis.dispatcher)) redisWorkerIO ! "PING1" @@ -218,7 +222,7 @@ class RedisWorkerIOSpec extends TestKit(ActorSystem()) with SpecificationLike wi val probeMock = TestProbe() val onConnectByteString = ByteString("on connect write") - val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, onConnectByteString).withDispatcher(Redis.dispatcher)) + val redisWorkerIO = TestActorRef[RedisWorkerIOMock](Props(classOf[RedisWorkerIOMock], probeTcp.ref, address, probeMock.ref, onConnectByteString, reconnectDuration).withDispatcher(Redis.dispatcher)) val connectMsg = probeTcp.expectMsgType[Connect] @@ -258,7 +262,8 @@ class RedisWorkerIOSpec extends TestKit(ActorSystem()) with SpecificationLike wi } -class RedisWorkerIOMock(probeTcp: ActorRef, address: InetSocketAddress, probeMock: ActorRef, _onConnectWrite: ByteString) extends RedisWorkerIO(address) { +class RedisWorkerIOMock(probeTcp: ActorRef, address: InetSocketAddress, probeMock: ActorRef, _onConnectWrite: ByteString, reconnectDuration: FiniteDuration) + extends RedisWorkerIO(address, reconnectDuration) { override val tcp = probeTcp def writing: Receive = {