-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSupervision.java
138 lines (114 loc) · 4.26 KB
/
Supervision.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import akka.actor.*;
import akka.japi.pf.ReceiveBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.duration.Duration;
import static akka.actor.SupervisorStrategy.escalate;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.stop;
import java.util.concurrent.TimeUnit;
public class Supervision {
private static final Logger LOGGER = LoggerFactory.getLogger(Supervision.class);
private static class Aphrodite extends AbstractActor {
static class ResumeException extends Exception {
}
static class StopException extends Exception {
}
static class RestartException extends Exception {
}
public Aphrodite() {
receive(ReceiveBuilder
.match(String.class, msg -> {
switch (msg) {
case "Resume":
throw new ResumeException();
case "Stop":
throw new StopException();
case "Restart":
throw new RestartException();
default:
throw new Exception();
}
})
.build());
}
@Override
public void preStart() throws Exception {
LOGGER.info("Aphrodite preStart hook....");
super.preStart();
}
@Override
public void preRestart(Throwable reason, Option<Object> message) throws Exception {
LOGGER.info("Aphrodite preRestart hook....");
super.preRestart(reason, message);
}
@Override
public void postRestart(Throwable reason) throws Exception {
LOGGER.info("Aphrodite postRestart hook....");
super.postRestart(reason);
}
@Override
public void postStop() throws Exception {
LOGGER.info("Aphrodite postStop hook....");
super.postStop();
}
}
private static class Hera extends AbstractActor {
private ActorRef childRef;
public Hera() {
receive(ReceiveBuilder
.match(
Object.class,
msg -> {
LOGGER.info(String.format("Hera received %s", msg));
childRef.tell(msg, ActorRef.noSender());
Thread.sleep(100);
})
.build());
}
private SupervisorStrategy.Directive matchExceptionWithDirective(Throwable t) {
if (t instanceof Aphrodite.ResumeException) {
return resume();
}
if (t instanceof Aphrodite.StopException) {
return stop();
}
if (t instanceof Aphrodite.RestartException) {
return restart();
}
return escalate();
}
@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(
10,
Duration.create(1, TimeUnit.SECONDS),
this::matchExceptionWithDirective
);
}
@Override
public void preStart() throws Exception {
// Create Aphrodite Actor
childRef = context().actorOf(Props.create(Aphrodite.class), "Aphrodite");
Thread.sleep(100);
}
}
public static void main(String[] args) throws InterruptedException {
// Create the 'supervision' actor system
ActorSystem system = ActorSystem.create("supervision");
// Create Hera Actor
ActorRef hera = system.actorOf(Props.create(Hera.class), "hera");
// hera.tell("Resume", ActorRef.noSender());
// Thread.sleep(1000);
// LOGGER.info("");
// hera.tell("Restart", ActorRef.noSender());
// Thread.sleep(1000);
// LOGGER.info("");
hera.tell("Stop", ActorRef.noSender());
Thread.sleep(1000);
LOGGER.info("");
system.terminate();
}
}