-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclientbootstrap.patch
79 lines (75 loc) · 2.79 KB
/
clientbootstrap.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
diff --git a/wangle/bootstrap/ClientBootstrap.h b/wangle/bootstrap/ClientBootstrap.h
index b9a4153..6787288 100644
--- a/wangle/bootstrap/ClientBootstrap.h
+++ b/wangle/bootstrap/ClientBootstrap.h
@@ -82,6 +82,7 @@ class ClientBootstrap {
if (group_) {
base = group_->getEventBase();
}
+ base_ = base;
folly::Future<Pipeline*> retval((Pipeline*)nullptr);
base->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
std::shared_ptr<folly::AsyncSocket> socket;
@@ -105,6 +106,35 @@ class ClientBootstrap {
return retval;
}
+ folly::Future<Pipeline*> reconnect(
+ const folly::SocketAddress& address,
+ std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
+ DCHECK(pipelineFactory_);
+ DCHECK(base_);
+ folly::Future<Pipeline*> retval((Pipeline*)nullptr);
+ base_->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
+ std::shared_ptr<folly::AsyncSocket> socket;
+ if (sslContext_) {
+ auto sslSocket = folly::AsyncSSLSocket::newSocket(sslContext_, base_);
+ if (sslSession_) {
+ sslSocket->setSSLSession(sslSession_, true);
+ }
+ socket = sslSocket;
+ } else {
+ socket = folly::AsyncSocket::newSocket(base_);
+ }
+ folly::Promise<Pipeline*> promise;
+ retval = promise.getFuture();
+ socket->connect(
+ new ConnectCallback(std::move(promise), this),
+ address,
+ timeout.count());
+ pipeline_ = pipelineFactory_->newPipeline(socket);
+ });
+ return retval;
+ }
+
+
ClientBootstrap* pipelineFactory(
std::shared_ptr<PipelineFactory<Pipeline>> factory) {
pipelineFactory_ = factory;
@@ -115,7 +145,17 @@ class ClientBootstrap {
return pipeline_.get();
}
- virtual ~ClientBootstrap() = default;
+ virtual ~ClientBootstrap() {
+ if (base_ && group_.get() && pipeline_) {
+ // Pipeline was already created, destroy from the EventBase thread
+ // as AsyncSocket destruct could only be called from its EventBase thread
+ VLOG(5) << " ~ClientBootstrap " << this
+ << ", destruct pipeline_ in EventBase thread";
+ base_->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
+ pipeline_.reset();
+ });
+ }
+ };
protected:
typename Pipeline::Ptr pipeline_;
@@ -124,6 +164,11 @@ class ClientBootstrap {
std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
std::shared_ptr<wangle::IOThreadPoolExecutor> group_;
+ // This is for the reconnect case to use the same EventBase.
+ // So the old pipeline_ could be destroyed by the same EventBase thread.
+ // TODO is there any way to get EventBase from pipeline_?
+ // AsyncSocket has getEventBase() func.
+ folly::EventBase* base_{nullptr};
folly::SSLContextPtr sslContext_;
SSL_SESSION* sslSession_{nullptr};
};