Skip to content

Commit

Permalink
cont migration
Browse files Browse the repository at this point in the history
  • Loading branch information
pron committed Jul 13, 2014
1 parent 673a7db commit 4eaa9b9
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 60 deletions.
4 changes: 2 additions & 2 deletions log4j.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
</root>
<!-- <logger name="co.paralleluniverse.actors" level="DEBUG"/> -->
<!-- <logger name="co.paralleluniverse.remote.galaxy" level="DEBUG"/> -->
<!-- <logger name="co.paralleluniverse.galaxy.core.StringRootManager" level="DEBUG"/> -->
<!-- <logger name="co.paralleluniverse.galaxy.core.AbstractCluster" level="DEBUG"/> -->
<!-- <logger name="co.paralleluniverse.galaxy.core.Cache" level="DEBUG"/> -->
<!-- <logger name="co.paralleluniverse.galaxy.core.MainMemory" level="DEBUG"/> -->
<!-- <logger name="co.paralleluniverse.galaxy.core.AbstractCluster" level="DEBUG"/> -->
<!-- <logger name="co.paralleluniverse.galaxy.core.StringRootManager" level="DEBUG"/> -->
<!-- <logger name="co.paralleluniverse.galaxy.netty.UDPComm" level="DEBUG"/> -->
<!-- <logger name="co.paralleluniverse.galaxy.core.BackupImpl" level="DEBUG"/> -->
</loggers>
Expand Down
40 changes: 21 additions & 19 deletions quasar-actors/src/main/java/co/paralleluniverse/actors/Actor.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ public static <T extends Actor<Message, V>, Message, V> T newActor(ActorSpec<T,
private volatile V result;
private volatile Throwable exception;
private volatile Throwable deathCause;
private volatile Object globalId;
private Object globalId;
private volatile Object registrationId;
private transient volatile ActorMonitor monitor;
private volatile boolean registered;
private boolean globalIdFixed;
private boolean hasMonitor;
private ActorSpec<?, Message, V> spec;
private Object aux;
Expand Down Expand Up @@ -628,13 +628,8 @@ final V run0() throws InterruptedException, SuspendExecution {
if (!(runner.getStrand() instanceof Fiber))
currentActor.set(this);
try {
if (this instanceof MigratingActor) {
Object gid = MigrationService.registerMigratingActor(globalId);
if (!globalIdFixed)
this.globalId = gid;
else if (!Objects.equal(this.globalId, gid))
throw new AssertionError();
}
if (this instanceof MigratingActor && globalId == null)
this.globalId = MigrationService.registerMigratingActor();

result = doRun();
die(null);
Expand Down Expand Up @@ -776,8 +771,7 @@ public final boolean isRegistered() {
}

Object getGlobalId() {
this.globalIdFixed = true;
return globalId;
return globalId != null ? globalId : registrationId;
}

@Override
Expand Down Expand Up @@ -889,6 +883,21 @@ else if (!getName().equals(name))
return register();
}

// called by ActorRegistry
void preRegister(String name) throws SuspendExecution {
if (getName() == null)
setName(name);
else if (!getName().equals(name))
throw new RegistrationException("Cannot register actor named " + getName() + " under a different name (" + name + ")");
assert !registered;
if (this instanceof MigratingActor && globalId == null)
this.globalId = MigrationService.registerMigratingActor();
}

void postRegister() {
this.registered = true;
}

/**
* Registers this actor in the actor registry under its name.
* This also creates a {@link #monitor() monitor} for this actor.
Expand All @@ -899,12 +908,7 @@ public final Actor register() throws SuspendExecution {
if (registered)
return this;
record(1, "Actor", "register", "Registering actor %s as %s", this, getName());
if (globalIdFixed)
throw new IllegalStateException("Actor has already been shared over the cluster and cannot be registered");
this.globalId = ActorRegistry.register(this, globalId);
if (this instanceof MigratingActor)
MigrationService.registerMigratingActor(globalId);
this.registered = true;
this.registrationId = ActorRegistry.register(this, null);
return this;
}

Expand Down Expand Up @@ -965,7 +969,6 @@ private void die(Throwable cause) {
public void migrateAndRestart() throws SuspendExecution {
record(1, "Actor", "migrateAndRestart", "Actor %s is migrating.", this);
verifyInActor();
this.globalIdFixed = true;
final RemoteActor<Message> remote = RemoteActorProxyFactoryService.create(ref(), getGlobalId());

final Mailbox mbox = mailbox();
Expand Down Expand Up @@ -1029,7 +1032,6 @@ public final Actor<Message, V> build() throws SuspendExecution {
//<editor-fold desc="Serialization">
/////////// Serialization ///////////////////////////////////
protected final Object writeReplace() throws java.io.ObjectStreamException {
globalIdFixed = true;
if (migrating.get() == Boolean.TRUE)
return this;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package co.paralleluniverse.actors;

import co.paralleluniverse.common.util.ServiceUtil;
import co.paralleluniverse.fibers.DefaultFiberScheduler;
import co.paralleluniverse.fibers.FiberScheduler;
import co.paralleluniverse.fibers.SuspendExecution;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,13 +41,15 @@ public class ActorRegistry {

}

static Object register(final Actor<?, ?> actor, final Object globalId) throws SuspendExecution {
static Object register(Actor<?, ?> actor, Object globalId) throws SuspendExecution {
final String name = actor.getName();
if (name == null)
throw new IllegalArgumentException("name is null");
LOG.info("Registering {}: {}", name, actor);

actor.preRegister(name);
final Object res = registry.register(actor.ref(), globalId);

actor.monitor();
return res;
}
Expand All @@ -62,7 +66,7 @@ static void unregister(final ActorRef<?> actor) {
* @param name the actor's name.
* @return the actor, or {@code null} if no actor by that name is currently registered.
*/
public static <Message> ActorRef<Message> tryGetActor(final String name) throws SuspendExecution {
public static <Message> ActorRef<Message> tryGetActor(String name) throws SuspendExecution {
return registry.tryGetActor(name);
}

Expand All @@ -74,7 +78,7 @@ public static <Message> ActorRef<Message> tryGetActor(final String name) throws
* @param unit the timeout's unit
* @return the actor, or {@code null} if the timeout expires before one is registered.
*/
public static <Message> ActorRef<Message> getActor(final String name, final long timeout, final TimeUnit unit) throws InterruptedException, SuspendExecution {
public static <Message> ActorRef<Message> getActor(String name, long timeout, TimeUnit unit) throws InterruptedException, SuspendExecution {
return registry.getActor(name, timeout, unit);
}

Expand All @@ -84,21 +88,46 @@ public static <Message> ActorRef<Message> getActor(final String name, final long
* @param name the actor's name.
* @return the actor.
*/
public static <Message> ActorRef<Message> getActor(final String name) throws InterruptedException, SuspendExecution {
public static <Message> ActorRef<Message> getActor(String name) throws InterruptedException, SuspendExecution {
return getActor(name, 0, null);
}

/**
* Locates a registered actor by name, or, if not actor by that name is currently registered, creates and registers it.
* This method atomically checks if an actor by the given name is registers, and if so, returns it; otherwise it registers the
* Locates a registered actor by name, or, if not actor by that name is currently registered, spawns and registers it.
* This method atomically checks if an actor by the given name is registers, and if so, returns it; otherwise it spawns and registers the
* actor returned by the given factory.
*
* @param name the actor's name.
* @param actorFactory returns an actor that will be registered if one isn't currently registered.
* @param scheduler the {@link FiberScheduler} to use when spawning the actor, or {@code null} to spawn the fiber using the default scheduler.
* @return the actor.
*/
public static <Message> ActorRef<Message> getOrRegisterActor(final String name, final Callable<ActorRef<Message>> actorFactory) throws SuspendExecution {
return registry.getOrRegisterActor(name, actorFactory);
public static <Message> ActorRef<Message> getOrRegisterActor(final String name, final Callable<Actor<Message, ?>> actorFactory, final FiberScheduler scheduler) throws SuspendExecution {
Callable<ActorRef<Message>> factory = new Callable<ActorRef<Message>>() {

@Override
public ActorRef<Message> call() throws Exception {
Actor actor = actorFactory.call();
actor.preRegister(name);
return scheduler != null ? actor.spawn(scheduler) : actor.spawnThread();
}
};
ActorRef<Message> actor = registry.getOrRegisterActor(name, factory);
LocalActor.postRegister(actor);
return actor;
}

/**
* Locates a registered actor by name, or, if not actor by that name is currently registered, spawns and registers it.
* This method atomically checks if an actor by the given name is registers, and if so, returns it; otherwise it spawns the actor
* returned by the given factory using the default fiber scheduler, and registers it.
*
* @param name the actor's name.
* @param actorFactory returns an actor that will be registered if one isn't currently registered.
* @return the actor.
*/
public static <Message> ActorRef<Message> getOrRegisterActor(String name, Callable<Actor<Message, ?>> actorFactory) throws SuspendExecution {
return getOrRegisterActor(name, actorFactory, DefaultFiberScheduler.getInstance());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ static StackTraceElement[] getStackTrace(ActorRef<?> actor) {
return actorOf(actor).getStackTrace();
}

static void postRegister(ActorRef<?> ar) {
ActorImpl impl = ar.getImpl();
if (!(impl instanceof Actor))
return;
((Actor)impl).postRegister();
}

private static Actor actorOf(ActorRef<?> ar) {
ActorImpl impl = ar.getImpl();
if (!(impl instanceof Actor))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
package co.paralleluniverse.actors;

import co.paralleluniverse.concurrent.util.MapUtil;
import co.paralleluniverse.fibers.FiberScheduler;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.concurrent.ReentrantLock;
import com.google.common.base.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ class MigrationService {
private MigrationService() {
}

public static Object registerMigratingActor(Object id) throws SuspendExecution {
return migrator.registerMigratingActor(id);
public static Object registerMigratingActor() throws SuspendExecution {
Object res = migrator.registerMigratingActor();
return res;
}

public static void migrate(Object id, Actor<?, ?> actor) throws SuspendExecution {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @author pron
*/
public interface Migrator {
Object registerMigratingActor(Object id) throws SuspendExecution;
Object registerMigratingActor() throws SuspendExecution;
void migrate(Object id, Actor<?, ?> actor) throws SuspendExecution;
<M> Actor<M, ?> hire(ActorRef<M> actorRef, ActorImpl<M> actorImpl) throws SuspendExecution;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import co.paralleluniverse.actors.ActorRegistry;
import co.paralleluniverse.actors.BasicActor;
import co.paralleluniverse.actors.MigratingActor;
import co.paralleluniverse.actors.behaviors.Server;
import co.paralleluniverse.actors.behaviors.ServerActor;
import co.paralleluniverse.fibers.SuspendExecution;
import static co.paralleluniverse.galaxy.example.migration.Message.Type.*;
import java.util.concurrent.Callable;
Expand All @@ -32,39 +34,46 @@

public class Main {
public static void main(String[] args) throws Exception {
final int nodeId = 1; // Integer.parseInt(args[0]);
final int nodeId = Integer.parseInt(args[0]);
System.setProperty("galaxy.nodeId", Integer.toString(nodeId));
System.setProperty("galaxy.port", Integer.toString(7050 + nodeId));
System.setProperty("galaxy.slave_port", Integer.toString(8050 + nodeId));

// com.esotericsoftware.minlog.Log.set(1);
ActorRegistry.hasGlobalRegistry();
ActorRef<Message> actor = ActorRegistry.getOrRegisterActor("migrant", new Callable<ActorRef<Message>>() {
ActorRef<Message> actor = ActorRegistry.getOrRegisterActor("migrant", new Callable<Actor<Message, ?>>() {

@Override
public ActorRef<Message> call() throws Exception {
return new Migrant().spawn();
public Actor<Message, Void> call() throws Exception {
return new Migrant();
}
});
if (actor == null) {
System.out.println("Creating actor");
actor = new Migrant("migrant").spawn();
} else
System.out.println("Found registered actor");
// Server<Message, Integer, Message> actor = (Server<Message, Integer, Message>)ActorRegistry.getOrRegisterActor("migrant", new Callable() {
//
// @Override
// public ServerActor call() throws Exception {
// return new Migrant();
// }
// });

for (int i = 0; i < 100; i++) {
int i;
for (i = 0; i < 500; i++) {
final double r = ThreadLocalRandom.current().nextDouble();
if (r < 0.2) {
actor.send(new Message(null, MIGRATE));
if (r < 0.1) {
System.out.println("Hiring actor...");
Thread.sleep(500);
// actor.call(new Message(nodeId, i, MIGRATE));
actor.send(new Message(nodeId, i, MIGRATE));
Thread.sleep(100);
Actor.hire(actor).spawn();
System.out.println("Hired!");
} else
actor.send(new Message(null, PRINT));
Thread.sleep(1000);
} else {
actor.send(new Message(nodeId, i, PRINT));
// actor.cast(new Message(nodeId, i, PRINT));
}
Thread.sleep(500);
}
actor.send(new Message(null, FINISHED));
actor.send(new Message(nodeId, i, FINISHED));
// actor.cast(new Message(nodeId, i, FINISHED));

System.out.println("Done");
ActorRegistry.shutdown();
Expand All @@ -88,6 +97,7 @@ protected Void doRun() throws InterruptedException, SuspendExecution {
for (;;) {
Message m = receive(2, TimeUnit.SECONDS);
if (m != null) {
System.out.println("received: " + m);
messageCount++;
switch (m.type) {
case PRINT:
Expand All @@ -106,4 +116,52 @@ protected Void doRun() throws InterruptedException, SuspendExecution {
return null;
}
}
// static class Migrant extends ServerActor<Message, Integer, Message> implements MigratingActor {
// private int loopCount;
// private int messageCount;
//
// public Migrant() {
// super();
// setTimeout(2, TimeUnit.SECONDS);
// }
//
// public Migrant(String name) {
// super(name);
// setTimeout(2, TimeUnit.SECONDS);
// }
//
// @Override
// protected void handleMessage(Object m1) throws InterruptedException, SuspendExecution {
// messageCount++;
// loopCount++;
// super.handleMessage(m1);
// }
//
// @Override
// protected void handleCast(ActorRef<?> from, Object id, Message m) throws SuspendExecution {
// switch (m.type) {
// case PRINT:
// System.out.println("iter: " + loopCount + " messages: " + messageCount);
// break;
// case FINISHED:
// shutdown();
// }
// }
//
// @Override
// protected Integer handleCall(ActorRef<?> from, Object id, Message m) throws Exception, SuspendExecution {
// switch (m.type) {
// case MIGRATE:
// migrateAndRestart();
// return messageCount;
// default:
// throw new UnsupportedOperationException(m.toString());
// }
// }
//
// @Override
// protected void handleTimeout() throws SuspendExecution {
// loopCount++;
// }
// }
}
Loading

0 comments on commit 4eaa9b9

Please sign in to comment.