From 4eaa9b992473f75d3f860ec2aa1c910dc9a040c7 Mon Sep 17 00:00:00 2001 From: pron Date: Sun, 13 Jul 2014 13:51:28 +0300 Subject: [PATCH] cont migration --- log4j.xml | 4 +- .../co/paralleluniverse/actors/Actor.java | 40 ++++---- .../actors/ActorRegistry.java | 45 +++++++-- .../paralleluniverse/actors/LocalActor.java | 7 ++ .../actors/LocalActorRegistry.java | 2 +- .../actors/MigrationService.java | 5 +- .../paralleluniverse/actors/spi/Migrator.java | 2 +- .../galaxy/example/migration/Main.java | 92 +++++++++++++++---- .../galaxy/example/migration/Message.java | 12 ++- .../remote/galaxy/GlxGlobalRegistry.java | 4 + .../remote/galaxy/GlxMigrator.java | 13 ++- 11 files changed, 166 insertions(+), 60 deletions(-) diff --git a/log4j.xml b/log4j.xml index b325c71b43..4559200f43 100644 --- a/log4j.xml +++ b/log4j.xml @@ -12,10 +12,10 @@ - + - + diff --git a/quasar-actors/src/main/java/co/paralleluniverse/actors/Actor.java b/quasar-actors/src/main/java/co/paralleluniverse/actors/Actor.java index 3639a03164..019ab895ce 100644 --- a/quasar-actors/src/main/java/co/paralleluniverse/actors/Actor.java +++ b/quasar-actors/src/main/java/co/paralleluniverse/actors/Actor.java @@ -84,10 +84,10 @@ public static , Message, V> T newActor(ActorSpec spec; private Object aux; @@ -628,13 +628,8 @@ final V run0() throws InterruptedException, SuspendExecution { if (!(runner.getStrand() instanceof Fiber)) currentActor.set(this); try { - if (this instanceof MigratingActor) { - Object gid = MigrationService.registerMigratingActor(globalId); - if (!globalIdFixed) - this.globalId = gid; - else if (!Objects.equal(this.globalId, gid)) - throw new AssertionError(); - } + if (this instanceof MigratingActor && globalId == null) + this.globalId = MigrationService.registerMigratingActor(); result = doRun(); die(null); @@ -776,8 +771,7 @@ public final boolean isRegistered() { } Object getGlobalId() { - this.globalIdFixed = true; - return globalId; + return globalId != null ? globalId : registrationId; } @Override @@ -889,6 +883,21 @@ else if (!getName().equals(name)) return register(); } + // called by ActorRegistry + void preRegister(String name) throws SuspendExecution { + if (getName() == null) + setName(name); + else if (!getName().equals(name)) + throw new RegistrationException("Cannot register actor named " + getName() + " under a different name (" + name + ")"); + assert !registered; + if (this instanceof MigratingActor && globalId == null) + this.globalId = MigrationService.registerMigratingActor(); + } + + void postRegister() { + this.registered = true; + } + /** * Registers this actor in the actor registry under its name. * This also creates a {@link #monitor() monitor} for this actor. @@ -899,12 +908,7 @@ public final Actor register() throws SuspendExecution { if (registered) return this; record(1, "Actor", "register", "Registering actor %s as %s", this, getName()); - if (globalIdFixed) - throw new IllegalStateException("Actor has already been shared over the cluster and cannot be registered"); - this.globalId = ActorRegistry.register(this, globalId); - if (this instanceof MigratingActor) - MigrationService.registerMigratingActor(globalId); - this.registered = true; + this.registrationId = ActorRegistry.register(this, null); return this; } @@ -965,7 +969,6 @@ private void die(Throwable cause) { public void migrateAndRestart() throws SuspendExecution { record(1, "Actor", "migrateAndRestart", "Actor %s is migrating.", this); verifyInActor(); - this.globalIdFixed = true; final RemoteActor remote = RemoteActorProxyFactoryService.create(ref(), getGlobalId()); final Mailbox mbox = mailbox(); @@ -1029,7 +1032,6 @@ public final Actor build() throws SuspendExecution { // /////////// Serialization /////////////////////////////////// protected final Object writeReplace() throws java.io.ObjectStreamException { - globalIdFixed = true; if (migrating.get() == Boolean.TRUE) return this; diff --git a/quasar-actors/src/main/java/co/paralleluniverse/actors/ActorRegistry.java b/quasar-actors/src/main/java/co/paralleluniverse/actors/ActorRegistry.java index 41f2d6e7cf..711de0851e 100644 --- a/quasar-actors/src/main/java/co/paralleluniverse/actors/ActorRegistry.java +++ b/quasar-actors/src/main/java/co/paralleluniverse/actors/ActorRegistry.java @@ -14,6 +14,8 @@ package co.paralleluniverse.actors; import co.paralleluniverse.common.util.ServiceUtil; +import co.paralleluniverse.fibers.DefaultFiberScheduler; +import co.paralleluniverse.fibers.FiberScheduler; import co.paralleluniverse.fibers.SuspendExecution; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -39,13 +41,15 @@ public class ActorRegistry { } - static Object register(final Actor actor, final Object globalId) throws SuspendExecution { + static Object register(Actor actor, Object globalId) throws SuspendExecution { final String name = actor.getName(); if (name == null) throw new IllegalArgumentException("name is null"); LOG.info("Registering {}: {}", name, actor); + actor.preRegister(name); final Object res = registry.register(actor.ref(), globalId); + actor.monitor(); return res; } @@ -62,7 +66,7 @@ static void unregister(final ActorRef actor) { * @param name the actor's name. * @return the actor, or {@code null} if no actor by that name is currently registered. */ - public static ActorRef tryGetActor(final String name) throws SuspendExecution { + public static ActorRef tryGetActor(String name) throws SuspendExecution { return registry.tryGetActor(name); } @@ -74,7 +78,7 @@ public static ActorRef tryGetActor(final String name) throws * @param unit the timeout's unit * @return the actor, or {@code null} if the timeout expires before one is registered. */ - public static ActorRef getActor(final String name, final long timeout, final TimeUnit unit) throws InterruptedException, SuspendExecution { + public static ActorRef getActor(String name, long timeout, TimeUnit unit) throws InterruptedException, SuspendExecution { return registry.getActor(name, timeout, unit); } @@ -84,21 +88,46 @@ public static ActorRef getActor(final String name, final long * @param name the actor's name. * @return the actor. */ - public static ActorRef getActor(final String name) throws InterruptedException, SuspendExecution { + public static ActorRef getActor(String name) throws InterruptedException, SuspendExecution { return getActor(name, 0, null); } /** - * Locates a registered actor by name, or, if not actor by that name is currently registered, creates and registers it. - * This method atomically checks if an actor by the given name is registers, and if so, returns it; otherwise it registers the + * Locates a registered actor by name, or, if not actor by that name is currently registered, spawns and registers it. + * This method atomically checks if an actor by the given name is registers, and if so, returns it; otherwise it spawns and registers the * actor returned by the given factory. * * @param name the actor's name. * @param actorFactory returns an actor that will be registered if one isn't currently registered. + * @param scheduler the {@link FiberScheduler} to use when spawning the actor, or {@code null} to spawn the fiber using the default scheduler. * @return the actor. */ - public static ActorRef getOrRegisterActor(final String name, final Callable> actorFactory) throws SuspendExecution { - return registry.getOrRegisterActor(name, actorFactory); + public static ActorRef getOrRegisterActor(final String name, final Callable> actorFactory, final FiberScheduler scheduler) throws SuspendExecution { + Callable> factory = new Callable>() { + + @Override + public ActorRef call() throws Exception { + Actor actor = actorFactory.call(); + actor.preRegister(name); + return scheduler != null ? actor.spawn(scheduler) : actor.spawnThread(); + } + }; + ActorRef actor = registry.getOrRegisterActor(name, factory); + LocalActor.postRegister(actor); + return actor; + } + + /** + * Locates a registered actor by name, or, if not actor by that name is currently registered, spawns and registers it. + * This method atomically checks if an actor by the given name is registers, and if so, returns it; otherwise it spawns the actor + * returned by the given factory using the default fiber scheduler, and registers it. + * + * @param name the actor's name. + * @param actorFactory returns an actor that will be registered if one isn't currently registered. + * @return the actor. + */ + public static ActorRef getOrRegisterActor(String name, Callable> actorFactory) throws SuspendExecution { + return getOrRegisterActor(name, actorFactory, DefaultFiberScheduler.getInstance()); } /** diff --git a/quasar-actors/src/main/java/co/paralleluniverse/actors/LocalActor.java b/quasar-actors/src/main/java/co/paralleluniverse/actors/LocalActor.java index 45842714ed..d0c0ded3db 100644 --- a/quasar-actors/src/main/java/co/paralleluniverse/actors/LocalActor.java +++ b/quasar-actors/src/main/java/co/paralleluniverse/actors/LocalActor.java @@ -143,6 +143,13 @@ static StackTraceElement[] getStackTrace(ActorRef actor) { return actorOf(actor).getStackTrace(); } + static void postRegister(ActorRef ar) { + ActorImpl impl = ar.getImpl(); + if (!(impl instanceof Actor)) + return; + ((Actor)impl).postRegister(); + } + private static Actor actorOf(ActorRef ar) { ActorImpl impl = ar.getImpl(); if (!(impl instanceof Actor)) diff --git a/quasar-actors/src/main/java/co/paralleluniverse/actors/LocalActorRegistry.java b/quasar-actors/src/main/java/co/paralleluniverse/actors/LocalActorRegistry.java index 3ba4eba623..53029d16b4 100644 --- a/quasar-actors/src/main/java/co/paralleluniverse/actors/LocalActorRegistry.java +++ b/quasar-actors/src/main/java/co/paralleluniverse/actors/LocalActorRegistry.java @@ -14,13 +14,13 @@ package co.paralleluniverse.actors; import co.paralleluniverse.concurrent.util.MapUtil; +import co.paralleluniverse.fibers.FiberScheduler; import co.paralleluniverse.fibers.SuspendExecution; import co.paralleluniverse.strands.concurrent.ReentrantLock; import com.google.common.base.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/quasar-actors/src/main/java/co/paralleluniverse/actors/MigrationService.java b/quasar-actors/src/main/java/co/paralleluniverse/actors/MigrationService.java index 7c4d3c1d49..23e4e20742 100644 --- a/quasar-actors/src/main/java/co/paralleluniverse/actors/MigrationService.java +++ b/quasar-actors/src/main/java/co/paralleluniverse/actors/MigrationService.java @@ -34,8 +34,9 @@ class MigrationService { private MigrationService() { } - public static Object registerMigratingActor(Object id) throws SuspendExecution { - return migrator.registerMigratingActor(id); + public static Object registerMigratingActor() throws SuspendExecution { + Object res = migrator.registerMigratingActor(); + return res; } public static void migrate(Object id, Actor actor) throws SuspendExecution { diff --git a/quasar-actors/src/main/java/co/paralleluniverse/actors/spi/Migrator.java b/quasar-actors/src/main/java/co/paralleluniverse/actors/spi/Migrator.java index fb4f986be2..6f67a5eb8c 100644 --- a/quasar-actors/src/main/java/co/paralleluniverse/actors/spi/Migrator.java +++ b/quasar-actors/src/main/java/co/paralleluniverse/actors/spi/Migrator.java @@ -23,7 +23,7 @@ * @author pron */ public interface Migrator { - Object registerMigratingActor(Object id) throws SuspendExecution; + Object registerMigratingActor() throws SuspendExecution; void migrate(Object id, Actor actor) throws SuspendExecution; Actor hire(ActorRef actorRef, ActorImpl actorImpl) throws SuspendExecution; } diff --git a/quasar-galaxy/src/main/java/co/paralleluniverse/galaxy/example/migration/Main.java b/quasar-galaxy/src/main/java/co/paralleluniverse/galaxy/example/migration/Main.java index 08e77980f8..ff86bfed05 100644 --- a/quasar-galaxy/src/main/java/co/paralleluniverse/galaxy/example/migration/Main.java +++ b/quasar-galaxy/src/main/java/co/paralleluniverse/galaxy/example/migration/Main.java @@ -24,6 +24,8 @@ import co.paralleluniverse.actors.ActorRegistry; import co.paralleluniverse.actors.BasicActor; import co.paralleluniverse.actors.MigratingActor; +import co.paralleluniverse.actors.behaviors.Server; +import co.paralleluniverse.actors.behaviors.ServerActor; import co.paralleluniverse.fibers.SuspendExecution; import static co.paralleluniverse.galaxy.example.migration.Message.Type.*; import java.util.concurrent.Callable; @@ -32,39 +34,46 @@ public class Main { public static void main(String[] args) throws Exception { - final int nodeId = 1; // Integer.parseInt(args[0]); + final int nodeId = Integer.parseInt(args[0]); System.setProperty("galaxy.nodeId", Integer.toString(nodeId)); System.setProperty("galaxy.port", Integer.toString(7050 + nodeId)); System.setProperty("galaxy.slave_port", Integer.toString(8050 + nodeId)); // com.esotericsoftware.minlog.Log.set(1); ActorRegistry.hasGlobalRegistry(); - ActorRef actor = ActorRegistry.getOrRegisterActor("migrant", new Callable>() { + ActorRef actor = ActorRegistry.getOrRegisterActor("migrant", new Callable>() { @Override - public ActorRef call() throws Exception { - return new Migrant().spawn(); + public Actor call() throws Exception { + return new Migrant(); } }); - if (actor == null) { - System.out.println("Creating actor"); - actor = new Migrant("migrant").spawn(); - } else - System.out.println("Found registered actor"); +// Server actor = (Server)ActorRegistry.getOrRegisterActor("migrant", new Callable() { +// +// @Override +// public ServerActor call() throws Exception { +// return new Migrant(); +// } +// }); - for (int i = 0; i < 100; i++) { + int i; + for (i = 0; i < 500; i++) { final double r = ThreadLocalRandom.current().nextDouble(); - if (r < 0.2) { - actor.send(new Message(null, MIGRATE)); + if (r < 0.1) { System.out.println("Hiring actor..."); - Thread.sleep(500); + // actor.call(new Message(nodeId, i, MIGRATE)); + actor.send(new Message(nodeId, i, MIGRATE)); + Thread.sleep(100); Actor.hire(actor).spawn(); System.out.println("Hired!"); - } else - actor.send(new Message(null, PRINT)); - Thread.sleep(1000); + } else { + actor.send(new Message(nodeId, i, PRINT)); + // actor.cast(new Message(nodeId, i, PRINT)); + } + Thread.sleep(500); } - actor.send(new Message(null, FINISHED)); + actor.send(new Message(nodeId, i, FINISHED)); +// actor.cast(new Message(nodeId, i, FINISHED)); System.out.println("Done"); ActorRegistry.shutdown(); @@ -88,6 +97,7 @@ protected Void doRun() throws InterruptedException, SuspendExecution { for (;;) { Message m = receive(2, TimeUnit.SECONDS); if (m != null) { + System.out.println("received: " + m); messageCount++; switch (m.type) { case PRINT: @@ -106,4 +116,52 @@ protected Void doRun() throws InterruptedException, SuspendExecution { return null; } } +// static class Migrant extends ServerActor implements MigratingActor { +// private int loopCount; +// private int messageCount; +// +// public Migrant() { +// super(); +// setTimeout(2, TimeUnit.SECONDS); +// } +// +// public Migrant(String name) { +// super(name); +// setTimeout(2, TimeUnit.SECONDS); +// } +// +// @Override +// protected void handleMessage(Object m1) throws InterruptedException, SuspendExecution { +// messageCount++; +// loopCount++; +// super.handleMessage(m1); +// } +// +// @Override +// protected void handleCast(ActorRef from, Object id, Message m) throws SuspendExecution { +// switch (m.type) { +// case PRINT: +// System.out.println("iter: " + loopCount + " messages: " + messageCount); +// break; +// case FINISHED: +// shutdown(); +// } +// } +// +// @Override +// protected Integer handleCall(ActorRef from, Object id, Message m) throws Exception, SuspendExecution { +// switch (m.type) { +// case MIGRATE: +// migrateAndRestart(); +// return messageCount; +// default: +// throw new UnsupportedOperationException(m.toString()); +// } +// } +// +// @Override +// protected void handleTimeout() throws SuspendExecution { +// loopCount++; +// } +// } } diff --git a/quasar-galaxy/src/main/java/co/paralleluniverse/galaxy/example/migration/Message.java b/quasar-galaxy/src/main/java/co/paralleluniverse/galaxy/example/migration/Message.java index 15dc65001c..ea869cbeaa 100644 --- a/quasar-galaxy/src/main/java/co/paralleluniverse/galaxy/example/migration/Message.java +++ b/quasar-galaxy/src/main/java/co/paralleluniverse/galaxy/example/migration/Message.java @@ -19,7 +19,6 @@ */ package co.paralleluniverse.galaxy.example.migration; -import co.paralleluniverse.actors.ActorRef; import java.io.Serializable; /** @@ -29,11 +28,18 @@ class Message implements Serializable { enum Type { MIGRATE, PRINT, FINISHED } - final ActorRef from; + final int from; + final int index; final Type type; - public Message(ActorRef from, Type type) { + public Message(int from, int index, Type type) { this.from = from; + this.index = index; this.type = type; } + + @Override + public String toString() { + return "Message{" + "from: " + from + " " + index + " " + type + '}'; + } } diff --git a/quasar-galaxy/src/main/java/co/paralleluniverse/remote/galaxy/GlxGlobalRegistry.java b/quasar-galaxy/src/main/java/co/paralleluniverse/remote/galaxy/GlxGlobalRegistry.java index 74043372e2..c131e86ce1 100644 --- a/quasar-galaxy/src/main/java/co/paralleluniverse/remote/galaxy/GlxGlobalRegistry.java +++ b/quasar-galaxy/src/main/java/co/paralleluniverse/remote/galaxy/GlxGlobalRegistry.java @@ -13,9 +13,12 @@ */ package co.paralleluniverse.remote.galaxy; +import co.paralleluniverse.actors.Actor; +import co.paralleluniverse.actors.ActorImpl; import co.paralleluniverse.actors.ActorRef; import co.paralleluniverse.actors.spi.ActorRegistry; import co.paralleluniverse.fibers.Fiber; +import co.paralleluniverse.fibers.FiberScheduler; import co.paralleluniverse.fibers.SuspendExecution; import co.paralleluniverse.galaxy.AbstractCacheListener; import co.paralleluniverse.galaxy.Cache; @@ -278,6 +281,7 @@ private ActorRef getOrRegisterActor0(final String rootName, C try { try { final long root = store.getRoot(rootName, txn); + final ActorRef actor; byte[] buf = store.getx(root, txn); if (buf == null || buf.length == 0) { diff --git a/quasar-galaxy/src/main/java/co/paralleluniverse/remote/galaxy/GlxMigrator.java b/quasar-galaxy/src/main/java/co/paralleluniverse/remote/galaxy/GlxMigrator.java index e72aa0c8e1..34a7ee2f11 100644 --- a/quasar-galaxy/src/main/java/co/paralleluniverse/remote/galaxy/GlxMigrator.java +++ b/quasar-galaxy/src/main/java/co/paralleluniverse/remote/galaxy/GlxMigrator.java @@ -21,7 +21,6 @@ import co.paralleluniverse.galaxy.quasar.Grid; import co.paralleluniverse.galaxy.quasar.Store; import co.paralleluniverse.io.serialization.Serialization; -import co.paralleluniverse.io.serialization.kryo.KryoSerializer; import org.kohsuke.MetaInfServices; /** @@ -43,14 +42,14 @@ public GlxMigrator() { } @Override - public Object registerMigratingActor(Object id) throws SuspendExecution { + public Object registerMigratingActor() throws SuspendExecution { try { - if (id == null) +// if (id == null) return store.put(new byte[0], null); - else { - store.getx((Long) id, null); - return id; - } +// else { +// store.getx((Long) id, null); +// return id; +// } } catch (co.paralleluniverse.galaxy.TimeoutException e) { throw new RuntimeException(e); }