-
-
Notifications
You must be signed in to change notification settings - Fork 58
/
Copy pathAkkaFeatures.java
201 lines (170 loc) · 8.46 KB
/
AkkaFeatures.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package akka;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.AskPattern;
import akka.actor.typed.javadsl.Behaviors;
import org.junit.Test;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
/**
* This file demonstrates the three most common patterns for communicating with, or between, actors in an Akka actor system.
*
* Each test scenario covers one of them: fire and forget, request/response and, finally, the ask pattern.
* For the official documentation, see [https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html]
*/
public class AkkaFeatures {
/**
 * [FireAndForget] pattern: [ActorSystem.create] starts the system with [ActorOne] as its behavior,
 * and the returned ActorSystem is used directly as the ActorRef to talk to it.
 * We [tell] ActorOne a message and never wait for any response.
 *
 * The sleep only gives the actor time to print the message. The system is terminated in a
 * finally block so that it does not stay alive after the test has finished.
 */
@Test
public void fireAndForgetPattern() throws InterruptedException {
    ActorSystem<ActorOneMessage> actorRef = ActorSystem.create(ActorOne.create(), "actorOne");
    try {
        actorRef.tell(new FireAndForgetMessage("[Fire and Forget] Hello and don't expect to hear from you again...."));
        Thread.sleep(2000);
    } finally {
        // Without this every test run would leave a running actor system behind.
        actorRef.terminate();
    }
}
/**
 * [Request/Response] pattern: [ActorSystem.create] starts the system with [ActorOne] as its behavior,
 * and the returned ActorSystem is used directly as the ActorRef to talk to it.
 * We [tell] ActorOne a [RequestResponseMessage]; ActorOne then sends a request to ActorTwo
 * carrying its own ActorRef, and ActorTwo answers on that reference.
 *
 * The sleep only gives both actors time to exchange and print their messages. The system is
 * terminated in a finally block so that it does not stay alive after the test has finished.
 */
@Test
public void requestResponsePattern() throws InterruptedException {
    ActorSystem<ActorOneMessage> actorRef = ActorSystem.create(ActorOne.create(), "actorOne");
    try {
        actorRef.tell(new RequestResponseMessage("actorRef Akka world in Java"));
        Thread.sleep(2000);
    } finally {
        // Without this every test run would leave a running actor system behind.
        actorRef.terminate();
    }
}
/**
 * [Ask pattern]: [ActorSystem.create] starts the system with [ActorOne] as its behavior.
 * [AskPattern.ask] receives the target actorRef, a factory that builds the message around the
 * temporary [replyTo] reference, a timeout for the whole exchange and the actor system scheduler.
 * It returns a [CompletionStage] on which we register a callback.
 *
 * For test purposes only, we then block until that callback has run. The wait is expected to be
 * bounded by the 3 second ask timeout. If the ask fails, the error itself is printed (its
 * cause may be absent) and the failure is propagated, so the test fails instead of passing silently.
 * The system is terminated in a finally block so that it does not stay alive after the test.
 */
@Test
public void askPattern() throws InterruptedException {
    ActorSystem<ActorOneMessage> actorRef = ActorSystem.create(ActorOne.create(), "actorOne");
    try {
        CompletionStage<String> future =
                AskPattern.ask(
                        actorRef,
                        replyTo -> new AskPatternMessage("Ask Pattern", replyTo),
                        Duration.ofSeconds(3),
                        actorRef.scheduler());
        future
                .whenComplete((response, error) -> {
                    if (error != null) {
                        System.out.println("Error in Ask pattern communication. Caused by " + error);
                    } else {
                        System.out.println("[Ask pattern] Message response:" + response);
                    }
                })
                // Join on the stage returned by whenComplete so the callback has finished printing
                // before the actor system is shut down.
                .toCompletableFuture()
                .join();
    } finally {
        actorRef.terminate();
    }
}
/**
* ACTORS IMPLEMENTATION
* ----------------------
*/
/**
 * Actor used as the entry point of every scenario: it handles each [ActorOneMessage] subtype
 * sent by the tests and talks to [ActorTwo] for the request/response scenario.
 */
public static class ActorOne {

    /**
     * Builds the behavior of ActorOne.
     *
     * [Behaviors.setup] hands us the [ActorContext], which we use to spawn the [ActorTwo] child
     * exactly once. Spawning it for every incoming [RequestResponseMessage] under the fixed name
     * "ActorTwo" would fail on the second request, because child names must be unique within
     * their parent.
     *
     * The behavior itself is created with [Behaviors.receive(T)], where T is the message type this
     * actor accepts, and registers one [onMessage] handler per message subtype. Every handler
     * receives the message and must return the next [Behavior[T]].
     *
     * @return the behavior to run ActorOne with
     */
    public static Behavior<ActorOneMessage> create() {
        return Behaviors.setup(ctx -> {
            ActorRef<ActorTwoRequestResponseMessage> actorTwo = ctx.spawn(ActorTwo.create(), "ActorTwo");
            return Behaviors.receive(ActorOneMessage.class)
                    .onMessage(FireAndForgetMessage.class, ActorOne::processFireAndForgetMessage)
                    .onMessage(RequestResponseMessage.class,
                            message -> processRequestResponseMessage(ctx, actorTwo, message))
                    .onMessage(ActorOneRequestResponseMessage.class, ActorOne::processRequestResponseFromAnotherActor)
                    .onMessage(AskPatternMessage.class, ActorOne::processAskPatternMessage)
                    .build();
        });
    }

    /**
     * Fire and forget: prints the received message and does not reply to anybody.
     *
     * @param message the message sent by the client
     * @return the unchanged behavior
     */
    private static Behavior<ActorOneMessage> processFireAndForgetMessage(FireAndForgetMessage message) {
        System.out.println("[Fire and Forget] Message from client:" + message.value);
        return Behaviors.same();
    }

    /**
     * Request/response: sends a request to ActorTwo and includes our own ActorRef in it, typed
     * with the messages we accept, so that ActorTwo can answer once it has processed the request.
     *
     * @param ctx      context of ActorOne, used to obtain its own ActorRef
     * @param actorTwo reference to the ActorTwo child spawned in [create]
     * @param command  the message that arrived from outside the actor system
     * @return the unchanged behavior
     */
    private static Behavior<ActorOneMessage> processRequestResponseMessage(
            ActorContext<ActorOneMessage> ctx,
            ActorRef<ActorTwoRequestResponseMessage> actorTwo,
            RequestResponseMessage command) {
        System.out.println("Message receive outside Actor System world:" + command.value);
        System.out.println("Sending the message to ActorTwo in ActorSystem");
        actorTwo.tell(new ActorTwoRequestResponseMessage("", ctx.getSelf()));
        return Behaviors.same();
    }

    /**
     * Request/response: prints the answer of the actor contacted in [processRequestResponseMessage].
     *
     * @param message the response sent back by ActorTwo
     * @return the unchanged behavior
     */
    private static Behavior<ActorOneMessage> processRequestResponseFromAnotherActor(ActorOneRequestResponseMessage message) {
        System.out.println("Response fom actorTwo received:" + message.value);
        return Behaviors.same();
    }

    /**
     * Ask pattern: prints the received message and answers on the [replyTo] reference carried by it.
     *
     * @param message the ask message, including the reference to reply to
     * @return the unchanged behavior
     */
    private static Behavior<ActorOneMessage> processAskPatternMessage(AskPatternMessage message) {
        System.out.println("[Ask pattern] message from outside actors:" + message.value);
        message.replyTo.tell("Copy that, ask pattern success!");
        return Behaviors.same();
    }
}
/**
 * Second actor of the request/response scenario: it receives a request that carries the ActorRef
 * of the sender and answers on that reference.
 */
public static class ActorTwo {

    /**
     * Builds the behavior of ActorTwo.
     *
     * The behavior does not need the [ActorContext], so it is created directly with
     * [Behaviors.receive(T)] instead of being wrapped in [Behaviors.setup].
     *
     * @return the behavior to run ActorTwo with
     */
    public static Behavior<ActorTwoRequestResponseMessage> create() {
        return Behaviors.receive(ActorTwoRequestResponseMessage.class)
                .onMessage(ActorTwoRequestResponseMessage.class, ActorTwo::processMessage)
                .build();
    }

    /**
     * Greets the sender using the name of its actor path and sends an
     * [ActorOneRequestResponseMessage] back on the [replyTo] reference of the request.
     *
     * @param command the request, including the reference to reply to
     * @return the unchanged behavior
     */
    private static Behavior<ActorTwoRequestResponseMessage> processMessage(ActorTwoRequestResponseMessage command) {
        System.out.printf("Hello %s!%n", command.replyTo.path().name());
        System.out.println("Sending back message to ActorOne in ActorSystem");
        command.replyTo.tell(new ActorOneRequestResponseMessage("Copy that buddy, hello!"));
        return Behaviors.same();
    }
}
/**
* ACTOR MESSAGES
* ---------------
*/
/**
 * Marker interface for every message [ActorOne] accepts.
 *
 * It is public because it appears in the public signature of [ActorOne.create] and is
 * implemented by public message classes.
 */
public interface ActorOneMessage {
}
/**
 * Message of the fire-and-forget scenario. It carries a text only and no reference to reply to.
 */
public static final class FireAndForgetMessage implements ActorOneMessage {

    /** Text printed by ActorOne when the message is processed. */
    public final String value;

    /**
     * @param value the text to deliver to ActorOne
     */
    public FireAndForgetMessage(String value) {
        this.value = value;
    }
}
/**
 * Message that starts the request/response scenario: on receiving it, ActorOne sends a request
 * to ActorTwo.
 */
public static final class RequestResponseMessage implements ActorOneMessage {

    /** Text printed by ActorOne when the message is processed. */
    public final String value;

    /**
     * @param value the text to deliver to ActorOne
     */
    public RequestResponseMessage(String value) {
        this.value = value;
    }
}
/**
 * Response of the request/response scenario: ActorTwo sends it back to ActorOne after having
 * processed the request.
 */
public static final class ActorOneRequestResponseMessage implements ActorOneMessage {

    /** Text of the response, printed by ActorOne on reception. */
    public final String value;

    /**
     * @param value the text of the response
     */
    public ActorOneRequestResponseMessage(String value) {
        this.value = value;
    }
}
/**
 * Message of the ask scenario. Besides the text it carries the reference on which ActorOne
 * sends its String answer.
 */
public static final class AskPatternMessage implements ActorOneMessage {

    /** Text printed by ActorOne when the message is processed. */
    public final String value;

    /** Reference the answer is sent to. */
    public final ActorRef<String> replyTo;

    /**
     * @param whom    the text to deliver; it is stored in the [value] field
     * @param replyTo the reference the answer must be sent to
     */
    public AskPatternMessage(String whom, ActorRef<String> replyTo) {
        this.replyTo = replyTo;
        this.value = whom;
    }
}
/**
 * Request of the request/response scenario, sent by ActorOne to ActorTwo. It is the only
 * message type ActorTwo accepts and therefore does not implement [ActorOneMessage].
 */
public static final class ActorTwoRequestResponseMessage {

    /** Text supplied by the sender; ActorTwo's handler in this file does not read it. */
    public final String whom;

    /** Reference ActorTwo sends its response to. */
    public final ActorRef<ActorOneMessage> replyTo;

    /**
     * @param whom    the text supplied by the sender
     * @param replyTo the reference the response must be sent to
     */
    public ActorTwoRequestResponseMessage(String whom, ActorRef<ActorOneMessage> replyTo) {
        this.replyTo = replyTo;
        this.whom = whom;
    }
}
}