-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlooper.cpp
139 lines (110 loc) · 2.82 KB
/
looper.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
class CLooper {
public:
using Runnable = std::function<void()>;
class CDispatcher {
friend class CLooper;
public:
bool post(CLooper::Runnable &&aOther) {
return mAssignedLooper.post(std::move(aOther));
}
private:
CDispatcher(CLooper &aLooper) : mAssignedLooper(aLooper) {}
CLooper &mAssignedLooper;
};
CLooper()
: mDispatcher(std::shared_ptr<CDispatcher>(new CDispatcher(*this))) {}
~CLooper() { abortAndJoin(); }
void stop() { abortAndJoin(); }
bool run() {
try {
mThread = std::thread(&CLooper::runFunc, this);
} catch (...) {
return false;
}
return true;
}
bool running() { return mRunning.load(); }
Runnable next() {
std::lock_guard guard(mRunnablesMutex);
if (mRunnables.empty()) {
return nullptr;
}
Runnable runnable = mRunnables.front();
mRunnables.pop();
return runnable;
}
std::shared_ptr<CDispatcher> getDispatcher() { return mDispatcher; }
private:
void runFunc() {
mRunning.store(true);
while (false == mAbortRequested.load()) {
try {
using namespace std::chrono_literals;
Runnable r = next();
if (nullptr != r) {
r();
} else {
std::this_thread::sleep_for(1ms);
}
} catch (std::runtime_error &e) {
} catch (...) {
}
}
mRunning.store(false);
}
void abortAndJoin() {
mAbortRequested.store(true);
if (mThread.joinable()) {
mThread.join();
}
}
bool post(Runnable &&aRunnable) {
if (not running()) {
return false;
}
try {
std::lock_guard guard(mRunnablesMutex);
mRunnables.push(std::move(aRunnable));
} catch (...) {
return false;
}
return true;
}
std::thread mThread;
std::atomic_bool mRunning;
std::atomic_bool mAbortRequested;
std::recursive_mutex mRunnablesMutex;
std::queue<Runnable> mRunnables;
std::shared_ptr<CDispatcher> mDispatcher;
};
int main() {
auto looper = std::make_unique<CLooper>();
std::cout << "Starting looper" << std::endl;
looper->run();
auto dispatcher = looper->getDispatcher();
std::cout << "Adding tasks" << std::endl;
for (uint32_t k = 0; k < 500; ++k) {
auto const task = [k]() {
std::cout
<< "Invocation " << k
<< ": Hello, I have been executed asynchronously on the looper for "
<< (k + 1) << " times." << std::endl;
};
dispatcher->post(std::move(task));
}
std::cout << "Waiting 5 seconds for completion" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "Stopping looper" << std::endl;
dispatcher = nullptr;
looper->stop();
looper = nullptr;
return 0;
}