-
-
Notifications
You must be signed in to change notification settings - Fork 58
/
Copy pathObservableSubscription.java
163 lines (136 loc) · 6.12 KB
/
ObservableSubscription.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package rx.observables.creating;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import rx.*;
import rx.internal.util.ActionSubscriber;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
/**
 * Examples of how subscriptions behave in RxJava 1.x.
 * <p>
 * An observable is normally consumed by an observer, which {@code subscribe} creates from the
 * action functions supplied for onNext, onError and onCompleted. Those functions play the same
 * role as Java 8 consumer functions.
 * <p>
 * When no scheduler is involved the pipeline runs on the subscribing thread, so {@code subscribe}
 * blocks that thread until the last item of the observable has been processed through the
 * pipeline and the observer has been unsubscribed from the observable.
 *
 * @author Pablo Perez
 */
public class ObservableSubscription {

    /** Pause, in milliseconds, used by {@link #sleep()} and therefore by every polling loop. */
    private static final long POLL_INTERVAL_MILLIS = 100;

    /**
     * Marker updated by subscriber callbacks. Those callbacks may run on a scheduler thread,
     * so the field is volatile to make the update visible to the test thread.
     */
    private volatile String foo = "empty";

    /** Running sum of the items received in {@link #testObservableSubscriptionBlockMainThread()}. */
    int total = 0;

    /**
     * Proves that subscribing to a synchronous observable blocks the calling thread until all
     * items have been emitted: the sum is already complete when {@code subscribe} returns.
     */
    @Test
    public void testObservableSubscriptionBlockMainThread() {
        Integer[] numbers = {0, 1, 2, 3, 4};
        Observable.from(numbers)
                .flatMap(Observable::just)
                .doOnNext(number -> sleep())
                .subscribe(number -> total += number);
        System.out.println("I finish after all items are emitted:" + total);
        Assert.assertEquals(10, total);
    }

    /**
     * {@code delay} makes the pipeline asynchronous, so {@code subscribe} returns immediately.
     * The observer is only unsubscribed once the observable has emitted all its items, which
     * the test thread detects by polling the subscription.
     */
    @Test
    public void testObservableWaitForUnsubscribed() {
        Subscription subscription = Observable.just(1)
                .delay(5, TimeUnit.MILLISECONDS)
                .subscribe(number -> foo = "Subscription finish");
        while (!subscription.isUnsubscribed()) {
            System.out.println("wait for subscription to finish");
            // Pause between polls instead of spinning and flooding stdout.
            sleep();
        }
        System.out.println(foo);
        Assert.assertEquals("Subscription finish", foo);
    }

    /**
     * Wraps the subscription in a second observable whose only job is to report when the first
     * observer has been unsubscribed. The polling runs on a new thread so that it does not block
     * the test thread.
     * <p>
     * {@code Schedulers.immediate()} executes work on the current thread, so the notification is
     * delivered on the polling thread; it is not moved back to the test thread.
     */
    @Test
    public void testObservableWaitForUnsubscribedListener() {
        Subscription subscription = Observable.just(1)
                .delay(1, TimeUnit.SECONDS)
                .subscribe(number -> foo = "Subscription finish");
        Scheduler immediate = Schedulers.immediate();
        // Subscribing a TestSubscriber to the listener pipeline lets the test wait for the real
        // terminal event and assert on what was delivered.
        TestSubscriber<Subscription> listener = new TestSubscriber<>();
        Observable.just(subscription)
                .subscribeOn(Schedulers.newThread())
                .doOnNext(s -> {
                    while (!s.isUnsubscribed()) {
                        sleep();
                    }
                })
                .observeOn(immediate)
                .doOnNext(u -> System.out.println("Observer unsubscribed:" + u.toString()))
                .subscribe(listener);
        // Returns as soon as the listener terminates; the timeout is only an upper bound.
        listener.awaitTerminalEvent(5, TimeUnit.SECONDS);
        listener.assertNoErrors();
        listener.assertValue(subscription);
        listener.assertCompleted();
        Assert.assertEquals("Subscription finish", foo);
    }

    /**
     * Registers a {@code doOnUnsubscribe} action on a synchronous pipeline; the action is the
     * place for any last work that must happen when the subscriber unsubscribes from the
     * observable.
     */
    @Test
    public void testDoOnUnsubscribe() {
        Integer[] numbers = {0, 1, 2, 3, 4};
        Observable.from(numbers)
                .doOnUnsubscribe(() -> System.out.println("Last action must be done here"))
                .subscribe(
                        number -> System.out.println("number:" + number),
                        System.out::println,
                        () -> System.out.println("End of pipeline"));
    }

    /**
     * At any moment we can create our own subscriber, here an {@link ActionSubscriber} built from
     * onNext, onError and onCompleted functions, and attach it to an observable.
     * <p>
     * The resulting subscriptions can also be added to the subscriber's own subscription list, so
     * that the subscriber knows the state of the subscriptions it is part of.
     * <p>
     * NOTE(review): the same subscriber instance is handed to two observables; it looks like
     * completing one also unsubscribes the other because they share state. Confirm against the
     * RxJava {@code Subscriber} documentation.
     */
    @Test
    public void subscriberAndSubscription() {
        Integer[] numbers = {0, 1, 2};
        Subscriber<Integer> subscriber = createSubscriber();
        Subscription subscription =
                Observable.from(numbers).subscribeOn(Schedulers.newThread()).subscribe(subscriber);
        Subscription subscription1 =
                Observable.from(numbers).subscribeOn(Schedulers.newThread()).subscribe(subscriber);
        subscriber.add(subscription);
        subscriber.add(subscription1);
        System.out.println("Is Unsubscribed??:" + subscriber.isUnsubscribed());
        // Wait for the subscriber to be unsubscribed, for at most five seconds.
        awaitUnsubscribed(subscriber, 5, TimeUnit.SECONDS);
        System.out.println("Is Unsubscribed??:" + subscriber.isUnsubscribed());
    }

    /**
     * At any moment we can unsubscribe a subscriber from an observable and subscribe a new one
     * to the same observable again.
     *
     * @throws InterruptedException if the test thread is interrupted while letting the first
     *                              subscriber consume some items
     */
    @Test
    public void subscribeAndUnsubscribe() throws InterruptedException {
        Integer[] numbers = {0, 1, 2, 4, 5, 6};
        Observable<Integer> observable = Observable.from(numbers);
        Subscription subscription =
                observable.subscribeOn(Schedulers.newThread()).subscribe(createSubscriber());
        // Let the first subscriber consume a few items before cutting it off.
        Thread.sleep(2000);
        subscription.unsubscribe();
        subscription = observable.subscribeOn(Schedulers.newThread()).subscribe(createSubscriber());
        // Wait for the second subscriber to finish, for at most ten seconds.
        awaitUnsubscribed(subscription, 10, TimeUnit.SECONDS);
        System.out.println(subscription.isUnsubscribed());
    }

    /**
     * Polls the subscription until it is unsubscribed, the timeout elapses, or the calling
     * thread is interrupted.
     *
     * @param subscription the subscription to watch
     * @param timeout      the maximum time to wait
     * @param unit         the unit of {@code timeout}
     * @return {@code true} if the subscription was unsubscribed before the wait ended
     */
    private boolean awaitUnsubscribed(Subscription subscription, long timeout, TimeUnit unit) {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!subscription.isUnsubscribed()) {
            // Subtraction keeps the comparison correct even if nanoTime wraps around.
            boolean timedOut = System.nanoTime() - deadline >= 0;
            if (timedOut || Thread.currentThread().isInterrupted()) {
                return false;
            }
            sleep();
        }
        return true;
    }

    /**
     * Builds a subscriber that takes half a second to handle each item and prints every event
     * it receives.
     *
     * @return a new subscriber; each call returns a fresh instance
     */
    private ActionSubscriber<Integer> createSubscriber() {
        return new ActionSubscriber<Integer>(
                number -> {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        System.out.println("preparing to unsubscribe");
                        // Keep the interrupt visible to the code running this callback.
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("Subscriber number:" + number);
                },
                System.out::println,
                () -> System.out.println("Subscriber End of pipeline"));
    }

    /**
     * Pauses the current thread for {@link #POLL_INTERVAL_MILLIS} milliseconds. If the thread is
     * interrupted the pause ends early and the interrupt flag is restored for the caller.
     */
    private void sleep() {
        try {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}