From f17e729368ee5b8e5c7f1d510044244d24b3ab3e Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Mon, 3 Jun 2024 18:48:13 +0200 Subject: [PATCH 01/11] added rumor periodic creation --- .../decentralized/ConnectionMediator.scala | 2 + .../pop/decentralized/GossipManager.scala | 44 +++++++++++++++++-- .../scala/ch/epfl/pop/storage/DbActor.scala | 17 ++++++- 3 files changed, 57 insertions(+), 6 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala index aed6e3ba76..4c62efab8c 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala @@ -58,6 +58,7 @@ final case class ConnectionMediator( // Tell monitor to stop scheduling heartbeats since there is no one to receive them if (serverMap.isEmpty) monitorRef ! Monitor.NoServerConnected + gossipManagerRef ? Monitor.NoServerConnected case ConnectionMediator.ReadPeersClientAddress() => if (serverMap.isEmpty) @@ -68,6 +69,7 @@ final case class ConnectionMediator( case ConnectionMediator.NewServerConnected(serverRef, greetServer) => if (serverMap.isEmpty) { monitorRef ! Monitor.AtLeastOneServerConnected + gossipManagerRef ? Monitor.AtLeastOneServerConnected } serverMap += ((serverRef, greetServer)) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index 7de99f2ba5..8cb78fab76 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -1,9 +1,11 @@ package ch.epfl.pop.decentralized import akka.NotUsed -import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers} import akka.pattern.AskableActorRef import akka.stream.scaladsl.Flow +import ch.epfl.pop.decentralized.GossipManager.TriggerPullState +import ch.epfl.pop.model.network.MethodType.rumor_state import ch.epfl.pop.model.network.method.message.Message import ch.epfl.pop.model.network.method.{GreetServer, Rumor} import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType} @@ -13,24 +15,29 @@ import ch.epfl.pop.pubsub.ClientActor.ClientAnswer import ch.epfl.pop.pubsub.graph.validators.RpcValidator import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError} import ch.epfl.pop.storage.DbActor -import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorReadRumorData} +import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorGetRumorStateAck, DbActorReadRumorData, GetRumorState} import scala.concurrent.Await +import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.util.Random final case class GossipManager( dbActorRef: AskableActorRef, monitorRef: ActorRef, connectionMediator: AskableActorRef, - stopProbability: Double = 0.5 -) extends Actor with AskPatternConstants with ActorLogging { + stopProbability: Double = 0.5, + pullRate: FiniteDuration = 15.seconds +) extends Actor with AskPatternConstants with ActorLogging with Timers { private type ServerInfos = (ActorRef, GreetServer) private var activeGossipProtocol: Map[JsonRpcRequest, List[ServerInfos]] = Map.empty private var jsonId = 0 private var rumorId = 0 + private var stateId = 0 private var publicKey: Option[PublicKey] = None + private val periodicRumorStateKey = 0 + publicKey = { val readPk = dbActorRef ? DbActor.ReadServerPublicKey() Await.result(readPk, duration) match @@ -135,6 +142,26 @@ final case class GossipManager( log.info(s"Actor (gossip) $self will not be able to start rumors because it has no publicKey") } + private def sendRumorState(): Unit = { + val randomPeer = connectionMediator ? ConnectionMediator.GetRandomPeer() + Await.result(randomPeer, duration) match { + case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) => + val rumorStateGet = dbActorRef ? GetRumorState + Await.result(rumorStateGet, duration) match + case DbActorGetRumorStateAck(rumorState) => + serverRef ! ClientAnswer( + Right(JsonRpcRequest( + RpcValidator.JSON_RPC_VERSION, + rumor_state, + rumorState, + Some(stateId) + )) + ) + case _ => + log.info(s"Actor $self received an unexpected message waiting for a random peer") + } + } + override def receive: Receive = { case GossipManager.HandleRumor(jsonRpcRequest: JsonRpcRequest) => handleRumor(jsonRpcRequest) @@ -145,6 +172,14 @@ final case class GossipManager( case GossipManager.StartGossip(messages) => startGossip(messages) + case Monitor.AtLeastOneServerConnected => + timers.startTimerWithFixedDelay(periodicRumorStateKey, TriggerPullState, pullRate) + + case Monitor.NoServerConnected => + timers.cancel(periodicRumorStateKey) + + case TriggerPullState => + sendRumorState() case _ => log.info(s"Actor $self received an unexpected message") } @@ -188,6 +223,7 @@ object GossipManager extends AskPatternConstants { final case class HandleRumor(jsonRpcRequest: JsonRpcRequest) final case class ManageGossipResponse(jsonRpcResponse: JsonRpcResponse) final case class StartGossip(messages: Map[Channel, List[Message]]) + final case class TriggerPullState() sealed trait GossipManagerMessage final case class Ping() extends GossipManagerMessage diff --git a/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala b/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala index 9e4198dc4d..555bdc2130 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala @@ -6,10 +6,10 @@ import akka.pattern.AskableActorRef import ch.epfl.pop.decentralized.ConnectionMediator import ch.epfl.pop.json.MessageDataProtocol import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat -import ch.epfl.pop.model.network.method.{Rumor, RumorState} import ch.epfl.pop.model.network.method.message.Message import ch.epfl.pop.model.network.method.message.data.lao.GreetLao import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType} +import ch.epfl.pop.model.network.method.{Rumor, RumorState} import ch.epfl.pop.model.objects.* import ch.epfl.pop.model.objects.Channel.{LAO_DATA_LOCATION, ROOT_CHANNEL_PREFIX} import ch.epfl.pop.pubsub.graph.AnswerGenerator.timout @@ -20,7 +20,6 @@ import com.google.crypto.tink.subtle.Ed25519Sign import java.util.concurrent.TimeUnit import scala.collection.immutable.HashMap -import scala.collection.mutable.ListBuffer import scala.concurrent.Await import scala.concurrent.duration.{Duration, FiniteDuration} import scala.util.{Failure, Success, Try} @@ -655,6 +654,12 @@ final case class DbActor( case failure => sender() ! failure.recover(Status.Failure(_)) } + case GetRumorState() => + log.info(s"Actor $self (db) received a GetRumorState request") + Try(getRumorState) match + case Success(rumorState) => sender() ! DbActorGetRumorStateAck(rumorState) + case failure => sender() ! failure.recover(Status.Failure(_)) + case m => log.info(s"Actor $self (db) received an unknown message") sender() ! Status.Failure(DbActorNAckException(ErrorCodes.INVALID_ACTION.id, s"database actor received a message '$m' that it could not recognize")) @@ -898,6 +903,10 @@ object DbActor { final case class GenerateRumorStateAns(rumorState: RumorState) extends Event + /** Requests the db to build out rumorState + + */ + final case class GetRumorState() extends Event + // DbActor DbActorMessage correspond to messages the actor may emit sealed trait DbActorMessage @@ -991,6 +1000,10 @@ object DbActor { */ final case class DbActorGenerateRumorStateAns(rumorList: List[Rumor]) extends DbActorMessage + /** Response for a [[GetRumorState]] + + */ + final case class DbActorGetRumorStateAck(rumorState: RumorState) extends DbActorMessage + /** Response for a general db actor ACK */ final case class DbActorAck() extends DbActorMessage From 1795be9c339974b327802179d7f61ed6ff1931e3 Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Mon, 3 Jun 2024 19:19:44 +0200 Subject: [PATCH 02/11] remove stateId --- .../main/scala/ch/epfl/pop/decentralized/GossipManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index 8cb78fab76..bbdcec83e7 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -33,7 +33,6 @@ final case class GossipManager( private var activeGossipProtocol: Map[JsonRpcRequest, List[ServerInfos]] = Map.empty private var jsonId = 0 private var rumorId = 0 - private var stateId = 0 private var publicKey: Option[PublicKey] = None private val periodicRumorStateKey = 0 @@ -154,9 +153,10 @@ final case class GossipManager( RpcValidator.JSON_RPC_VERSION, rumor_state, rumorState, - Some(stateId) + Some(jsonId) )) ) + jsonId += 1 case _ => log.info(s"Actor $self received an unexpected message waiting for a random peer") } From e3693e06d6251b569a4f705add2181a2ea710df9 Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Tue, 4 Jun 2024 10:33:39 +0200 Subject: [PATCH 03/11] added test --- .../epfl/pop/decentralized/GossipManagerSuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala index ff48bb9a83..e17bfb5095 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala @@ -294,4 +294,15 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys case _ => 0 shouldBe 1 } + test("Gossip sends rumor state when there is one server connected") { + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef, connectionMediatorRef)) + val watcher = TestProbe() + val server = TestProbe() + + connectionMediatorRef ? ConnectionMediator.NewServerConnected(server.ref, GreetServer(PublicKey(Base64Data.encode("publickey")), "client", "server")) + + server.receiveOne(duration) shouldBe a[Right[Nothing, JsonRpcRequest]] + + } + } From 322fef9668d43b4681dc2954fe70583c8066d6d8 Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Tue, 4 Jun 2024 11:56:25 +0200 Subject: [PATCH 04/11] small fixes and added test --- .../src/main/scala/ch/epfl/pop/Server.scala | 2 +- .../pop/decentralized/GossipManager.scala | 16 +++----- .../ch/epfl/pop/pubsub/PublishSubscribe.scala | 2 - .../decentralized/GossipManagerSuite.scala | 40 ++++++++++++------- .../graph/handlers/RumorHandlerSuite.scala | 2 +- 5 files changed, 33 insertions(+), 29 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala index fbe8c5836a..c32ae51293 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala @@ -50,7 +50,7 @@ object Server { // Create necessary actors for server-server communications val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef)) - val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManagerRef, messageRegistry)) // Setup routes diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index 4a8e00984d..e9da4f98ba 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -30,9 +30,7 @@ import scala.util.Random * @param stopProbability * probability with which we stop the gossipping in case of error response */ -final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5, - pullRate: FiniteDuration = 15.seconds -) extends Actor with AskPatternConstants with ActorLogging with Timers { +final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5, pullRate: FiniteDuration = 15.seconds) extends Actor with AskPatternConstants with ActorLogging with Timers { private var activeGossipProtocol: Map[JsonRpcRequest, Set[ActorRef]] = Map.empty private var rumorMap: Map[PublicKey, Int] = Map.empty @@ -148,10 +146,10 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou } private def sendRumorState(): Unit = { - val randomPeer = connectionMediator ? ConnectionMediator.GetRandomPeer() + val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer() Await.result(randomPeer, duration) match { case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) => - val rumorStateGet = dbActorRef ? GetRumorState + val rumorStateGet = dbActorRef ? GetRumorState() Await.result(rumorStateGet, duration) match case DbActorGetRumorStateAck(rumorState) => serverRef ! ClientAnswer( @@ -226,8 +224,8 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou } object GossipManager extends AskPatternConstants { - def props(dbActorRef: AskableActorRef, monitorRef: ActorRef): Props = - Props(new GossipManager(dbActorRef)) + def props(dbActorRef: AskableActorRef, pullRate: FiniteDuration = 15.seconds): Props = + Props(new GossipManager(dbActorRef, pullRate = pullRate)) /** When receiving a rumor, gossip manager handles the rumor by relaying * @@ -250,11 +248,9 @@ object GossipManager extends AskPatternConstants { /** Monitors responses to check if one is related to a rumor we sent * @param gossipManager * reference to the gossip manager of the server - * @param clientActorRef - * reference to the client who sent the message. * @return */ - def monitorResponse(gossipManager: AskableActorRef, clientActorRef: ActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map { + def monitorResponse(gossipManager: AskableActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map { case Right(jsonRpcResponse: JsonRpcResponse) => gossipManager ? ManageGossipResponse(jsonRpcResponse) Right(jsonRpcResponse) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index 86e42933c2..57a1b79307 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -75,8 +75,6 @@ object PublishSubscribe { val requestPartition = builder.add(validateRequests(clientActorRef, messageRegistry)) val responsePartitionGraph = builder.add(responsePartition(messageRegistry, gossipManager)) - val gossipMonitorPartition = builder.add(GossipManager.monitorResponse(gossipManager, clientActorRef)) - val getMsgByIdResponsePartition = builder.add(ProcessMessagesHandler.getMsgByIdResponseHandler(messageRegistry)) // ResponseHandler messages do not go in the merger val merger = builder.add(Merge[GraphMessage](totalPorts - 1)) diff --git a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala index ab652b4293..a5a3711f1b 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala @@ -14,15 +14,16 @@ import akka.stream.scaladsl.{Flow, Sink, Source} import ch.epfl.pop.IOHelper.readJsonFromPath import ch.epfl.pop.model.network.MethodType.rumor import ch.epfl.pop.model.network.{ErrorObject, JsonRpcRequest, JsonRpcResponse, MethodType, ResultObject} -import ch.epfl.pop.model.network.method.{GreetServer, Rumor} +import ch.epfl.pop.model.network.method.{GreetServer, Rumor, RumorState} import ch.epfl.pop.model.objects.{Base64Data, PublicKey, RumorData} import ch.epfl.pop.pubsub.ClientActor.ClientAnswer import ch.epfl.pop.pubsub.graph.GraphMessage import ch.epfl.pop.pubsub.graph.validators.RpcValidator -import ch.epfl.pop.storage.DbActor.DbActorReadRumorData +import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorReadRumorData} import org.scalatest.BeforeAndAfterEach import scala.concurrent.Await +import concurrent.duration.DurationInt class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSystem")) with AnyFunSuiteLike with AskPatternConstants with Matchers with BeforeAndAfterEach { @@ -63,7 +64,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys val rumor: Rumor = rumorRequest.getParams.asInstanceOf[Rumor] test("When receiving a message, gossip manager should create and send a rumor") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe("a") val gossip = GossipManager.startGossip(gossipManager, sender.ref) @@ -98,7 +99,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("Gossip manager increments jsonRpcId and rumorID when starting a gossip from message") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe("b") val gossip = GossipManager.startGossip(gossipManager, sender.ref) @@ -127,7 +128,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("Gossip manager should increment jsonRpcId but not rumor when starting gossip from rumor") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe("c") val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref) @@ -157,13 +158,13 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("Gossip should stop when there is no peers left") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) val sender = TestProbe("d") val gossip = GossipManager.startGossip(gossipManager, sender.ref) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val peerServer = TestProbe() - val gossipMonitor = GossipManager.monitorResponse(gossipManager, peerServer.ref) + val gossipMonitor = GossipManager.monitorResponse(gossipManager) // registers a new server connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data.encode("publicKey")), "", "")) @@ -194,7 +195,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("Gossip should write in memory new rumors sent") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) val sender = TestProbe() val gossip = GossipManager.startGossip(gossipManager, sender.ref) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) @@ -227,7 +228,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys test("gossip handler should forward a rumor to a random server") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe() val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref) @@ -251,7 +252,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("gossip handler should send to only one server if multiples are present") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe() val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref) @@ -278,12 +279,12 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("gossip handler should send rumor if there is an ongoing gossip protocol") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe() val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref) - val gossipMonitor = GossipManager.monitorResponse(gossipManager, sender.ref) + val gossipMonitor = GossipManager.monitorResponse(gossipManager) val peerServer1 = TestProbe() val peerServer2 = TestProbe() @@ -339,14 +340,23 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("Gossip sends rumor state when there is one server connected") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef, connectionMediatorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, pullRate = 2.seconds)) + connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val watcher = TestProbe() val server = TestProbe() - connectionMediatorRef ? ConnectionMediator.NewServerConnected(server.ref, GreetServer(PublicKey(Base64Data.encode("publickey")), "client", "server")) + val writeRumor = dbActorRef ? DbActor.WriteRumor(rumor) + Await.result(writeRumor, duration) shouldBe DbActorAck() - server.receiveOne(duration) shouldBe a[Right[Nothing, JsonRpcRequest]] + connectionMediatorRef ? ConnectionMediator.NewServerConnected(server.ref, GreetServer(PublicKey(Base64Data.encode("publickey")), "client", "server")) + checkPeersWritten(connectionMediatorRef) + server.receiveOne(5.seconds) match + case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) => + jsonRpcRequest.id shouldBe Some(0) + jsonRpcRequest.method shouldBe MethodType.rumor_state + val rumorState = jsonRpcRequest.getParams.asInstanceOf[RumorState] + rumorState.state shouldBe Map(rumor.senderPk -> rumor.rumorId) } } diff --git a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorHandlerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorHandlerSuite.scala index 3265a59a34..23598fb527 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorHandlerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorHandlerSuite.scala @@ -38,7 +38,7 @@ class RumorHandlerSuite extends TestKitBase with AnyFunSuiteLike with AskPattern private val dbActorRef: AskableActorRef = system.actorOf(Props(DbActor(pubSubMediatorRef, messageRegistry, inMemoryStorage)), "dbRumor") private val securityModuleActorRef: AskableActorRef = system.actorOf(Props(SecurityModuleActor(RuntimeEnvironment.securityPath)), "securityRumor") private val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef), "monitorRumor") - private val gossipRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + private val gossipRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) private var connectionMediatorRef: AskableActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipRef, messageRegistry), "connMediatorRumor") private val rumorHandler: Flow[GraphMessage, GraphMessage, NotUsed] = ParamsHandler.rumorHandler(dbActorRef, messageRegistry) From b70bbc93062a8343dd97c36ea82a7b70276ff78b Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Fri, 14 Jun 2024 14:52:24 +0200 Subject: [PATCH 05/11] adapted changes --- .../ch/epfl/pop/decentralized/GossipManager.scala | 10 ++++++++-- .../ch/epfl/pop/decentralized/GossipManagerSuite.scala | 1 - 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index d8fb55c119..528d3152b7 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -147,7 +147,7 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou case Some(rumorIdInDb) => rumorIdInDb case None => -1 } - + private def sendRumorState(): Unit = { val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer() Await.result(randomPeer, duration) match { @@ -211,7 +211,13 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou log.info(s"Actor $self received a ping from Connection Mediator") connectionMediatorRef = sender() - case TriggerPullState => + case Monitor.AtLeastOneServerConnected => + timers.startTimerWithFixedDelay(periodicRumorStateKey, TriggerPullState(), pullRate) + + case Monitor.NoServerConnected => + timers.cancel(periodicRumorStateKey) + + case TriggerPullState() => sendRumorState() case _ => diff --git a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala index a5a3711f1b..c7915e8da8 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala @@ -342,7 +342,6 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys test("Gossip sends rumor state when there is one server connected") { val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, pullRate = 2.seconds)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) - val watcher = TestProbe() val server = TestProbe() val writeRumor = dbActorRef ? DbActor.WriteRumor(rumor) From c4859c318e3822d7ced908838a75a51466bcb23f Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Fri, 14 Jun 2024 14:59:39 +0200 Subject: [PATCH 06/11] removed unused code --- .../ch/epfl/pop/decentralized/GossipManager.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index 528d3152b7..a973d82d3b 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -187,16 +187,6 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou request } - private def isNextRumor(publicKey: PublicKey, rumorId: Int): Boolean = { - rumorMap.get(publicKey) match - case None => rumorId == 0 - case Some(localRumorId) => localRumorId == rumorId - 1 - } - - private def incrementMap(publicKey: PublicKey): Unit = { - rumorMap = rumorMap.updated(publicKey, rumorMap.getOrElse(publicKey, -1) + 1) - } - override def receive: Receive = { case GossipManager.HandleRumor(jsonRpcRequest: JsonRpcRequest, clientActorRef: ActorRef) => handleRumor(jsonRpcRequest, clientActorRef) From 18de354b71351b5c490639ab84eceff757347e38 Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Mon, 17 Jun 2024 12:52:23 +0200 Subject: [PATCH 07/11] addressed comments --- .../scala/ch/epfl/pop/decentralized/ConnectionMediator.scala | 4 ++-- .../main/scala/ch/epfl/pop/decentralized/GossipManager.scala | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala index 4d09cf7c04..e90e371995 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala @@ -59,7 +59,7 @@ final case class ConnectionMediator( // Tell monitor to stop scheduling heartbeats since there is no one to receive them if (serverMap.isEmpty) monitorRef ! Monitor.NoServerConnected - gossipManagerRef ? Monitor.NoServerConnected + gossipManagerRef ! Monitor.NoServerConnected case ConnectionMediator.ReadPeersClientAddress() => if (serverMap.isEmpty) @@ -70,7 +70,7 @@ final case class ConnectionMediator( case ConnectionMediator.NewServerConnected(serverRef, greetServer) => if (serverMap.isEmpty) { monitorRef ! Monitor.AtLeastOneServerConnected - gossipManagerRef ? Monitor.AtLeastOneServerConnected + gossipManagerRef ! Monitor.AtLeastOneServerConnected } serverMap += ((serverRef, greetServer)) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index a973d82d3b..a1564d9d3b 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -164,6 +164,7 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou )) ) jsonId += 1 + case _ => log.info(s"Actor $self failed on creating rumor state") case _ => log.info(s"Actor $self received an unexpected message waiting for a random peer") } From 49f79ae8f4f776acf5687d15d77347251e732269 Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Mon, 17 Jun 2024 13:40:02 +0200 Subject: [PATCH 08/11] small fix --- .../src/main/scala/ch/epfl/pop/json/HighLevelProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/json/HighLevelProtocol.scala b/be2-scala/src/main/scala/ch/epfl/pop/json/HighLevelProtocol.scala index 790a0f6664..94a913ba85 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/json/HighLevelProtocol.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/json/HighLevelProtocol.scala @@ -257,7 +257,7 @@ object HighLevelProtocol extends DefaultJsonProtocol { // We don't differentiate and use an EmptyList to make result available to different response handler if (resultArray.isEmpty) new ResultObject(ResultEmptyList()) - resultArray.head.asJsObject.fields.keySet match + else resultArray.head.asJsObject.fields.keySet match case keys if keys == RumorFormat.fields => new ResultObject(ResultRumor(resultArray.map(_.convertTo[Rumor]).toList)) case keys if keys == messageFormat.fields => From f384dac90c32d8e6990009e33fb9f4b3e8ed0a8a Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Wed, 19 Jun 2024 12:16:27 +0200 Subject: [PATCH 09/11] small fix for writting processed rumor in memory --- .../pop/decentralized/GossipManager.scala | 6 +++--- .../ch/epfl/pop/pubsub/PublishSubscribe.scala | 2 +- .../handlers/ProcessMessagesHandler.scala | 20 ++++++++++++++++--- .../handlers/RumorStateAnsHandlerSuite.scala | 11 ++++++++-- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index a1564d9d3b..1ce74a4fe0 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -27,7 +27,7 @@ import scala.util.Random * @param stopProbability * probability with which we stop the gossipping in case of error response */ -final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5, pullRate: FiniteDuration = 15.seconds) extends Actor with AskPatternConstants with ActorLogging with Timers { +final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5, pullRate: FiniteDuration = 5.seconds) extends Actor with AskPatternConstants with ActorLogging with Timers { private var activeGossipProtocol: Map[JsonRpcRequest, Set[ActorRef]] = Map.empty private var rumorMap: Map[PublicKey, Int] = Map.empty @@ -75,9 +75,9 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou val alreadySent: Set[ActorRef] = activeGossip + serverRef activeGossipProtocol += (rumorRpc -> alreadySent) log.info(s"rumorSent > dest : ${greetServer.clientAddress}, rumor : $rumorRpc") - serverRef ! ClientAnswer( + /*serverRef ! ClientAnswer( Right(rumorRpc) - ) + )*/ // else remove entry case ConnectionMediator.NoPeer() => activeGossipProtocol = activeGossipProtocol.removed(rumorRpc) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index 4e94090ed1..f6d06a2e80 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -141,7 +141,7 @@ object PublishSubscribe { val gossipMonitorPartition = builder.add(GossipManager.monitorResponse(gossipManager)) val getMsgByIdResponsePartition = builder.add(ProcessMessagesHandler.getMsgByIdResponseHandler(messageRegistry)) - val rumorStateAnsPartition = builder.add(ProcessMessagesHandler.rumorStateAnsHandler(messageRegistry)) + val rumorStateAnsPartition = builder.add(ProcessMessagesHandler.rumorStateAnsHandler(dbActorRef, messageRegistry)) /* glue the components together */ input ~> responsePartitioner diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala index 34ee42857d..80dbe97908 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala @@ -2,6 +2,7 @@ package ch.epfl.pop.pubsub.graph.handlers import akka.NotUsed import akka.actor.{ActorRef, ActorSystem} +import akka.pattern.AskableActorRef import akka.stream.scaladsl.{Flow, Sink, Source} import ch.epfl.pop.model.network.method.{Publish, Rumor} import ch.epfl.pop.model.network.method.message.Message @@ -10,6 +11,7 @@ import ch.epfl.pop.model.objects.Channel import ch.epfl.pop.pubsub.graph.validators.RpcValidator import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError, prettyPrinter} import ch.epfl.pop.pubsub.{AskPatternConstants, MessageRegistry, PublishSubscribe} +import ch.epfl.pop.storage.DbActor.{DbActorAck, WriteRumor} import scala.annotation.tailrec import scala.util.Success @@ -49,7 +51,7 @@ object ProcessMessagesHandler extends AskPatternConstants { case value @ _ => value } - def rumorStateAnsHandler(messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map { + def rumorStateAnsHandler(dbActorRef: AskableActorRef, messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map { case msg @ Right(JsonRpcResponse(_, Some(resultObject), None, jsonId)) => resultObject.resultRumor match case Some(rumorList) => @@ -57,8 +59,10 @@ object ProcessMessagesHandler extends AskPatternConstants { .flatMap(_.messages) .groupBy(_._1) .view.mapValues(_.flatMap(_._2).toSet).toMap - processMsgMap(mergedMsg, messageRegistry) - msg + if (processMsgMap(mergedMsg, messageRegistry) && writeRumorsInDb(dbActorRef, rumorList)) + msg + else + Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"Rumor state handler was not able to process all rumors from $msg", jsonId)) case _ => Left(PipelineError( ErrorCodes.SERVER_ERROR.id, s"Rumor state handler received an unexpected type of result $msg", @@ -71,6 +75,16 @@ object ProcessMessagesHandler extends AskPatternConstants { )) } + private def writeRumorsInDb(dbActorRef: AskableActorRef, rumors: List[Rumor]): Boolean = { + rumors.foreach { rumor => + val writeRumor = dbActorRef ? WriteRumor(rumor) + Await.result(writeRumor, duration) match + case DbActorAck() => /* DO NOTHING*/ + case _ => false + } + true + } + def rumorHandler(messageRegistry: MessageRegistry, rumor: Rumor)(implicit system: ActorSystem): Boolean = { val msgMap = rumor.messages.map((chan, list) => (chan, list.toSet)) processMsgMap(msgMap, messageRegistry) diff --git a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala index 6020c30806..ecf083e858 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala @@ -16,8 +16,9 @@ import akka.pattern.ask import ch.epfl.pop.model.network.MethodType.publish import ch.epfl.pop.model.network.method.message.Message import ch.epfl.pop.model.network.method.{Publish, Rumor, RumorState} -import ch.epfl.pop.model.objects.Channel +import ch.epfl.pop.model.objects.{Channel, RumorData} import ch.epfl.pop.pubsub.graph.validators.RpcValidator +import ch.epfl.pop.storage.DbActor.{DbActorReadRumorData, ReadRumorData} import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.should.Matchers.{a, equal, should, shouldBe} @@ -37,7 +38,7 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle messageRegistry = MessageRegistry() pubSubMediatorRef = system.actorOf(PubSubMediator.props, "pubSubRumorState") dbActorRef = system.actorOf(Props(DbActor(pubSubMediatorRef, messageRegistry, inMemoryStorage)), "dbRumorStateAns") - rumorStateAnsHandler = ProcessMessagesHandler.rumorStateAnsHandler(messageRegistry) + rumorStateAnsHandler = ProcessMessagesHandler.rumorStateAnsHandler(dbActorRef, messageRegistry) PublishSubscribe.buildGraph(pubSubMediatorRef, dbActorRef, ActorRef.noSender, messageRegistry, ActorRef.noSender, ActorRef.noSender, ActorRef.noSender, false) } @@ -82,6 +83,12 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle val messagesInRumor = rumor.messages.values.foldLeft(Set.empty: Set[Message])((acc, set) => acc ++ set) messagesInRumor.diff(messagesInDb) should equal(Set.empty) + + val readRumorData = dbActorRef ? ReadRumorData(rumor.senderPk) + Await.result(readRumorData, duration) match + case DbActorReadRumorData(rumorData: RumorData) => + rumorData.rumorIds shouldBe rumorList.map(_.rumorId) + case _ => 0 shouldBe 1 } test("rumor state ans handler fails on wrong type") { From fe511566f08491225a0b68514faf8c4705176a44 Mon Sep 17 00:00:00 2001 From: DanielTavaresA Date: Wed, 19 Jun 2024 10:16:57 +0000 Subject: [PATCH 10/11] auto-format action fixes --- docs/protocol.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/protocol.md b/docs/protocol.md index 18d810adec..58dd05e966 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -986,7 +986,7 @@ RPC "sender_id": "J9fBzJV70Jk5c-i3277Uq4CmeL4t53WDfUghaK0HpeM=", "rumor_id": 1, "timestamp" : { - "J9fBzJV70Jk5c-i3277Uq4CmeL4t53WDfUghaK0HpeM=": 3, + "J9fBzJV70Jk5c-i3277Uq4CmeL4t53WDfUghaK0HpeM=": 1, "RZOPi59Iy5gkpS2mkpfQJNl44HKc2jVbF0iTGm0RvfU=": 5, "CfG2ByLhtLJH--T2BL9hZ6eGm11tpkE-5KuvysSCY0I=": 1, "r8cG9HyJ1FGBke_5IblCdH19mvy39MvLFSArVmY3FpY=": 10 @@ -1142,7 +1142,7 @@ Response in case of success "sender_id": "J9fBzJV70Jk5c-i3277Uq4CmeL4t53WDfUghaK0HpeM=", "rumor_id": 1, "timestamp" : { - "J9fBzJV70Jk5c-i3277Uq4CmeL4t53WDfUghaK0HpeM=": 3, + "J9fBzJV70Jk5c-i3277Uq4CmeL4t53WDfUghaK0HpeM=": 1, "RZOPi59Iy5gkpS2mkpfQJNl44HKc2jVbF0iTGm0RvfU=": 5, "CfG2ByLhtLJH--T2BL9hZ6eGm11tpkE-5KuvysSCY0I=": 1, "r8cG9HyJ1FGBke_5IblCdH19mvy39MvLFSArVmY3FpY=": 9 @@ -1163,7 +1163,7 @@ Response in case of success "sender_id": "J9fBzJV70Jk5c-i3277Uq4CmeL4t53WDfUghaK0HpeM=", "rumor_id": 2, "timestamp" : { - "J9fBzJV70Jk5c-i3277Uq4CmeL4t53WDfUghaK0HpeM=": 3, + "J9fBzJV70Jk5c-i3277Uq4CmeL4t53WDfUghaK0HpeM=": 2, "RZOPi59Iy5gkpS2mkpfQJNl44HKc2jVbF0iTGm0RvfU=": 5, "CfG2ByLhtLJH--T2BL9hZ6eGm11tpkE-5KuvysSCY0I=": 1, "r8cG9HyJ1FGBke_5IblCdH19mvy39MvLFSArVmY3FpY=": 10 From 536e4bef3a006ee379376f878201947a7b0b7251 Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Thu, 20 Jun 2024 15:34:06 +0200 Subject: [PATCH 11/11] reverted sending of push --- .../main/scala/ch/epfl/pop/decentralized/GossipManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index 1ce74a4fe0..001feb34c6 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -75,9 +75,9 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou val alreadySent: Set[ActorRef] = activeGossip + serverRef activeGossipProtocol += (rumorRpc -> alreadySent) log.info(s"rumorSent > dest : ${greetServer.clientAddress}, rumor : $rumorRpc") - /*serverRef ! ClientAnswer( + serverRef ! ClientAnswer( Right(rumorRpc) - )*/ + ) // else remove entry case ConnectionMediator.NoPeer() => activeGossipProtocol = activeGossipProtocol.removed(rumorRpc)