Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BE2] Add rumor state creation #1905

Merged
merged 17 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be2-scala/src/main/scala/ch/epfl/pop/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object Server {

// Create necessary actors for server-server communications
val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef))
val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManagerRef, messageRegistry))

// Setup routes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ final case class ConnectionMediator(
// Tell monitor to stop scheduling heartbeats since there is no one to receive them
if (serverMap.isEmpty)
monitorRef ! Monitor.NoServerConnected
gossipManagerRef ? Monitor.NoServerConnected

K1li4nL marked this conversation as resolved.
Show resolved Hide resolved
case ConnectionMediator.ReadPeersClientAddress() =>
if (serverMap.isEmpty)
Expand All @@ -69,6 +70,7 @@ final case class ConnectionMediator(
case ConnectionMediator.NewServerConnected(serverRef, greetServer) =>
if (serverMap.isEmpty) {
monitorRef ! Monitor.AtLeastOneServerConnected
gossipManagerRef ? Monitor.AtLeastOneServerConnected
}
K1li4nL marked this conversation as resolved.
Show resolved Hide resolved
serverMap += ((serverRef, greetServer))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package ch.epfl.pop.decentralized

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
import akka.pattern.{AskableActorRef, ask}
import akka.stream.scaladsl.Flow
import ch.epfl.pop.decentralized.GossipManager.TriggerPullState
import ch.epfl.pop.model.network.MethodType.rumor_state
import ch.epfl.pop.model.network.method.Rumor
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType}
Expand All @@ -16,6 +18,7 @@ import ch.epfl.pop.storage.DbActor
import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorGetRumorStateAck, DbActorReadRumorData, GetRumorState}

import scala.concurrent.Await
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

/** This class is responsible of managing the gossiping of rumors across the network
Expand All @@ -24,13 +27,16 @@ import scala.util.Random
* @param stopProbability
* probability with which we stop the gossipping in case of error response
*/
final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5) extends Actor with AskPatternConstants with ActorLogging {
final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5, pullRate: FiniteDuration = 15.seconds) extends Actor with AskPatternConstants with ActorLogging with Timers {

private var activeGossipProtocol: Map[JsonRpcRequest, Set[ActorRef]] = Map.empty
private var rumorMap: Map[PublicKey, Int] = Map.empty
private var jsonId = 0
private var publicKey: Option[PublicKey] = None
private var connectionMediatorRef: AskableActorRef = _

private val periodicRumorStateKey = 0

publicKey = {
val readPk = dbActorRef ? DbActor.ReadServerPublicKey()
Await.result(readPk, duration) match
Expand All @@ -40,6 +46,19 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
None
}

rumorMap =
publicKey match
case Some(pk: PublicKey) =>
val readRumorData = dbActorRef ? DbActor.ReadRumorData(pk)
Await.result(readRumorData, duration) match
case DbActorReadRumorData(foundRumorIds: RumorData) => rumorMap.updated(pk, foundRumorIds.lastRumorId())
case failure => Map.empty
case None => Map.empty

/** Does a step of gossipping protocol for given rpc. Tries to find a random peer that hasn't already received this msg If such a peer is found, sends message and updates table accordingly. If no peer is found, ends the protocol.
* @param rumorRpc
* Rpc that must be spreac
*/
/** Does a step of gossipping protocol for given rpc. Tries to find a random peer that hasn't already received this msg If such a peer is found, sends message and updates table accordingly. If no peer is found, ends the protocol.
* @param rumorRpc
* Rpc that must be spread
Expand Down Expand Up @@ -129,6 +148,27 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
case None => -1
}

private def sendRumorState(): Unit = {
val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer()
Await.result(randomPeer, duration) match {
case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) =>
val rumorStateGet = dbActorRef ? GetRumorState()
Await.result(rumorStateGet, duration) match
case DbActorGetRumorStateAck(rumorState) =>
serverRef ! ClientAnswer(
Right(JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
rumor_state,
rumorState,
Some(jsonId)
))
)
jsonId += 1
K1li4nL marked this conversation as resolved.
Show resolved Hide resolved
case _ =>
log.info(s"Actor $self received an unexpected message waiting for a random peer")
}
}

private def peersAlreadyReceived(jsonRpcRequest: JsonRpcRequest): Set[ActorRef] = {
val activeGossip = activeGossipProtocol.get(jsonRpcRequest)
activeGossip match
Expand Down Expand Up @@ -161,15 +201,24 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
log.info(s"Actor $self received a ping from Connection Mediator")
connectionMediatorRef = sender()

case Monitor.AtLeastOneServerConnected =>
timers.startTimerWithFixedDelay(periodicRumorStateKey, TriggerPullState(), pullRate)

case Monitor.NoServerConnected =>
timers.cancel(periodicRumorStateKey)

case TriggerPullState() =>
sendRumorState()

case _ =>
log.info(s"Actor $self received an unexpected message")
}

}

object GossipManager extends AskPatternConstants {
def props(dbActorRef: AskableActorRef, monitorRef: ActorRef): Props =
Props(new GossipManager(dbActorRef))
def props(dbActorRef: AskableActorRef, pullRate: FiniteDuration = 15.seconds): Props =
Props(new GossipManager(dbActorRef, pullRate = pullRate))

/** When receiving a rumor, gossip manager handles the rumor by relaying
*
Expand Down Expand Up @@ -225,4 +274,9 @@ object GossipManager extends AskPatternConstants {
final case class HandleRumor(jsonRpcRequest: JsonRpcRequest, clientActorRef: ActorRef)
final case class ManageGossipResponse(jsonRpcResponse: JsonRpcResponse)
final case class StartGossip(messages: Map[Channel, List[Message]])
final case class TriggerPullState()

sealed trait GossipManagerMessage
final case class Ping() extends GossipManagerMessage

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import akka.pattern.AskableActorRef
import ch.epfl.pop.decentralized.ConnectionMediator
import ch.epfl.pop.json.MessageDataProtocol
import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat
import ch.epfl.pop.model.network.method.{Rumor, RumorState}
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.message.data.lao.GreetLao
import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType}
import ch.epfl.pop.model.network.method.{Rumor, RumorState}
import ch.epfl.pop.model.objects.*
import ch.epfl.pop.model.objects.Channel.{LAO_DATA_LOCATION, ROOT_CHANNEL_PREFIX}
import ch.epfl.pop.pubsub.graph.AnswerGenerator.timout
Expand All @@ -20,7 +20,6 @@ import com.google.crypto.tink.subtle.Ed25519Sign

import java.util.concurrent.TimeUnit
import scala.collection.immutable.HashMap
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.{Failure, Success, Try}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ import akka.stream.scaladsl.{Flow, Sink, Source}
import ch.epfl.pop.IOHelper.readJsonFromPath
import ch.epfl.pop.model.network.MethodType.rumor
import ch.epfl.pop.model.network.{ErrorObject, JsonRpcRequest, JsonRpcResponse, MethodType, ResultObject}
import ch.epfl.pop.model.network.method.{GreetServer, Rumor}
import ch.epfl.pop.model.network.method.{GreetServer, Rumor, RumorState}
import ch.epfl.pop.model.objects.{Base64Data, PublicKey, RumorData}
import ch.epfl.pop.pubsub.ClientActor.ClientAnswer
import ch.epfl.pop.pubsub.graph.GraphMessage
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.storage.DbActor.DbActorReadRumorData
import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorReadRumorData}
import org.scalatest.BeforeAndAfterEach

import scala.concurrent.Await
import concurrent.duration.DurationInt

class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSystem")) with AnyFunSuiteLike with AskPatternConstants with Matchers with BeforeAndAfterEach {

Expand Down Expand Up @@ -63,7 +64,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
val rumor: Rumor = rumorRequest.getParams.asInstanceOf[Rumor]

test("When receiving a message, gossip manager should create and send a rumor") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe("a")
val gossip = GossipManager.startGossip(gossipManager, sender.ref)
Expand Down Expand Up @@ -98,7 +99,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("Gossip manager increments jsonRpcId and rumorID when starting a gossip from message") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe("b")
val gossip = GossipManager.startGossip(gossipManager, sender.ref)
Expand Down Expand Up @@ -127,7 +128,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("Gossip manager should increment jsonRpcId but not rumor when starting gossip from rumor") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe("c")
val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref)
Expand Down Expand Up @@ -157,7 +158,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("Gossip should stop when there is no peers left") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
val sender = TestProbe("d")
val gossip = GossipManager.startGossip(gossipManager, sender.ref)
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
Expand Down Expand Up @@ -194,7 +195,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("Gossip should write in memory new rumors sent") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
val sender = TestProbe()
val gossip = GossipManager.startGossip(gossipManager, sender.ref)
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
Expand Down Expand Up @@ -227,7 +228,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys

test("gossip handler should forward a rumor to a random server") {

val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe()
val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref)
Expand All @@ -251,7 +252,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("gossip handler should send to only one server if multiples are present") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe()
val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref)
Expand All @@ -278,7 +279,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
}

test("gossip handler should send rumor if there is an ongoing gossip protocol") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe()
val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref)
Expand Down Expand Up @@ -338,4 +339,23 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys

}

test("Gossip sends rumor state when there is one server connected") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, pullRate = 2.seconds))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val server = TestProbe()

val writeRumor = dbActorRef ? DbActor.WriteRumor(rumor)
Await.result(writeRumor, duration) shouldBe DbActorAck()

connectionMediatorRef ? ConnectionMediator.NewServerConnected(server.ref, GreetServer(PublicKey(Base64Data.encode("publickey")), "client", "server"))
checkPeersWritten(connectionMediatorRef)

server.receiveOne(5.seconds) match
case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) =>
jsonRpcRequest.id shouldBe Some(0)
jsonRpcRequest.method shouldBe MethodType.rumor_state
val rumorState = jsonRpcRequest.getParams.asInstanceOf[RumorState]
rumorState.state shouldBe Map(rumor.senderPk -> rumor.rumorId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class RumorHandlerSuite extends TestKitBase with AnyFunSuiteLike with AskPattern
private val dbActorRef: AskableActorRef = system.actorOf(Props(DbActor(pubSubMediatorRef, messageRegistry, inMemoryStorage)), "dbRumor")
private val securityModuleActorRef: AskableActorRef = system.actorOf(Props(SecurityModuleActor(RuntimeEnvironment.securityPath)), "securityRumor")
private val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef), "monitorRumor")
private val gossipRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
private val gossipRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
private var connectionMediatorRef: AskableActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipRef, messageRegistry), "connMediatorRumor")
private val rumorHandler: Flow[GraphMessage, GraphMessage, NotUsed] = ParamsHandler.rumorHandler(dbActorRef, messageRegistry)

Expand Down
Loading