From 7d2356de0bf79431dd2a18b04d98c12cd95188c2 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 16 Dec 2024 21:12:59 +0100 Subject: [PATCH 01/15] Revert "revert #1568 due to test failures (#1587)" This reverts commit 7af03e5215dc69df9702803dba7f22d94b49ead4. --- .../apache/pekko/cluster/ClusterDaemon.scala | 14 +-- .../cluster/MixedProtocolClusterSpec.scala | 92 +++++++++++++++++++ 2 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index 34320067be1..c703bdd6b12 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -365,6 +365,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val statsEnabled = PublishStatsInterval.isFinite var gossipStats = GossipStats() + val acceptedProtocols = context.system.settings.config.getStringList("pekko.remote.accept-protocol-names") + var seedNodes = SeedNodes var seedNodeProcess: Option[ActorRef] = None var seedNodeProcessCounter = 0 // for unique names @@ -701,10 +703,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh * which will reply with a `Welcome` message. */ def join(address: Address): Unit = { - if (address.protocol != selfAddress.protocol) + if (!acceptedProtocols.contains(address.protocol)) logWarning( - "Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]", - selfAddress.protocol, + "Trying to join member with wrong protocol, but was ignored, expected any of [{}] but was [{}]", + acceptedProtocols, address.protocol) else if (address.system != selfAddress.system) logWarning( @@ -750,10 +752,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: Version): Unit = { if (!preparingForShutdown) { val selfStatus = latestGossip.member(selfUniqueAddress).status - if (joiningNode.address.protocol != selfAddress.protocol) + if (!acceptedProtocols.contains(joiningNode.address.protocol)) logWarning( - "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", - selfAddress.protocol, + "Member with wrong protocol tried to join, but was ignored, expected any of {} but was [{}]", + acceptedProtocols, joiningNode.address.protocol) else if (joiningNode.address.system != selfAddress.system) logWarning( diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala new file mode 100644 index 00000000000..45382f4b353 --- /dev/null +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.cluster + +import com.typesafe.config.{ Config, ConfigFactory } + +import org.apache.pekko.testkit.{ LongRunningTest, PekkoSpec } + +object MixedProtocolClusterSpec { + + val baseConfig: Config = + ConfigFactory.parseString(""" + pekko.actor.provider = "cluster" + pekko.coordinated-shutdown.terminate-actor-system = on + + pekko.remote.classic.netty.tcp.port = 0 + pekko.remote.artery.canonical.port = 0 + pekko.remote.artery.advanced.aeron.idle-cpu-level = 3 + pekko.remote.accept-protocol-names = ["pekko", "akka"] + + pekko.cluster.jmx.multi-mbeans-in-same-jvm = on + pekko.cluster.configuration-compatibility-check.enforce-on-join = off + """) + + val configWithPekko: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "pekko" + """).withFallback(baseConfig) + + val configWithAkka: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "akka" + """).withFallback(baseConfig) +} + +class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { + + import MixedProtocolClusterSpec._ + + "A node using the akka protocol" must { + + "be allowed to join a cluster with a node using the pekko protocol" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekko) + + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkka) + clusterTestUtil.formCluster() + + try { + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + + "allow a node using the pekko protocol to join the cluster" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkka) + + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekko) + clusterTestUtil.formCluster() + + try { + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + } +} From 2dbe5b10dd5131635679e8c556b21a622e045372 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 16 Dec 2024 21:14:18 +0100 Subject: [PATCH 02/15] temp run nightly test in this PR --- .github/workflows/nightly-builds.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/nightly-builds.yml b/.github/workflows/nightly-builds.yml index 12cb31073c9..c473d1d1d06 100644 --- a/.github/workflows/nightly-builds.yml +++ b/.github/workflows/nightly-builds.yml @@ -3,6 +3,7 @@ name: Nightly Builds on: schedule: - cron: "0 0 * * *" + pull_request: workflow_dispatch: permissions: {} From a2b8cc9d59f3e3e9bd65e7cd1660ea27c085e66c Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 16 Dec 2024 21:38:02 +0100 Subject: [PATCH 03/15] no need for square brackets because the set print adds them --- .../src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index c703bdd6b12..691c31667b9 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -705,7 +705,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def join(address: Address): Unit = { if (!acceptedProtocols.contains(address.protocol)) logWarning( - "Trying to join member with wrong protocol, but was ignored, expected any of [{}] but was [{}]", + "Trying to join member with wrong protocol, but was ignored, expected any of {} but was [{}]", acceptedProtocols, address.protocol) else if (address.system != selfAddress.system) From 17655ee327d377db0f9f4a64624c9cb0fd7d6ab5 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 16 Dec 2024 21:46:12 +0100 Subject: [PATCH 04/15] logging to find issue --- .../main/scala/org/apache/pekko/cluster/ClusterDaemon.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index 691c31667b9..9ddb9bc8bfb 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -705,9 +705,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def join(address: Address): Unit = { if (!acceptedProtocols.contains(address.protocol)) logWarning( - "Trying to join member with wrong protocol, but was ignored, expected any of {} but was [{}]", + "Trying to join member with wrong protocol, but was ignored, expected any of {} but was [{}] - selfAddress.protocol={}", acceptedProtocols, - address.protocol) + address.protocol, + selfAddress.protocol) else if (address.system != selfAddress.system) logWarning( "Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]", From d8908882479398831c4a099e67c0c984b1ea4437 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 16 Dec 2024 22:59:17 +0100 Subject: [PATCH 05/15] support tcp protocols --- .../apache/pekko/cluster/ClusterDaemon.scala | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index 9ddb9bc8bfb..bc14f7864ab 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -13,13 +13,13 @@ package org.apache.pekko.cluster +import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.control.NonFatal -import scala.annotation.nowarn import com.typesafe.config.Config import org.apache.pekko @@ -30,13 +30,12 @@ import pekko.annotation.InternalApi import pekko.cluster.ClusterEvent._ import pekko.cluster.MemberStatus._ import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } -import pekko.event.ActorWithLogClass -import pekko.event.Logging +import pekko.event.{ ActorWithLogClass, Logging } import pekko.pattern.ask import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent } import pekko.remote.artery.QuarantinedEvent -import pekko.util.Timeout -import pekko.util.Version +import pekko.util.{ Timeout, Version } +import pekko.util.ccompat.JavaConverters._ /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -365,7 +364,19 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val statsEnabled = PublishStatsInterval.isFinite var gossipStats = GossipStats() - val acceptedProtocols = context.system.settings.config.getStringList("pekko.remote.accept-protocol-names") + val acceptedProtocols: Set[String] = { + val initList = context.system.settings.config + .getStringList("pekko.remote.accept-protocol-names") + .asScala + initList.toSeq.foreach { protocol => + if (!protocol.endsWith(".tcp")) { + val tcpProtocol = s"$protocol.tcp" + if (!initList.contains(tcpProtocol)) + initList.+=(tcpProtocol) + } + } + initList.toSet + } var seedNodes = SeedNodes var seedNodeProcess: Option[ActorRef] = None From 2783e3c8fff2eff8a5f5332ba26bae6e27951916 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 17 Dec 2024 10:07:50 +0100 Subject: [PATCH 06/15] Update ClusterDaemon.scala --- .../apache/pekko/cluster/ClusterDaemon.scala | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index bc14f7864ab..b4688aa9a24 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -32,10 +32,9 @@ import pekko.cluster.MemberStatus._ import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import pekko.event.{ ActorWithLogClass, Logging } import pekko.pattern.ask -import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent } +import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent, RemoteSettings } import pekko.remote.artery.QuarantinedEvent import pekko.util.{ Timeout, Version } -import pekko.util.ccompat.JavaConverters._ /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -365,17 +364,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh var gossipStats = GossipStats() val acceptedProtocols: Set[String] = { - val initList = context.system.settings.config - .getStringList("pekko.remote.accept-protocol-names") - .asScala - initList.toSeq.foreach { protocol => - if (!protocol.endsWith(".tcp")) { - val tcpProtocol = s"$protocol.tcp" - if (!initList.contains(tcpProtocol)) - initList.+=(tcpProtocol) - } - } - initList.toSet + val remoteSettings: RemoteSettings = new RemoteSettings(context.system.settings.config) + val initSet = remoteSettings.AcceptProtocolNames + val tcpSet = initSet.map(protocol => s"$protocol.tcp") + initSet ++ tcpSet } var seedNodes = SeedNodes From e2894b5869cd791448bcd86d6787d5b51802021a Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 17 Dec 2024 11:37:27 +0100 Subject: [PATCH 07/15] remove temp logging --- .../main/scala/org/apache/pekko/cluster/ClusterDaemon.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index b4688aa9a24..479d58e97bb 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -708,10 +708,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def join(address: Address): Unit = { if (!acceptedProtocols.contains(address.protocol)) logWarning( - "Trying to join member with wrong protocol, but was ignored, expected any of {} but was [{}] - selfAddress.protocol={}", + "Trying to join member with wrong protocol, but was ignored, expected any of {} but was [{}]", acceptedProtocols, - address.protocol, - selfAddress.protocol) + address.protocol) else if (address.system != selfAddress.system) logWarning( "Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]", From a767289c5b07ac1bdb819f2cbd81eaa369265a3b Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 17 Dec 2024 12:43:33 +0100 Subject: [PATCH 08/15] try to fix issue in Remoting --- .../org/apache/pekko/remote/Remoting.scala | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala b/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala index 85812471143..16811bc837a 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala @@ -225,11 +225,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc Await.result(addressesPromise.future, StartupTimeout.duration) if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null) - transportMapping = transports + val mapping = transports .groupBy { case (transport, _) => transport.schemeIdentifier } .map { case (k, v) => k -> v.toSet } + transportMapping = addProtocolsToMap(mapping) defaultAddress = transports.head._2 addresses = transports.map { _._2 }.toSet @@ -296,6 +297,21 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc } } } + + private def addProtocolsToMap( + map: Map[String, Set[(PekkoProtocolTransport, Address)]]): Map[String, Set[(PekkoProtocolTransport, Address)]] = { + if (AcceptProtocolNames.size > 1) { + map.flatMap { case (protocol, transports) => + val tcpProtocol = protocol.endsWith(".tcp") + AcceptProtocolNames.map { newProtocol => + if (tcpProtocol) + s"$newProtocol.tcp" -> transports + else + newProtocol -> transports + } + } + } else map + } } /** @@ -567,7 +583,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) } OneForOneStrategy(loggingEnabled = false) { - case InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) => + case InvalidAssociation(localAddress, remoteAddress, reason, disassociationInfo) => keepQuarantinedOr(remoteAddress) { val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]" log.warning( @@ -580,7 +596,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) causedBy) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) } - disassiciationInfo.foreach { + disassociationInfo.foreach { case AssociationHandle.Quarantined => context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress)) case _ => // do nothing From fe9d1d359ab5653c147d474b44693ede4a1de63a Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 28 Dec 2024 17:50:49 +0100 Subject: [PATCH 09/15] extra tests --- .../cluster/MixedProtocolClusterSpec.scala | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala index 45382f4b353..b40719a55ba 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -28,7 +28,6 @@ object MixedProtocolClusterSpec { pekko.actor.provider = "cluster" pekko.coordinated-shutdown.terminate-actor-system = on - pekko.remote.classic.netty.tcp.port = 0 pekko.remote.artery.canonical.port = 0 pekko.remote.artery.advanced.aeron.idle-cpu-level = 3 pekko.remote.accept-protocol-names = ["pekko", "akka"] @@ -42,10 +41,22 @@ object MixedProtocolClusterSpec { pekko.remote.protocol-name = "pekko" """).withFallback(baseConfig) + val configWithPekkoTcp: Config = + ConfigFactory.parseString(""" + pekko.remote.classic.netty.tcp.port = 0 + pekko.remote.protocol-name = "pekko" + """).withFallback(baseConfig) + val configWithAkka: Config = ConfigFactory.parseString(""" pekko.remote.protocol-name = "akka" """).withFallback(baseConfig) + + val configWithAkkaTcp: Config = + ConfigFactory.parseString(""" + pekko.remote.classic.netty.tcp.port = 0 + pekko.remote.protocol-name = "akka" + """).withFallback(baseConfig) } class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { @@ -71,6 +82,23 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { } } + "be allowed to join a cluster with a node using the pekko protocol (tcp)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekkoTcp) + + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaTcp) + clusterTestUtil.formCluster() + + try { + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + "allow a node using the pekko protocol to join the cluster" taggedAs LongRunningTest in { val clusterTestUtil = new ClusterTestUtil(system.name) @@ -88,5 +116,23 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { clusterTestUtil.shutdownAll() } } + + "allow a node using the pekko protocol to join the cluster (tcp)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkkaTcp) + + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoTcp) + clusterTestUtil.formCluster() + + try { + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } } } From 77c9df37f26144752bb5815afb485d64ec138911 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 28 Dec 2024 18:51:23 +0100 Subject: [PATCH 10/15] more tests --- .../cluster/MixedProtocolClusterSpec.scala | 88 ++++++++++++++++--- 1 file changed, 75 insertions(+), 13 deletions(-) diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala index b40719a55ba..2d9828b0acc 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -36,27 +36,54 @@ object MixedProtocolClusterSpec { pekko.cluster.configuration-compatibility-check.enforce-on-join = off """) - val configWithPekko: Config = + val configWithUdp: Config = ConfigFactory.parseString(""" - pekko.remote.protocol-name = "pekko" + pekko.remote.artery.transport = "aeron-udp" """).withFallback(baseConfig) - val configWithPekkoTcp: Config = + val configWithPekkoUdp: Config = ConfigFactory.parseString(""" - pekko.remote.classic.netty.tcp.port = 0 pekko.remote.protocol-name = "pekko" - """).withFallback(baseConfig) + """).withFallback(configWithUdp) - val configWithAkka: Config = + val configWithAkkaUdp: Config = ConfigFactory.parseString(""" pekko.remote.protocol-name = "akka" + """).withFallback(configWithUdp) + + val configWithTcp: Config = + ConfigFactory.parseString(""" + pekko.remote.artery.canonical.port = 0 """).withFallback(baseConfig) + val configWithPekkoTcp: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "pekko" + """).withFallback(configWithTcp) + val configWithAkkaTcp: Config = ConfigFactory.parseString(""" - pekko.remote.classic.netty.tcp.port = 0 pekko.remote.protocol-name = "akka" + """).withFallback(configWithTcp) + + val configWithNetty: Config = + ConfigFactory.parseString(""" + pekko.remote.artery.enabled = false + pekko.remote.classic { + enabled-transports = ["pekko.remote.classic.netty.tcp"] + netty.tcp.port = 0 + } """).withFallback(baseConfig) + + val configWithPekkoNetty: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "pekko" + """).withFallback(configWithNetty) + + val configWithAkkaNetty: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "akka" + """).withFallback(configWithNetty) } class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { @@ -65,14 +92,14 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { "A node using the akka protocol" must { - "be allowed to join a cluster with a node using the pekko protocol" taggedAs LongRunningTest in { + "be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest in { val clusterTestUtil = new ClusterTestUtil(system.name) // start the first node with the "pekko" protocol - clusterTestUtil.newActorSystem(configWithPekko) + clusterTestUtil.newActorSystem(configWithPekkoUdp) // have a node using the "akka" protocol join - val joiningNode = clusterTestUtil.newActorSystem(configWithAkka) + val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaUdp) clusterTestUtil.formCluster() try { @@ -99,15 +126,32 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { } } - "allow a node using the pekko protocol to join the cluster" taggedAs LongRunningTest in { + "be allowed to join a cluster with a node using the pekko protocol (netty)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekkoNetty) + + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaNetty) + clusterTestUtil.formCluster() + + try { + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + + "allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest in { val clusterTestUtil = new ClusterTestUtil(system.name) // create the first node with the "akka" protocol - clusterTestUtil.newActorSystem(configWithAkka) + clusterTestUtil.newActorSystem(configWithAkkaUdp) // have a node using the "pekko" protocol join - val joiningNode = clusterTestUtil.newActorSystem(configWithPekko) + val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoUdp) clusterTestUtil.formCluster() try { @@ -134,5 +178,23 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { clusterTestUtil.shutdownAll() } } + + "allow a node using the pekko protocol to join the cluster (netty)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkkaNetty) + + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoNetty) + clusterTestUtil.formCluster() + + try { + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } } } From 516095e66c6b372950795a26141bf24c0c519674 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 28 Dec 2024 19:10:09 +0100 Subject: [PATCH 11/15] ignore udp tests --- .../org/apache/pekko/cluster/MixedProtocolClusterSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala index 2d9828b0acc..6bc98eafb8a 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -92,7 +92,7 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { "A node using the akka protocol" must { - "be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest in { + "be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest ignore { val clusterTestUtil = new ClusterTestUtil(system.name) // start the first node with the "pekko" protocol @@ -143,7 +143,7 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { } } - "allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest in { + "allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest ignore { val clusterTestUtil = new ClusterTestUtil(system.name) From 3fe06469988a52673a617516157c1e7cc1dcefe0 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 28 Dec 2024 19:32:24 +0100 Subject: [PATCH 12/15] try to make tests tidy up after failures --- .../cluster/MixedProtocolClusterSpec.scala | 75 +++++++++---------- 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala index 6bc98eafb8a..437777c9952 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -95,14 +95,14 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { "be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest ignore { val clusterTestUtil = new ClusterTestUtil(system.name) - // start the first node with the "pekko" protocol - clusterTestUtil.newActorSystem(configWithPekkoUdp) + try { + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekkoUdp) - // have a node using the "akka" protocol join - val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaUdp) - clusterTestUtil.formCluster() + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaUdp) + clusterTestUtil.formCluster() - try { awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") } finally { clusterTestUtil.shutdownAll() @@ -112,14 +112,14 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { "be allowed to join a cluster with a node using the pekko protocol (tcp)" taggedAs LongRunningTest in { val clusterTestUtil = new ClusterTestUtil(system.name) - // start the first node with the "pekko" protocol - clusterTestUtil.newActorSystem(configWithPekkoTcp) + try { + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekkoTcp) - // have a node using the "akka" protocol join - val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaTcp) - clusterTestUtil.formCluster() + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaTcp) + clusterTestUtil.formCluster() - try { awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") } finally { clusterTestUtil.shutdownAll() @@ -129,14 +129,14 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { "be allowed to join a cluster with a node using the pekko protocol (netty)" taggedAs LongRunningTest in { val clusterTestUtil = new ClusterTestUtil(system.name) - // start the first node with the "pekko" protocol - clusterTestUtil.newActorSystem(configWithPekkoNetty) + try { + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekkoNetty) - // have a node using the "akka" protocol join - val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaNetty) - clusterTestUtil.formCluster() + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaNetty) + clusterTestUtil.formCluster() - try { awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") } finally { clusterTestUtil.shutdownAll() @@ -146,15 +146,14 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { "allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest ignore { val clusterTestUtil = new ClusterTestUtil(system.name) + try { + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkkaUdp) - // create the first node with the "akka" protocol - clusterTestUtil.newActorSystem(configWithAkkaUdp) - - // have a node using the "pekko" protocol join - val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoUdp) - clusterTestUtil.formCluster() + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoUdp) + clusterTestUtil.formCluster() - try { awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") } finally { clusterTestUtil.shutdownAll() @@ -164,15 +163,14 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { "allow a node using the pekko protocol to join the cluster (tcp)" taggedAs LongRunningTest in { val clusterTestUtil = new ClusterTestUtil(system.name) + try { + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkkaTcp) - // create the first node with the "akka" protocol - clusterTestUtil.newActorSystem(configWithAkkaTcp) - - // have a node using the "pekko" protocol join - val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoTcp) - clusterTestUtil.formCluster() + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoTcp) + clusterTestUtil.formCluster() - try { awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") } finally { clusterTestUtil.shutdownAll() @@ -182,15 +180,14 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { "allow a node using the pekko protocol to join the cluster (netty)" taggedAs LongRunningTest in { val clusterTestUtil = new ClusterTestUtil(system.name) + try { + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkkaNetty) - // create the first node with the "akka" protocol - clusterTestUtil.newActorSystem(configWithAkkaNetty) - - // have a node using the "pekko" protocol join - val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoNetty) - clusterTestUtil.formCluster() + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoNetty) + clusterTestUtil.formCluster() - try { awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") } finally { clusterTestUtil.shutdownAll() From a5d7ab6beb4ee0570cefb93bae6fd28341361734 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 28 Dec 2024 19:43:28 +0100 Subject: [PATCH 13/15] Update MixedProtocolClusterSpec.scala --- .../pekko/cluster/MixedProtocolClusterSpec.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala index 437777c9952..e876c80c3e8 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -29,6 +29,7 @@ object MixedProtocolClusterSpec { pekko.coordinated-shutdown.terminate-actor-system = on pekko.remote.artery.canonical.port = 0 + pekko.remote.classic.netty.tcp.port = 0 pekko.remote.artery.advanced.aeron.idle-cpu-level = 3 pekko.remote.accept-protocol-names = ["pekko", "akka"] @@ -51,27 +52,21 @@ object MixedProtocolClusterSpec { pekko.remote.protocol-name = "akka" """).withFallback(configWithUdp) - val configWithTcp: Config = - ConfigFactory.parseString(""" - pekko.remote.artery.canonical.port = 0 - """).withFallback(baseConfig) - val configWithPekkoTcp: Config = ConfigFactory.parseString(""" pekko.remote.protocol-name = "pekko" - """).withFallback(configWithTcp) + """).withFallback(baseConfig) val configWithAkkaTcp: Config = ConfigFactory.parseString(""" pekko.remote.protocol-name = "akka" - """).withFallback(configWithTcp) + """).withFallback(baseConfig) val configWithNetty: Config = ConfigFactory.parseString(""" pekko.remote.artery.enabled = false pekko.remote.classic { enabled-transports = ["pekko.remote.classic.netty.tcp"] - netty.tcp.port = 0 } """).withFallback(baseConfig) From bb117cf28fb0bde64f15ce63cb924e3d29b339fb Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 28 Dec 2024 20:07:04 +0100 Subject: [PATCH 14/15] Update MixedProtocolClusterSpec.scala --- .../org/apache/pekko/cluster/MixedProtocolClusterSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala index e876c80c3e8..f3c8c73adb9 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -87,7 +87,7 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { "A node using the akka protocol" must { - "be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest ignore { + "be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest in { val clusterTestUtil = new ClusterTestUtil(system.name) try { @@ -138,7 +138,7 @@ class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { } } - "allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest ignore { + "allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest in { val clusterTestUtil = new ClusterTestUtil(system.name) try { From 638ecf5f90a65a9af099488d7731a08dee27e166 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 30 Dec 2024 22:04:32 +0100 Subject: [PATCH 15/15] run main cluster tests for PR --- .github/workflows/build-test-prValidation.yml | 47 +++++++++++++++++++ .github/workflows/nightly-builds.yml | 1 - 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-test-prValidation.yml b/.github/workflows/build-test-prValidation.yml index fd8552257b6..b84938397a6 100644 --- a/.github/workflows/build-test-prValidation.yml +++ b/.github/workflows/build-test-prValidation.yml @@ -102,6 +102,53 @@ jobs: -Dio.netty.leakDetection.level=PARANOID \ validatePullRequest + pekko-classic-remoting-tests: + name: Pekko Classic Remoting Tests + runs-on: ubuntu-22.04 + if: github.repository == 'apache/pekko' + strategy: + fail-fast: false + matrix: + command: + - cluster/test distributed-data/test cluster-tools/test cluster-metrics/test + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + # we don't know what commit the last tag was it's safer to get entire repo so previousStableVersion resolves + fetch-depth: 0 + fetch-tags: true + + - name: Setup Java 11 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 11 + + - name: Install sbt + uses: sbt/setup-sbt@v1 + + - name: Cache Coursier cache + uses: coursier/cache-action@v6 + + - name: Enable jvm-opts + run: cp .jvmopts-ci .jvmopts + + - name: sbt ${{ matrix.command }} + env: + DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + # note that this is not running any multi-jvm tests because multi-in-test=false + run: |- + sbt \ + -Djava.security.egd=file:/dev/./urandom \ + -Dpekko.remote.artery.enabled=off \ + -Dpekko.test.timefactor=2 \ + -Dpekko.actor.testkit.typed.timefactor=2 \ + -Dpekko.test.tags.exclude=gh-exclude,timing \ + -Dpekko.test.multi-in-test=false \ + -Dpekko.cluster.assert=on \ + clean ${{ matrix.command }} + jdk-21-extra-tests: name: Java 21 Extra Tests (including all tests that need Java 9+) runs-on: ubuntu-22.04 diff --git a/.github/workflows/nightly-builds.yml b/.github/workflows/nightly-builds.yml index c473d1d1d06..12cb31073c9 100644 --- a/.github/workflows/nightly-builds.yml +++ b/.github/workflows/nightly-builds.yml @@ -3,7 +3,6 @@ name: Nightly Builds on: schedule: - cron: "0 0 * * *" - pull_request: workflow_dispatch: permissions: {}