From cf432768ad7a44846bd551651a1895c86e10a7fb Mon Sep 17 00:00:00 2001 From: "Andy.Chen" Date: Wed, 11 Sep 2024 19:48:57 +0800 Subject: [PATCH 1/5] add unit test protect ExtractEntityId can be shared safely Related with #1463 --- .../cluster/sharding/ShardRegionSpec.scala | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala index 095df727a85..5ebef8288b3 100644 --- a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala +++ b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala @@ -18,12 +18,17 @@ import java.io.File import com.typesafe.config.ConfigFactory import org.apache.commons.io.FileUtils import org.apache.pekko +import org.apache.pekko.Done +import org.apache.pekko.cluster.sharding.ShardRegion.MessageExtractor +import org.apache.pekko.stream.scaladsl.{ Sink, Source } import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, PoisonPill, Props } import pekko.cluster.{ Cluster, MemberStatus } import pekko.cluster.ClusterEvent.CurrentClusterState import pekko.testkit.{ DeadLettersFilter, PekkoSpec, TestProbe, WithLogCapturing } import pekko.testkit.TestEvent.Mute +import scala.concurrent.{ ExecutionContext, Future } + object ShardRegionSpec { val host = "127.0.0.1" val tempConfig = @@ -54,6 +59,7 @@ object ShardRegionSpec { val shardTypeName = "Caat" val numberOfShards = 3 + val largerShardNum = 20 val extractEntityId: ShardRegion.ExtractEntityId = { case msg: Int => (msg.toString, msg) @@ -66,11 +72,37 @@ object ShardRegionSpec { case _ => throw new IllegalArgumentException() } + val messageExtractor: MessageExtractor = new MessageExtractor { + override def entityId(message: Any): String = message match { + case msg: Int => msg.toString + case _ => throw new IllegalArgumentException() + } + + override def shardId(message: Any): String = message match { + case msg: Int => (msg % largerShardNum).toString + case _ => throw new IllegalArgumentException() + } + + override def entityMessage(message: Any): Any = message + } + class EntityActor extends Actor with ActorLogging { override def receive: Receive = { case msg => sender() ! msg } } + + class IDMatcherActor extends Actor with ActorLogging { + override def receive: Receive = { + case msg => + val selfEntityId = self.path.name + val msgEntityId = messageExtractor.entityId(msg) + if (selfEntityId != msgEntityId) { + throw new IllegalStateException(s"EntityId mismatch: $selfEntityId != $msgEntityId") + } + sender() ! msg + } + } } class ShardRegionSpec extends PekkoSpec(ShardRegionSpec.config) with WithLogCapturing { @@ -183,4 +215,43 @@ class ShardRegionSpec extends PekkoSpec(ShardRegionSpec.config) with WithLogCapt } } + "ExtractEntityId" must { + "can be safely share to multiple shards" in { + implicit val ec: ExecutionContext = system.dispatcher + + Cluster(sysA).join(Cluster(sysA).selfAddress) // coordinator on A + awaitAssert(Cluster(sysA).selfMember.status shouldEqual MemberStatus.Up, 1.second) + + within(10.seconds) { + awaitAssert { + Set(sysA).foreach { s => + Cluster(s).sendCurrentClusterState(testActor) + expectMsgType[CurrentClusterState].members.size shouldEqual 2 + } + } + } + + val shardTypeName = "Doog" + val region = ClusterSharding(sysA).start( + shardTypeName, + Props[IDMatcherActor](), + ClusterShardingSettings(system), + messageExtractor) + + val total = largerShardNum * 100 + val source = Source(1 to total) + + val flow = source.mapAsync(parallelism = largerShardNum) { i => + Future { + region.tell(i, p1.ref) + } + } + + val result = flow.runWith(Sink.ignore) + + result.futureValue shouldEqual Done + p1.receiveN(total, 10.seconds) + } + } + } From db88f9fabb91595f6e091a6c2f9455965f206252 Mon Sep 17 00:00:00 2001 From: "AndyChen(Jingzhang)" Date: Wed, 5 Jun 2024 23:23:33 +0800 Subject: [PATCH 2/5] chore: avoid the double evaluation of entityId in ClusterSharding (#1304) * chore: avoid the double evaluation of entityId in ClusterSharding * new cacheable partial function * optimized for review * fix the right type --- .../typed/internal/ClusterShardingImpl.scala | 15 ++++++--- .../cluster/sharding/ClusterSharding.scala | 31 +++++++++++++------ 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 026611912c7..525718d2277 100644 --- a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -20,6 +20,7 @@ import java.util.concurrent.CompletionStage import java.util.concurrent.ConcurrentHashMap import scala.concurrent.Future +import scala.runtime.AbstractPartialFunction import org.apache.pekko import pekko.actor.ActorRefProvider @@ -172,10 +173,16 @@ import pekko.util.JavaDurationConverters._ allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] = { val extractorAdapter = new ExtractorAdapter(extractor) - val extractEntityId: ShardRegion.ExtractEntityId = { - // TODO is it possible to avoid the double evaluation of entityId - case message if extractorAdapter.entityId(message) != null => - (extractorAdapter.entityId(message), extractorAdapter.unwrapMessage(message)) + // !!!important is only applicable if you know that isDefinedAt(x) is always called before apply(x) (with the same x) + val extractEntityId: ShardRegion.ExtractEntityId = new AbstractPartialFunction[Any, (String, Any)] { + var cache: String = _ + + override def isDefinedAt(msg: Any): Boolean = { + cache = extractorAdapter.entityId(msg) + cache != null + } + + override def apply(x: Any): (String, Any) = (cache, extractorAdapter.unwrapMessage(x)) } val extractShardId: ShardRegion.ExtractShardId = { message => extractorAdapter.entityId(message) match { diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala index 9278839da3d..835d2fcfbb2 100755 --- a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala +++ b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala @@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.immutable import scala.concurrent.Await +import scala.runtime.AbstractPartialFunction import scala.util.control.NonFatal import org.apache.pekko @@ -429,15 +430,26 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { typeName, _ => entityProps, settings, - extractEntityId = { - case msg if messageExtractor.entityId(msg) ne null => - (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) - }, + extractEntityId = extractEntityIdFromExtractor(messageExtractor), extractShardId = msg => messageExtractor.shardId(msg), allocationStrategy = allocationStrategy, handOffStopMessage = handOffStopMessage) } + // !!!important is only applicable if you know that isDefinedAt(x) is always called before apply(x) (with the same x) + private def extractEntityIdFromExtractor( + messageExtractor: ShardRegion.MessageExtractor): ShardRegion.ExtractEntityId = + new AbstractPartialFunction[Any, (String, Any)] { + var cache: String = _ + + override def isDefinedAt(msg: Any): Boolean = { + cache = messageExtractor.entityId(msg) + cache != null + } + + override def apply(x: Any): (String, Any) = (cache, messageExtractor.entityMessage(x)) + } + /** * Java/Scala API: Register a named entity type by defining the [[pekko.actor.Props]] of the entity actor * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor @@ -612,11 +624,12 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { dataCenter: Optional[String], messageExtractor: ShardRegion.MessageExtractor): ActorRef = { - startProxy(typeName, Option(role.orElse(null)), Option(dataCenter.orElse(null)), - extractEntityId = { - case msg if messageExtractor.entityId(msg) ne null => - (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) - }, extractShardId = msg => messageExtractor.shardId(msg)) + startProxy( + typeName, + Option(role.orElse(null)), + Option(dataCenter.orElse(null)), + extractEntityId = extractEntityIdFromExtractor(messageExtractor), + msg => messageExtractor.shardId(msg)) } From ed6798f9608af5c70a7e901b00f497cce3af711f Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 7 Sep 2024 21:57:02 +0100 Subject: [PATCH 3/5] =?UTF-8?q?Revert=20"chore:=20avoid=20the=20double=20e?= =?UTF-8?q?valuation=20of=20entityId=20in=20ClusterSharding=20(#1=E2=80=A6?= =?UTF-8?q?"=20(#1464)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit b0e9886439bf216dace9e2979ea820521ddd2a63. --- .../typed/internal/ClusterShardingImpl.scala | 15 +++------ .../cluster/sharding/ClusterSharding.scala | 31 ++++++------------- 2 files changed, 13 insertions(+), 33 deletions(-) diff --git a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 525718d2277..026611912c7 100644 --- a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -20,7 +20,6 @@ import java.util.concurrent.CompletionStage import java.util.concurrent.ConcurrentHashMap import scala.concurrent.Future -import scala.runtime.AbstractPartialFunction import org.apache.pekko import pekko.actor.ActorRefProvider @@ -173,16 +172,10 @@ import pekko.util.JavaDurationConverters._ allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] = { val extractorAdapter = new ExtractorAdapter(extractor) - // !!!important is only applicable if you know that isDefinedAt(x) is always called before apply(x) (with the same x) - val extractEntityId: ShardRegion.ExtractEntityId = new AbstractPartialFunction[Any, (String, Any)] { - var cache: String = _ - - override def isDefinedAt(msg: Any): Boolean = { - cache = extractorAdapter.entityId(msg) - cache != null - } - - override def apply(x: Any): (String, Any) = (cache, extractorAdapter.unwrapMessage(x)) + val extractEntityId: ShardRegion.ExtractEntityId = { + // TODO is it possible to avoid the double evaluation of entityId + case message if extractorAdapter.entityId(message) != null => + (extractorAdapter.entityId(message), extractorAdapter.unwrapMessage(message)) } val extractShardId: ShardRegion.ExtractShardId = { message => extractorAdapter.entityId(message) match { diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala index 835d2fcfbb2..9278839da3d 100755 --- a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala +++ b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala @@ -19,7 +19,6 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.immutable import scala.concurrent.Await -import scala.runtime.AbstractPartialFunction import scala.util.control.NonFatal import org.apache.pekko @@ -430,26 +429,15 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { typeName, _ => entityProps, settings, - extractEntityId = extractEntityIdFromExtractor(messageExtractor), + extractEntityId = { + case msg if messageExtractor.entityId(msg) ne null => + (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) + }, extractShardId = msg => messageExtractor.shardId(msg), allocationStrategy = allocationStrategy, handOffStopMessage = handOffStopMessage) } - // !!!important is only applicable if you know that isDefinedAt(x) is always called before apply(x) (with the same x) - private def extractEntityIdFromExtractor( - messageExtractor: ShardRegion.MessageExtractor): ShardRegion.ExtractEntityId = - new AbstractPartialFunction[Any, (String, Any)] { - var cache: String = _ - - override def isDefinedAt(msg: Any): Boolean = { - cache = messageExtractor.entityId(msg) - cache != null - } - - override def apply(x: Any): (String, Any) = (cache, messageExtractor.entityMessage(x)) - } - /** * Java/Scala API: Register a named entity type by defining the [[pekko.actor.Props]] of the entity actor * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor @@ -624,12 +612,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { dataCenter: Optional[String], messageExtractor: ShardRegion.MessageExtractor): ActorRef = { - startProxy( - typeName, - Option(role.orElse(null)), - Option(dataCenter.orElse(null)), - extractEntityId = extractEntityIdFromExtractor(messageExtractor), - msg => messageExtractor.shardId(msg)) + startProxy(typeName, Option(role.orElse(null)), Option(dataCenter.orElse(null)), + extractEntityId = { + case msg if messageExtractor.entityId(msg) ne null => + (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) + }, extractShardId = msg => messageExtractor.shardId(msg)) } From 5f114e32232d2a33d2a6385ed0fd689041d521dd Mon Sep 17 00:00:00 2001 From: "Andy.Chen" Date: Wed, 11 Sep 2024 21:13:29 +0800 Subject: [PATCH 4/5] grammar fix --- .../org/apache/pekko/cluster/sharding/ShardRegionSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala index 5ebef8288b3..1927cfed87a 100644 --- a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala +++ b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala @@ -216,7 +216,7 @@ class ShardRegionSpec extends PekkoSpec(ShardRegionSpec.config) with WithLogCapt } "ExtractEntityId" must { - "can be safely share to multiple shards" in { + "be safely able to share multiple shards" in { implicit val ec: ExecutionContext = system.dispatcher Cluster(sysA).join(Cluster(sysA).selfAddress) // coordinator on A From 42c1a91c56f41f37bc68af52ba33ba58bb633dcb Mon Sep 17 00:00:00 2001 From: "Andy.Chen" Date: Wed, 11 Sep 2024 23:17:14 +0800 Subject: [PATCH 5/5] sort imports --- .../org/apache/pekko/cluster/sharding/ShardRegionSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala index 1927cfed87a..95a7df4296b 100644 --- a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala +++ b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ShardRegionSpec.scala @@ -18,12 +18,12 @@ import java.io.File import com.typesafe.config.ConfigFactory import org.apache.commons.io.FileUtils import org.apache.pekko -import org.apache.pekko.Done -import org.apache.pekko.cluster.sharding.ShardRegion.MessageExtractor -import org.apache.pekko.stream.scaladsl.{ Sink, Source } import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, PoisonPill, Props } import pekko.cluster.{ Cluster, MemberStatus } import pekko.cluster.ClusterEvent.CurrentClusterState +import pekko.cluster.sharding.ShardRegion.MessageExtractor +import pekko.Done +import pekko.stream.scaladsl.{ Sink, Source } import pekko.testkit.{ DeadLettersFilter, PekkoSpec, TestProbe, WithLogCapturing } import pekko.testkit.TestEvent.Mute