Skip to content

Commit

Permalink
akka version updates
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonnorris committed Jun 28, 2019
1 parent 6d17424 commit 91c3eb8
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,13 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
implicit val ec = context.dispatcher
val mediator = DistributedPubSub(system).mediator
val replicator = DistributedData(system).replicator
implicit val myAddress = DistributedData(system).selfUniqueAddress

mediator ! Put(self) //allow point to point messaging based on the actor name: use Send(/user/<myname>) to send messages to me in the cluster
//subscribe to invoker ids changes (need to setup additional keys based on each invoker arriving)
replicator ! Subscribe(InvokerIdsKey, self)
//add this invoker to ids list
replicator ! Update(InvokerIdsKey, ORSet.empty[Int], WriteLocal)(_ + (myId))
replicator ! Update(InvokerIdsKey, ORSet.empty[Int], WriteLocal)(_ :+ (myId))

logging.info(this, "subscribing to NodeStats updates")
system.eventStream.subscribe(self, classOf[NodeStatsUpdate])
Expand All @@ -212,7 +213,7 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
private def cleanup() = {
//remove this invoker from ids list
logging.info(this, s"stopping invoker ${myId}")
replicator ! Update(InvokerIdsKey, ORSet.empty[Int], WriteLocal)(_ - myId)
replicator ! Update(InvokerIdsKey, ORSet.empty[Int], WriteLocal)(_.remove(myId))

}
override def receive: Receive = {
Expand All @@ -224,8 +225,8 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
logging.info(
this,
s"invoker ${myId} (self) has ${reservations.size} reservations (${reservations.map(_.size.toMB).sum}MB)")
replicator ! Update(myReservationsKey, LWWRegister[List[Reservation]](List.empty), WriteLocal)(reg =>
reg.withValue(reservations))
replicator ! Update(myReservationsKey, LWWRegister[List[Reservation]](myAddress, List.empty), WriteLocal)(
reg => reg.withValueOf(reservations))
}
//update this invokers idles seen by other invokers; including only idles past the idleGrace period
val idleGraceInstant = Instant.now().minusSeconds(poolConfig.clusterManagedIdleGrace.toSeconds)
Expand All @@ -238,8 +239,8 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
logging.info(
this,
s"invoker ${myId} (self) has ${lastUnused.size} unused (${lastUnused.map(_.size.toMB).sum}MB)")
replicator ! Update(myUnusedKey, LWWRegister[List[RemoteContainerRef]](List.empty), WriteLocal)(reg =>
reg.withValue(idles))
replicator ! Update(myUnusedKey, LWWRegister[List[RemoteContainerRef]](myAddress, List.empty), WriteLocal)(
reg => reg.withValueOf(idles))
}
case UpdateSuccess => //nothing (normal behavior)
case f: UpdateFailure[_] => //log the failure
Expand Down

0 comments on commit 91c3eb8

Please sign in to comment.