-
Notifications
You must be signed in to change notification settings - Fork 24
Writing a client
elandau edited this page Oct 31, 2014
·
1 revision
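RxLoadBalancer expects the client to drive all metrics gathering: the connector and the client report connect and request lifecycle events (start, success, failure, with elapsed time) through the `Action1<ClientEvent>` callback they are given, as shown in the example below.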
/**
 * Example {@code HostClientConnector} that reports connection metrics for a
 * {@code FooClient} through the supplied {@code ClientEvent} callback.
 *
 * <p>NOTE(review): {@code connect(host)} is not defined in this snippet; it is
 * presumably supplied by the implementer and returns an
 * {@code Observable<FooClient>} - confirm against the real client code.
 */
public class FooClientConnector implements HostClientConnector<FooAddress, FooClient> {
    /**
     * Starts connecting to {@code host} and reports the outcome via {@code actions}.
     *
     * <p>{@code ClientEvent.connectStart()} is emitted and the stopwatch is started
     * when this method is invoked (not when the returned observable is subscribed to).
     *
     * @param host    address to connect to
     * @param actions callback receiving connectStart / connectSuccess / connectFailure events
     * @param signal  not used by this example
     * @return the observable produced by {@code connect(host)}, instrumented with metrics reporting
     */
    @Override
    public Observable<FooClient> call(FooAddress host, final Action1<ClientEvent> actions, Observable<Void> signal) {
        final Stopwatch sw = Stopwatch.createStarted();
        actions.call(ClientEvent.connectStart());
        return connect(host)
            // The observer's type argument must match the stream's element type (FooClient).
            .doOnEach(new Observer<FooClient>() {
                @Override
                public void onCompleted() {
                    // Nothing to report on completion; success is reported per emitted client.
                }

                @Override
                public void onError(Throwable e) {
                    actions.call(ClientEvent.connectFailure(sw.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS, e));
                }

                @Override
                public void onNext(FooClient t) {
                    actions.call(ClientEvent.connectSuccess(sw.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS));
                }
            });
    }
}
public class FooClient {
private final Action1<ClientEvent> actions;
public TestClient(Action1<ClientEvent> actions) {
this.actions = actions;
}
public Observable<Response> execute(Request request) {
final Stopwatch sw = Stopwatch.createStarted();
actions.call(ClientEvent.requestStart());
return
submit(request)
.doOnEach(new Observer<Response>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
actions.call(ClientEvent.requestFailure(sw.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS, e));
}
@Override
public void onNext(String t) {
actions.call(ClientEvent.requestSuccess(sw.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS));
}
});
}
}