Skip to content

Commit

Permalink
Merge pull request #1905 from dedis/work-be2-daniel-rumor-state-creation
Browse files Browse the repository at this point in the history
[BE2] Add rumor state creation
  • Loading branch information
K1li4nL authored Jun 24, 2024
2 parents c309fa7 + f4edf82 commit 5432866
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 25 deletions.
2 changes: 1 addition & 1 deletion be2-scala/src/main/scala/ch/epfl/pop/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object Server {

// Create necessary actors for server-server communications
val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef))
val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManagerRef, messageRegistry))

// Setup routes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ final case class ConnectionMediator(
// Tell monitor to stop scheduling heartbeats since there is no one to receive them
if (serverMap.isEmpty)
monitorRef ! Monitor.NoServerConnected
gossipManagerRef ! Monitor.NoServerConnected

case ConnectionMediator.ReadPeersClientAddress() =>
if (serverMap.isEmpty)
Expand All @@ -69,6 +70,7 @@ final case class ConnectionMediator(
case ConnectionMediator.NewServerConnected(serverRef, greetServer) =>
if (serverMap.isEmpty) {
monitorRef ! Monitor.AtLeastOneServerConnected
gossipManagerRef ! Monitor.AtLeastOneServerConnected
}
serverMap += ((serverRef, greetServer))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package ch.epfl.pop.decentralized

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
import akka.pattern.{AskableActorRef, ask}
import akka.stream.scaladsl.Flow
import ch.epfl.pop.decentralized.GossipManager.TriggerPullState
import ch.epfl.pop.model.network.MethodType.rumor_state
import ch.epfl.pop.model.network.method.Rumor
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType}
Expand All @@ -16,6 +18,7 @@ import ch.epfl.pop.storage.DbActor
import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorGetRumorStateAck, DbActorReadRumorData, GetRumorState}

import scala.concurrent.Await
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

/** This class is responsible of managing the gossiping of rumors across the network
Expand All @@ -24,13 +27,16 @@ import scala.util.Random
* @param stopProbability
* probability with which we stop the gossipping in case of error response
*/
final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5) extends Actor with AskPatternConstants with ActorLogging {
final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5, pullRate: FiniteDuration = 5.seconds) extends Actor with AskPatternConstants with ActorLogging with Timers {

private var activeGossipProtocol: Map[JsonRpcRequest, Set[ActorRef]] = Map.empty
private var rumorMap: Map[PublicKey, Int] = Map.empty
private var jsonId = 0
private var publicKey: Option[PublicKey] = None
private var connectionMediatorRef: AskableActorRef = _

private val periodicRumorStateKey = 0

publicKey = {
val readPk = dbActorRef ? DbActor.ReadServerPublicKey()
Await.result(readPk, duration) match
Expand All @@ -40,6 +46,19 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
None
}

rumorMap =
publicKey match
case Some(pk: PublicKey) =>
val readRumorData = dbActorRef ? DbActor.ReadRumorData(pk)
Await.result(readRumorData, duration) match
case DbActorReadRumorData(foundRumorIds: RumorData) => rumorMap.updated(pk, foundRumorIds.lastRumorId())
case failure => Map.empty
case None => Map.empty

/** Does a step of gossipping protocol for given rpc. Tries to find a random peer that hasn't already received this msg If such a peer is found, sends message and updates table accordingly. If no peer is found, ends the protocol.
* @param rumorRpc
* Rpc that must be spreac
*/
/** Does a step of gossipping protocol for given rpc. Tries to find a random peer that hasn't already received this msg If such a peer is found, sends message and updates table accordingly. If no peer is found, ends the protocol.
* @param rumorRpc
* Rpc that must be spread
Expand Down Expand Up @@ -129,6 +148,28 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
case None => -1
}

private def sendRumorState(): Unit = {
val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer()
Await.result(randomPeer, duration) match {
case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) =>
val rumorStateGet = dbActorRef ? GetRumorState()
Await.result(rumorStateGet, duration) match
case DbActorGetRumorStateAck(rumorState) =>
serverRef ! ClientAnswer(
Right(JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
rumor_state,
rumorState,
Some(jsonId)
))
)
jsonId += 1
case _ => log.info(s"Actor $self failed on creating rumor state")
case _ =>
log.info(s"Actor $self received an unexpected message waiting for a random peer")
}
}

private def peersAlreadyReceived(jsonRpcRequest: JsonRpcRequest): Set[ActorRef] = {
val activeGossip = activeGossipProtocol.get(jsonRpcRequest)
activeGossip match
Expand Down Expand Up @@ -161,15 +202,24 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
log.info(s"Actor $self received a ping from Connection Mediator")
connectionMediatorRef = sender()

case Monitor.AtLeastOneServerConnected =>
timers.startTimerWithFixedDelay(periodicRumorStateKey, TriggerPullState(), pullRate)

case Monitor.NoServerConnected =>
timers.cancel(periodicRumorStateKey)

case TriggerPullState() =>
sendRumorState()

case _ =>
log.info(s"Actor $self received an unexpected message")
}

}

object GossipManager extends AskPatternConstants {
def props(dbActorRef: AskableActorRef, monitorRef: ActorRef): Props =
Props(new GossipManager(dbActorRef))
def props(dbActorRef: AskableActorRef, pullRate: FiniteDuration = 15.seconds): Props =
Props(new GossipManager(dbActorRef, pullRate = pullRate))

/** When receiving a rumor, gossip manager handles the rumor by relaying
*
Expand Down Expand Up @@ -225,4 +275,9 @@ object GossipManager extends AskPatternConstants {
final case class HandleRumor(jsonRpcRequest: JsonRpcRequest, clientActorRef: ActorRef)
final case class ManageGossipResponse(jsonRpcResponse: JsonRpcResponse)
final case class StartGossip(messages: Map[Channel, List[Message]])
final case class TriggerPullState()

sealed trait GossipManagerMessage
final case class Ping() extends GossipManagerMessage

}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ object HighLevelProtocol extends DefaultJsonProtocol {
// We don't differentiate and use an EmptyList to make result available to different response handler
if (resultArray.isEmpty)
new ResultObject(ResultEmptyList())
resultArray.head.asJsObject.fields.keySet match
else resultArray.head.asJsObject.fields.keySet match
case keys if keys == RumorFormat.fields =>
new ResultObject(ResultRumor(resultArray.map(_.convertTo[Rumor]).toList))
case keys if keys == messageFormat.fields =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ object PublishSubscribe {

val gossipMonitorPartition = builder.add(GossipManager.monitorResponse(gossipManager))
val getMsgByIdResponsePartition = builder.add(ProcessMessagesHandler.getMsgByIdResponseHandler(messageRegistry))
val rumorStateAnsPartition = builder.add(ProcessMessagesHandler.rumorStateAnsHandler(messageRegistry))
val rumorStateAnsPartition = builder.add(ProcessMessagesHandler.rumorStateAnsHandler(dbActorRef, messageRegistry))

/* glue the components together */
input ~> responsePartitioner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ch.epfl.pop.pubsub.graph.handlers

import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.AskableActorRef
import akka.stream.scaladsl.{Flow, Sink, Source}
import ch.epfl.pop.model.network.method.{Publish, Rumor}
import ch.epfl.pop.model.network.method.message.Message
Expand All @@ -10,6 +11,7 @@ import ch.epfl.pop.model.objects.Channel
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError, prettyPrinter}
import ch.epfl.pop.pubsub.{AskPatternConstants, MessageRegistry, PublishSubscribe}
import ch.epfl.pop.storage.DbActor.{DbActorAck, WriteRumor}

import scala.annotation.tailrec
import scala.util.Success
Expand Down Expand Up @@ -49,16 +51,18 @@ object ProcessMessagesHandler extends AskPatternConstants {
case value @ _ => value
}

def rumorStateAnsHandler(messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
def rumorStateAnsHandler(dbActorRef: AskableActorRef, messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case msg @ Right(JsonRpcResponse(_, Some(resultObject), None, jsonId)) =>
resultObject.resultRumor match
case Some(rumorList) =>
val mergedMsg = rumorList
.flatMap(_.messages)
.groupBy(_._1)
.view.mapValues(_.flatMap(_._2).toSet).toMap
processMsgMap(mergedMsg, messageRegistry)
msg
if (processMsgMap(mergedMsg, messageRegistry) && writeRumorsInDb(dbActorRef, rumorList))
msg
else
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"Rumor state handler was not able to process all rumors from $msg", jsonId))
case _ => Left(PipelineError(
ErrorCodes.SERVER_ERROR.id,
s"Rumor state handler received an unexpected type of result $msg",
Expand All @@ -71,6 +75,16 @@ object ProcessMessagesHandler extends AskPatternConstants {
))
}

private def writeRumorsInDb(dbActorRef: AskableActorRef, rumors: List[Rumor]): Boolean = {
rumors.foreach { rumor =>
val writeRumor = dbActorRef ? WriteRumor(rumor)
Await.result(writeRumor, duration) match
case DbActorAck() => /* DO NOTHING*/
case _ => false
}
true
}

def rumorHandler(messageRegistry: MessageRegistry, rumor: Rumor)(implicit system: ActorSystem): Boolean = {
val msgMap = rumor.messages.map((chan, list) => (chan, list.toSet))
processMsgMap(msgMap, messageRegistry)
Expand Down
3 changes: 1 addition & 2 deletions be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import akka.pattern.AskableActorRef
import ch.epfl.pop.decentralized.ConnectionMediator
import ch.epfl.pop.json.MessageDataProtocol
import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat
import ch.epfl.pop.model.network.method.{Rumor, RumorState}
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.message.data.lao.GreetLao
import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType}
import ch.epfl.pop.model.network.method.{Rumor, RumorState}
import ch.epfl.pop.model.objects.*
import ch.epfl.pop.model.objects.Channel.{LAO_DATA_LOCATION, ROOT_CHANNEL_PREFIX}
import ch.epfl.pop.pubsub.graph.AnswerGenerator.timout
Expand All @@ -20,7 +20,6 @@ import com.google.crypto.tink.subtle.Ed25519Sign

import java.util.concurrent.TimeUnit
import scala.collection.immutable.HashMap
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.{Failure, Success, Try}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ import akka.stream.scaladsl.{Flow, Sink, Source}
import ch.epfl.pop.IOHelper.readJsonFromPath
import ch.epfl.pop.model.network.MethodType.rumor
import ch.epfl.pop.model.network.{ErrorObject, JsonRpcRequest, JsonRpcResponse, MethodType, ResultObject}
import ch.epfl.pop.model.network.method.{GreetServer, Rumor}
import ch.epfl.pop.model.network.method.{GreetServer, Rumor, RumorState}
import ch.epfl.pop.model.objects.{Base64Data, PublicKey, RumorData}
import ch.epfl.pop.pubsub.ClientActor.ClientAnswer
import ch.epfl.pop.pubsub.graph.GraphMessage
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.storage.DbActor.DbActorReadRumorData
import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorReadRumorData}
import org.scalatest.BeforeAndAfterEach

import scala.concurrent.Await
import concurrent.duration.DurationInt

class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSystem")) with AnyFunSuiteLike with AskPatternConstants with Matchers with BeforeAndAfterEach {

Expand Down Expand Up @@ -63,7 +64,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
val rumor: Rumor = rumorRequest.getParams.asInstanceOf[Rumor]

test("When receiving a message, gossip manager should create and send a rumor") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe("a")
val gossip = GossipManager.startGossip(gossipManager, sender.ref)
Expand Down Expand Up @@ -98,7 +99,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("Gossip manager increments jsonRpcId and rumorID when starting a gossip from message") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe("b")
val gossip = GossipManager.startGossip(gossipManager, sender.ref)
Expand Down Expand Up @@ -127,7 +128,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("Gossip manager should increment jsonRpcId but not rumor when starting gossip from rumor") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe("c")
val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref)
Expand Down Expand Up @@ -157,7 +158,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("Gossip should stop when there is no peers left") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
val sender = TestProbe("d")
val gossip = GossipManager.startGossip(gossipManager, sender.ref)
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
Expand Down Expand Up @@ -194,7 +195,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("Gossip should write in memory new rumors sent") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
val sender = TestProbe()
val gossip = GossipManager.startGossip(gossipManager, sender.ref)
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
Expand Down Expand Up @@ -227,7 +228,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys

test("gossip handler should forward a rumor to a random server") {

val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe()
val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref)
Expand All @@ -251,7 +252,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("gossip handler should send to only one server if multiples are present") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe()
val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref)
Expand All @@ -278,7 +279,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("gossip handler should send rumor if there is an ongoing gossip protocol") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe()
val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref)
Expand Down Expand Up @@ -338,4 +339,23 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys

}

test("Gossip sends rumor state when there is one server connected") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, pullRate = 2.seconds))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val server = TestProbe()

val writeRumor = dbActorRef ? DbActor.WriteRumor(rumor)
Await.result(writeRumor, duration) shouldBe DbActorAck()

connectionMediatorRef ? ConnectionMediator.NewServerConnected(server.ref, GreetServer(PublicKey(Base64Data.encode("publickey")), "client", "server"))
checkPeersWritten(connectionMediatorRef)

server.receiveOne(5.seconds) match
case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) =>
jsonRpcRequest.id shouldBe Some(0)
jsonRpcRequest.method shouldBe MethodType.rumor_state
val rumorState = jsonRpcRequest.getParams.asInstanceOf[RumorState]
rumorState.state shouldBe Map(rumor.senderPk -> rumor.rumorId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class RumorHandlerSuite extends TestKitBase with AnyFunSuiteLike with AskPattern
private val dbActorRef: AskableActorRef = system.actorOf(Props(DbActor(pubSubMediatorRef, messageRegistry, inMemoryStorage)), "dbRumor")
private val securityModuleActorRef: AskableActorRef = system.actorOf(Props(SecurityModuleActor(RuntimeEnvironment.securityPath)), "securityRumor")
private val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef), "monitorRumor")
private val gossipRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
private val gossipRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
private var connectionMediatorRef: AskableActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipRef, messageRegistry), "connMediatorRumor")
private val rumorHandler: Flow[GraphMessage, GraphMessage, NotUsed] = ParamsHandler.rumorHandler(dbActorRef, messageRegistry)

Expand Down
Loading

0 comments on commit 5432866

Please sign in to comment.