From cfeab772522de453183948846a43d0cc28a4b73f Mon Sep 17 00:00:00 2001 From: thirstycrow <5451vs5451@gmail.com> Date: Fri, 17 Feb 2017 15:02:13 +0800 Subject: [PATCH] connect to new nodes / disconnect from unused nodes --- src/main/scala/redis/RedisCluster.scala | 34 +++++++---- .../scala/redis/actors/RedisWorkerIO.scala | 3 + src/main/scala/redis/api/Clusters.scala | 6 +- src/test/scala/redis/RedisClusterSpec.scala | 59 +++++++++++++++++++ 4 files changed, 90 insertions(+), 12 deletions(-) create mode 100644 src/test/scala/redis/RedisClusterSpec.scala diff --git a/src/main/scala/redis/RedisCluster.scala b/src/main/scala/redis/RedisCluster.scala index d5a7dda2..10d01bee 100644 --- a/src/main/scala/redis/RedisCluster.scala +++ b/src/main/scala/redis/RedisCluster.scala @@ -11,27 +11,32 @@ import redis.protocol.RedisReply import redis.util.CRC16 import scala.annotation.tailrec -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import scala.concurrent.stm.Ref import scala.concurrent.{Await, Future, Promise} import scala.util.control.NonFatal case class RedisCluster(redisServers: Seq[RedisServer], - name: String = "RedisClientPool") + name: String = "RedisClientPool", + password: Option[String] = None) (implicit _system: ActorSystem, redisDispatcher: RedisDispatcher = Redis.dispatcher ) extends RedisClientPoolLike(_system, redisDispatcher) with RedisCommands { val log = Logging.getLogger(_system, this) - override val redisServerConnections = { - redisServers.map { server => - makeRedisConnection(server, defaultActive = true) - } toMap + override val redisServerConnections = collection.mutable.Map { + redisServers.map(makeConnection): _* } refreshConnections() + def makeConnection(server: RedisServer) = { + makeRedisConnection( + server = server.copy(password = password, db = None), + defaultActive = true + ) + } def equalsHostPort(clusterNode:ClusterNode,server:RedisServer) = { clusterNode.host == server.host && clusterNode.port == server.port @@ -55,15 +60,17 @@ case class RedisCluster(redisServers: Seq[RedisServer], val clusterSlotsRef:Ref[Option[Map[ClusterSlot, RedisConnection]]] = Ref(Option.empty[Map[ClusterSlot, RedisConnection]]) val lockClusterSlots = Ref(true) - Await.result(asyncRefreshClusterSlots(force=true), Duration(10,TimeUnit.SECONDS)) + Await.result(asyncRefreshClusterSlots(force=true), 10.seconds) def getClusterSlots(): Future[Map[ClusterSlot, RedisConnection]] = { def resolveClusterSlots(retry:Int): Future[Map[ClusterSlot, RedisConnection]] = { clusterSlots().map { clusterSlots => - clusterSlots.flatMap { clusterSlot => - val maybeServerConnection = redisServerConnections.find { case (server, _) => equalsHostPort(clusterSlot.master, server) } - maybeServerConnection.map { case (_, redisConnection) => (clusterSlot, redisConnection) } + clusterSlots.map { clusterSlot => + val server = clusterSlot.master.hostAndPort + val connection = redisServerConnections + .getOrElseUpdate(server, makeConnection(server)._2) + (clusterSlot, connection) }.toMap }.recoverWith { case e => @@ -83,6 +90,13 @@ case class RedisCluster(redisServers: Seq[RedisServer], getClusterSlots().map { clusterSlot => log.info("refreshClusterSlots: " + clusterSlot.toString()) clusterSlotsRef.single.set(Some(clusterSlot)) + val serverSet = clusterSlot.keysIterator.map(_.master.hostAndPort).toSet + redisServerConnections.keys.foreach { server => + if (!serverSet.contains(server)) { + redisServerConnections.remove(server) + //.map(connection => _system.stop(connection.actor)) + } + } lockClusterSlots.single.compareAndSet(true, false) () }.recoverWith { diff --git a/src/main/scala/redis/actors/RedisWorkerIO.scala b/src/main/scala/redis/actors/RedisWorkerIO.scala index 32202217..b5af8a16 100644 --- a/src/main/scala/redis/actors/RedisWorkerIO.scala +++ b/src/main/scala/redis/actors/RedisWorkerIO.scala @@ -44,6 +44,9 @@ abstract class RedisWorkerIO(val address: InetSocketAddress, onConnectStatus: Bo override def postStop() { log.info("RedisWorkerIO stop") + if (tcpWorker != null) { + tcpWorker ! Close + } } def initConnectedBuffer() { diff --git a/src/main/scala/redis/api/Clusters.scala b/src/main/scala/redis/api/Clusters.scala index a834a8ed..131f2cc1 100644 --- a/src/main/scala/redis/api/Clusters.scala +++ b/src/main/scala/redis/api/Clusters.scala @@ -1,7 +1,7 @@ package redis.api.clusters import akka.util.ByteString -import redis.{MultiBulkConverter, RedisCommand, RedisCommandMultiBulk, RedisCommandStatusString} +import redis.{MultiBulkConverter, RedisCommand, RedisCommandMultiBulk, RedisCommandStatusString, RedisServer} import redis.api.connection.Ping._ import redis.protocol.{DecodeResult, Bulk, MultiBulk, RedisProtocolReply, RedisReply} @@ -9,7 +9,9 @@ import scala.math.Ordering -case class ClusterNode(host:String, port:Int, id:String) +case class ClusterNode(host:String, port:Int, id:String) { + def hostAndPort = RedisServer(host, port) +} case class ClusterSlot(begin:Int, end:Int, master:ClusterNode, slaves:Seq[ClusterNode]) extends Comparable[ClusterSlot] { override def compareTo(x: ClusterSlot): Int = { this.begin.compare(x.begin) diff --git a/src/test/scala/redis/RedisClusterSpec.scala b/src/test/scala/redis/RedisClusterSpec.scala new file mode 100644 index 00000000..0e037d10 --- /dev/null +++ b/src/test/scala/redis/RedisClusterSpec.scala @@ -0,0 +1,59 @@ +package redis + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import akka.testkit.TestProbe +import org.specs2.mutable.SpecificationLike +import redis.api.clusters.{ClusterNode, ClusterSlot} + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.DurationInt +import scala.concurrent.stm.Ref + +class RedisClusterSpec extends TestKit(ActorSystem()) with SpecificationLike { + + var _clusterSlots = Seq(clusterSlot("1", 6000, 0, 16383)) + + val redisCluster = new RedisCluster(Seq(RedisServer("127.0.0.1", 6000))) { + override def clusterSlots() = Future(_clusterSlots) + override def makeConnection(server: RedisServer) = (server, RedisConnection(TestProbe().ref, Ref(true))) + } + + def clusterSlot(nodeId: String, port: Int, begin: Int, end: Int) = + ClusterSlot(begin, end, ClusterNode("127.0.0.1", port, "1"), Nil) + + def checkSlotMaps() = { + redisCluster.redisServerConnections.keySet.toSet mustEqual _clusterSlots.map(_.master.hostAndPort).toSet + redisCluster.clusterSlotsRef.single.get.map(_.keySet.toSet) mustEqual Some(_clusterSlots.toSet) + _clusterSlots.foreach { slots => + val connection = redisCluster.redisServerConnections.get(slots.master.hostAndPort) + (slots.begin to slots.end).foreach { slot => + redisCluster.getClusterAndConnection(slot).map(_._2) mustEqual (connection) + } + } + success + } + + "redis cluster" should { + + "add new nodes" in { + _clusterSlots = Seq( + clusterSlot("1", 6000, 0, 4095), + clusterSlot("2", 6001, 4096, 8191), + clusterSlot("1", 6000, 8192, 12287), + clusterSlot("2", 6001, 12288, 16383) + ) + Await.result(redisCluster.asyncRefreshClusterSlots(true), 10.seconds) + checkSlotMaps() + } + + "remove unused nodes" in { + _clusterSlots = Seq( + clusterSlot("2", 6001, 0, 16383) + ) + Await.result(redisCluster.asyncRefreshClusterSlots(true), 10.seconds) + Await.result(redisCluster.asyncRefreshClusterSlots(true), 10.seconds) + checkSlotMaps() + } + } +}