diff --git a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala index fbe8c5836a..c32ae51293 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala @@ -50,7 +50,7 @@ object Server { // Create necessary actors for server-server communications val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef)) - val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManagerRef, messageRegistry)) // Setup routes diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala index 2fa29c6fb2..e90e371995 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala @@ -59,6 +59,7 @@ final case class ConnectionMediator( // Tell monitor to stop scheduling heartbeats since there is no one to receive them if (serverMap.isEmpty) monitorRef ! Monitor.NoServerConnected + gossipManagerRef ! Monitor.NoServerConnected case ConnectionMediator.ReadPeersClientAddress() => if (serverMap.isEmpty) @@ -69,6 +70,7 @@ final case class ConnectionMediator( case ConnectionMediator.NewServerConnected(serverRef, greetServer) => if (serverMap.isEmpty) { monitorRef ! Monitor.AtLeastOneServerConnected + gossipManagerRef ! Monitor.AtLeastOneServerConnected } serverMap += ((serverRef, greetServer)) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index 7269be0d11..001feb34c6 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -1,9 +1,11 @@ package ch.epfl.pop.decentralized import akka.NotUsed -import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers} import akka.pattern.{AskableActorRef, ask} import akka.stream.scaladsl.Flow +import ch.epfl.pop.decentralized.GossipManager.TriggerPullState +import ch.epfl.pop.model.network.MethodType.rumor_state import ch.epfl.pop.model.network.method.Rumor import ch.epfl.pop.model.network.method.message.Message import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType} @@ -16,6 +18,7 @@ import ch.epfl.pop.storage.DbActor import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorGetRumorStateAck, DbActorReadRumorData, GetRumorState} import scala.concurrent.Await +import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.util.Random /** This class is responsible of managing the gossiping of rumors across the network @@ -24,13 +27,16 @@ import scala.util.Random * @param stopProbability * probability with which we stop the gossipping in case of error response */ -final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5) extends Actor with AskPatternConstants with ActorLogging { +final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5, pullRate: FiniteDuration = 5.seconds) extends Actor with AskPatternConstants with ActorLogging with Timers { private var activeGossipProtocol: Map[JsonRpcRequest, Set[ActorRef]] = Map.empty + private var rumorMap: Map[PublicKey, Int] = Map.empty private var jsonId = 0 private var publicKey: Option[PublicKey] = None private var connectionMediatorRef: AskableActorRef = _ + private val periodicRumorStateKey = 0 + publicKey = { val readPk = dbActorRef ? DbActor.ReadServerPublicKey() Await.result(readPk, duration) match @@ -40,6 +46,19 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou None } + rumorMap = + publicKey match + case Some(pk: PublicKey) => + val readRumorData = dbActorRef ? DbActor.ReadRumorData(pk) + Await.result(readRumorData, duration) match + case DbActorReadRumorData(foundRumorIds: RumorData) => rumorMap.updated(pk, foundRumorIds.lastRumorId()) + case failure => Map.empty + case None => Map.empty + + /** Does a step of gossipping protocol for given rpc. Tries to find a random peer that hasn't already received this msg If such a peer is found, sends message and updates table accordingly. If no peer is found, ends the protocol. + * @param rumorRpc + * Rpc that must be spreac + */ /** Does a step of gossipping protocol for given rpc. Tries to find a random peer that hasn't already received this msg If such a peer is found, sends message and updates table accordingly. If no peer is found, ends the protocol. * @param rumorRpc * Rpc that must be spread @@ -129,6 +148,28 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou case None => -1 } + private def sendRumorState(): Unit = { + val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer() + Await.result(randomPeer, duration) match { + case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) => + val rumorStateGet = dbActorRef ? GetRumorState() + Await.result(rumorStateGet, duration) match + case DbActorGetRumorStateAck(rumorState) => + serverRef ! ClientAnswer( + Right(JsonRpcRequest( + RpcValidator.JSON_RPC_VERSION, + rumor_state, + rumorState, + Some(jsonId) + )) + ) + jsonId += 1 + case _ => log.info(s"Actor $self failed on creating rumor state") + case _ => + log.info(s"Actor $self received an unexpected message waiting for a random peer") + } + } + private def peersAlreadyReceived(jsonRpcRequest: JsonRpcRequest): Set[ActorRef] = { val activeGossip = activeGossipProtocol.get(jsonRpcRequest) activeGossip match @@ -161,6 +202,15 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou log.info(s"Actor $self received a ping from Connection Mediator") connectionMediatorRef = sender() + case Monitor.AtLeastOneServerConnected => + timers.startTimerWithFixedDelay(periodicRumorStateKey, TriggerPullState(), pullRate) + + case Monitor.NoServerConnected => + timers.cancel(periodicRumorStateKey) + + case TriggerPullState() => + sendRumorState() + case _ => log.info(s"Actor $self received an unexpected message") } @@ -168,8 +218,8 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou } object GossipManager extends AskPatternConstants { - def props(dbActorRef: AskableActorRef, monitorRef: ActorRef): Props = - Props(new GossipManager(dbActorRef)) + def props(dbActorRef: AskableActorRef, pullRate: FiniteDuration = 15.seconds): Props = + Props(new GossipManager(dbActorRef, pullRate = pullRate)) /** When receiving a rumor, gossip manager handles the rumor by relaying * @@ -225,4 +275,9 @@ object GossipManager extends AskPatternConstants { final case class HandleRumor(jsonRpcRequest: JsonRpcRequest, clientActorRef: ActorRef) final case class ManageGossipResponse(jsonRpcResponse: JsonRpcResponse) final case class StartGossip(messages: Map[Channel, List[Message]]) + final case class TriggerPullState() + + sealed trait GossipManagerMessage + final case class Ping() extends GossipManagerMessage + } diff --git a/be2-scala/src/main/scala/ch/epfl/pop/json/HighLevelProtocol.scala b/be2-scala/src/main/scala/ch/epfl/pop/json/HighLevelProtocol.scala index 7fab4aff33..178f76000f 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/json/HighLevelProtocol.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/json/HighLevelProtocol.scala @@ -263,7 +263,7 @@ object HighLevelProtocol extends DefaultJsonProtocol { // We don't differentiate and use an EmptyList to make result available to different response handler if (resultArray.isEmpty) new ResultObject(ResultEmptyList()) - resultArray.head.asJsObject.fields.keySet match + else resultArray.head.asJsObject.fields.keySet match case keys if keys == RumorFormat.fields => new ResultObject(ResultRumor(resultArray.map(_.convertTo[Rumor]).toList)) case keys if keys == messageFormat.fields => diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index 4e94090ed1..f6d06a2e80 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -141,7 +141,7 @@ object PublishSubscribe { val gossipMonitorPartition = builder.add(GossipManager.monitorResponse(gossipManager)) val getMsgByIdResponsePartition = builder.add(ProcessMessagesHandler.getMsgByIdResponseHandler(messageRegistry)) - val rumorStateAnsPartition = builder.add(ProcessMessagesHandler.rumorStateAnsHandler(messageRegistry)) + val rumorStateAnsPartition = builder.add(ProcessMessagesHandler.rumorStateAnsHandler(dbActorRef, messageRegistry)) /* glue the components together */ input ~> responsePartitioner diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala index 34ee42857d..80dbe97908 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala @@ -2,6 +2,7 @@ package ch.epfl.pop.pubsub.graph.handlers import akka.NotUsed import akka.actor.{ActorRef, ActorSystem} +import akka.pattern.AskableActorRef import akka.stream.scaladsl.{Flow, Sink, Source} import ch.epfl.pop.model.network.method.{Publish, Rumor} import ch.epfl.pop.model.network.method.message.Message @@ -10,6 +11,7 @@ import ch.epfl.pop.model.objects.Channel import ch.epfl.pop.pubsub.graph.validators.RpcValidator import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError, prettyPrinter} import ch.epfl.pop.pubsub.{AskPatternConstants, MessageRegistry, PublishSubscribe} +import ch.epfl.pop.storage.DbActor.{DbActorAck, WriteRumor} import scala.annotation.tailrec import scala.util.Success @@ -49,7 +51,7 @@ object ProcessMessagesHandler extends AskPatternConstants { case value @ _ => value } - def rumorStateAnsHandler(messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map { + def rumorStateAnsHandler(dbActorRef: AskableActorRef, messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map { case msg @ Right(JsonRpcResponse(_, Some(resultObject), None, jsonId)) => resultObject.resultRumor match case Some(rumorList) => @@ -57,8 +59,10 @@ object ProcessMessagesHandler extends AskPatternConstants { .flatMap(_.messages) .groupBy(_._1) .view.mapValues(_.flatMap(_._2).toSet).toMap - processMsgMap(mergedMsg, messageRegistry) - msg + if (processMsgMap(mergedMsg, messageRegistry) && writeRumorsInDb(dbActorRef, rumorList)) + msg + else + Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"Rumor state handler was not able to process all rumors from $msg", jsonId)) case _ => Left(PipelineError( ErrorCodes.SERVER_ERROR.id, s"Rumor state handler received an unexpected type of result $msg", @@ -71,6 +75,16 @@ object ProcessMessagesHandler extends AskPatternConstants { )) } + private def writeRumorsInDb(dbActorRef: AskableActorRef, rumors: List[Rumor]): Boolean = { + rumors.foreach { rumor => + val writeRumor = dbActorRef ? WriteRumor(rumor) + Await.result(writeRumor, duration) match + case DbActorAck() => /* DO NOTHING*/ + case _ => false + } + true + } + def rumorHandler(messageRegistry: MessageRegistry, rumor: Rumor)(implicit system: ActorSystem): Boolean = { val msgMap = rumor.messages.map((chan, list) => (chan, list.toSet)) processMsgMap(msgMap, messageRegistry) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala b/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala index 9241bc9933..555bdc2130 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala @@ -6,10 +6,10 @@ import akka.pattern.AskableActorRef import ch.epfl.pop.decentralized.ConnectionMediator import ch.epfl.pop.json.MessageDataProtocol import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat -import ch.epfl.pop.model.network.method.{Rumor, RumorState} import ch.epfl.pop.model.network.method.message.Message import ch.epfl.pop.model.network.method.message.data.lao.GreetLao import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType} +import ch.epfl.pop.model.network.method.{Rumor, RumorState} import ch.epfl.pop.model.objects.* import ch.epfl.pop.model.objects.Channel.{LAO_DATA_LOCATION, ROOT_CHANNEL_PREFIX} import ch.epfl.pop.pubsub.graph.AnswerGenerator.timout @@ -20,7 +20,6 @@ import com.google.crypto.tink.subtle.Ed25519Sign import java.util.concurrent.TimeUnit import scala.collection.immutable.HashMap -import scala.collection.mutable.ListBuffer import scala.concurrent.Await import scala.concurrent.duration.{Duration, FiniteDuration} import scala.util.{Failure, Success, Try} diff --git a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala index 9e0e61f8f1..c7915e8da8 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala @@ -14,15 +14,16 @@ import akka.stream.scaladsl.{Flow, Sink, Source} import ch.epfl.pop.IOHelper.readJsonFromPath import ch.epfl.pop.model.network.MethodType.rumor import ch.epfl.pop.model.network.{ErrorObject, JsonRpcRequest, JsonRpcResponse, MethodType, ResultObject} -import ch.epfl.pop.model.network.method.{GreetServer, Rumor} +import ch.epfl.pop.model.network.method.{GreetServer, Rumor, RumorState} import ch.epfl.pop.model.objects.{Base64Data, PublicKey, RumorData} import ch.epfl.pop.pubsub.ClientActor.ClientAnswer import ch.epfl.pop.pubsub.graph.GraphMessage import ch.epfl.pop.pubsub.graph.validators.RpcValidator -import ch.epfl.pop.storage.DbActor.DbActorReadRumorData +import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorReadRumorData} import org.scalatest.BeforeAndAfterEach import scala.concurrent.Await +import concurrent.duration.DurationInt class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSystem")) with AnyFunSuiteLike with AskPatternConstants with Matchers with BeforeAndAfterEach { @@ -63,7 +64,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys val rumor: Rumor = rumorRequest.getParams.asInstanceOf[Rumor] test("When receiving a message, gossip manager should create and send a rumor") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe("a") val gossip = GossipManager.startGossip(gossipManager, sender.ref) @@ -98,7 +99,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("Gossip manager increments jsonRpcId and rumorID when starting a gossip from message") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe("b") val gossip = GossipManager.startGossip(gossipManager, sender.ref) @@ -127,7 +128,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("Gossip manager should increment jsonRpcId but not rumor when starting gossip from rumor") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe("c") val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref) @@ -157,7 +158,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("Gossip should stop when there is no peers left") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) val sender = TestProbe("d") val gossip = GossipManager.startGossip(gossipManager, sender.ref) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) @@ -194,7 +195,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("Gossip should write in memory new rumors sent") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) val sender = TestProbe() val gossip = GossipManager.startGossip(gossipManager, sender.ref) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) @@ -227,7 +228,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys test("gossip handler should forward a rumor to a random server") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe() val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref) @@ -251,7 +252,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("gossip handler should send to only one server if multiples are present") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe() val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref) @@ -278,7 +279,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } test("gossip handler should send rumor if there is an ongoing gossip protocol") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe() val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref) @@ -338,4 +339,23 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys } + test("Gossip sends rumor state when there is one server connected") { + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, pullRate = 2.seconds)) + connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) + val server = TestProbe() + + val writeRumor = dbActorRef ? DbActor.WriteRumor(rumor) + Await.result(writeRumor, duration) shouldBe DbActorAck() + + connectionMediatorRef ? ConnectionMediator.NewServerConnected(server.ref, GreetServer(PublicKey(Base64Data.encode("publickey")), "client", "server")) + checkPeersWritten(connectionMediatorRef) + + server.receiveOne(5.seconds) match + case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) => + jsonRpcRequest.id shouldBe Some(0) + jsonRpcRequest.method shouldBe MethodType.rumor_state + val rumorState = jsonRpcRequest.getParams.asInstanceOf[RumorState] + rumorState.state shouldBe Map(rumor.senderPk -> rumor.rumorId) + } + } diff --git a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorHandlerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorHandlerSuite.scala index ca9fb93ed3..8ec58e1133 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorHandlerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorHandlerSuite.scala @@ -38,7 +38,7 @@ class RumorHandlerSuite extends TestKitBase with AnyFunSuiteLike with AskPattern private val dbActorRef: AskableActorRef = system.actorOf(Props(DbActor(pubSubMediatorRef, messageRegistry, inMemoryStorage)), "dbRumor") private val securityModuleActorRef: AskableActorRef = system.actorOf(Props(SecurityModuleActor(RuntimeEnvironment.securityPath)), "securityRumor") private val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef), "monitorRumor") - private val gossipRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef)) + private val gossipRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) private var connectionMediatorRef: AskableActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipRef, messageRegistry), "connMediatorRumor") private val rumorHandler: Flow[GraphMessage, GraphMessage, NotUsed] = ParamsHandler.rumorHandler(dbActorRef, messageRegistry) diff --git a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala index 6020c30806..ecf083e858 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala @@ -16,8 +16,9 @@ import akka.pattern.ask import ch.epfl.pop.model.network.MethodType.publish import ch.epfl.pop.model.network.method.message.Message import ch.epfl.pop.model.network.method.{Publish, Rumor, RumorState} -import ch.epfl.pop.model.objects.Channel +import ch.epfl.pop.model.objects.{Channel, RumorData} import ch.epfl.pop.pubsub.graph.validators.RpcValidator +import ch.epfl.pop.storage.DbActor.{DbActorReadRumorData, ReadRumorData} import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.should.Matchers.{a, equal, should, shouldBe} @@ -37,7 +38,7 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle messageRegistry = MessageRegistry() pubSubMediatorRef = system.actorOf(PubSubMediator.props, "pubSubRumorState") dbActorRef = system.actorOf(Props(DbActor(pubSubMediatorRef, messageRegistry, inMemoryStorage)), "dbRumorStateAns") - rumorStateAnsHandler = ProcessMessagesHandler.rumorStateAnsHandler(messageRegistry) + rumorStateAnsHandler = ProcessMessagesHandler.rumorStateAnsHandler(dbActorRef, messageRegistry) PublishSubscribe.buildGraph(pubSubMediatorRef, dbActorRef, ActorRef.noSender, messageRegistry, ActorRef.noSender, ActorRef.noSender, ActorRef.noSender, false) } @@ -82,6 +83,12 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle val messagesInRumor = rumor.messages.values.foldLeft(Set.empty: Set[Message])((acc, set) => acc ++ set) messagesInRumor.diff(messagesInDb) should equal(Set.empty) + + val readRumorData = dbActorRef ? ReadRumorData(rumor.senderPk) + Await.result(readRumorData, duration) match + case DbActorReadRumorData(rumorData: RumorData) => + rumorData.rumorIds shouldBe rumorList.map(_.rumorId) + case _ => 0 shouldBe 1 } test("rumor state ans handler fails on wrong type") {