Skip to content

Commit

Permalink
multiple changes
Browse files Browse the repository at this point in the history
  • Loading branch information
beef9999 committed Feb 9, 2023
1 parent e62a158 commit 6a969a5
Show file tree
Hide file tree
Showing 21 changed files with 422 additions and 244 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ Here is the article, [en](https://www.reddit.com/r/cpp/comments/zd2hx1/200_lines
1. Support coroutine local variables. Similar to the C++11 `thread_local` keyword. See [doc](doc/thread-local.md).
2. Support running on macOS platform, both Intel x86_64 and Apple M1 included.
3. Support LLVM Clang/Apple Clang/GCC compilers.
* Photon 0.3 was released on 2 Sep 2022. Except for bug fixes and improvements, a new `photon::std` namespace is added.
Developers can search for `std::thread`, `std::mutex` in their own projects, and replace them all into the equivalents of `photon::std::<xxx>`.
* Photon 0.3 was released on 2 Sep 2022. Except for bug fixes and improvements, a new `photon_std` namespace is added.
Developers can search for `std::thread`, `std::mutex` in their own projects, and replace them all into the equivalents of `photon_std::<xxx>`.
It's a quick way to transform thread-based programs to coroutine-based ones.
* Photon 0.2 was released on 28 Jul 2022. This release was mainly focused on network socket, security context and multi-vcpu support.
We re-worked the `WorkPool` so it's more friendly now to write multi-vcpu programs.
Expand Down Expand Up @@ -164,10 +164,12 @@ brew install cmake openssl
### 2. Build from source
```shell
cd PhotonLibOS
cmake -B build # On macOS, we need to add -DOPENSSL_ROOT_DIR=/path/to/openssl/
cmake -B build
cmake --build build -j
```
All the libs and executables will be saved in `build/output`.

- On macOS, we need to add `-DOPENSSL_ROOT_DIR=/path/to/openssl/`. This path is often managed by Homebrew.
- All the libs and executables will be saved in `build/output`.

### 3. Examples / Testing

Expand Down
10 changes: 10 additions & 0 deletions common/iovector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

#include <sstream>
#define protected public
#include "iovector.h"
#include "utility.h"
Expand Down Expand Up @@ -328,3 +329,12 @@ size_t iovector::push_back_more(size_t bytes)
return bytes0 - bytes;
}

void iovector::debug_print() {
std::stringstream ss;
ss << "iov sum: " << sum() << ", ";
for (auto each : *this) {
ss << "{addr: " << each.iov_base << ", len: " << each.iov_len << "}, ";
}
LOG_DEBUG(ss.str().c_str());
}

30 changes: 23 additions & 7 deletions common/iovector.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ struct iovector_view
f.iov_len -= bytes;
auto rst = f.iov_base;
(char*&)f.iov_base += bytes;
if (f.iov_len == 0)
pop_front();
return rst;
}

Expand All @@ -143,7 +145,10 @@ struct iovector_view
return nullptr;

b.iov_len -= bytes;
return (char*)b.iov_base + b.iov_len;
void* ret = (char*)b.iov_base + b.iov_len;
if (b.iov_len == 0)
pop_back();
return ret;
}

// try to `extract_back(bytes)` and copy the extracted bytes to `buf`
Expand Down Expand Up @@ -423,6 +428,8 @@ class iovector
// resize the # of bytes, by either poping-back or pushing-back
size_t truncate(size_t size)
{
if (size == sum())
return size;
auto ret = shrink_to(size);
if (ret == size)
return size;
Expand Down Expand Up @@ -487,8 +494,7 @@ class iovector
auto ret = va.extract_front(bytes, &vi);
iov_begin = iov_end - va.iovcnt;
if (ret >= 0) {
iov->iov_begin = vi.iov - iov->iovs;
iov->iov_end = iov->iov_begin + vi.iovcnt;
iov->update(vi);
}
return ret;
}
Expand All @@ -507,8 +513,10 @@ class iovector
{
auto va = view();
auto ptr = va.extract_front_continuous(bytes);
if (ptr)
if (ptr) {
update(va);
return ptr;
}

auto buf = do_malloc(bytes);
auto ret = extract_front(bytes, buf);
Expand Down Expand Up @@ -577,8 +585,7 @@ class iovector
auto ret = va.extract_back(bytes, &vi);
iov_end = iov_begin + va.iovcnt;
if (ret >= 0) {
iov->iov_begin = vi.iov - iov->iovs;
iov->iov_end = iov->iov_begin + vi.iovcnt;
iov->update(vi);
}
return ret;
}
Expand All @@ -597,8 +604,10 @@ class iovector
{
auto va = view();
auto ptr = va.extract_back_continuous(bytes);
if (ptr)
if (ptr) {
update(va);
return ptr;
}

auto buf = do_malloc(bytes);
auto ret = extract_back(bytes, buf);
Expand Down Expand Up @@ -781,6 +790,11 @@ class iovector
return iovector_view((struct iovec*)iovec(), iovcnt());
}

void update(iovector_view va) {
iov_begin = va.iov - iovs;
iov_end = iov_begin + va.iovcnt;
}

// generate iovector_view of partial data part in the iovector
// generated view shares data field of this iovector, but has
// its own iovec array.
Expand All @@ -807,6 +821,8 @@ class iovector
return allocator();
}

void debug_print();

protected:
uint16_t capacity; // total capacity
uint16_t iov_begin, iov_end; // [iov_begin, iov_end)
Expand Down
2 changes: 1 addition & 1 deletion doc/thread-local.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ int main() {
abort();
DEFER(photon::fini());

auto th = photon::std::thread([]{
auto th = photon_std::thread([]{
*pI = 1;
});
th.join();
Expand Down
20 changes: 10 additions & 10 deletions examples/simple/simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ limitations under the License.
// Please refer to the README under the module directories.

static void run_socket_server(photon::net::ISocketServer* server, photon::fs::IFile* file, AlignedAlloc& alloc,
photon::std::condition_variable& cv, photon::std::mutex& mu, bool& got_msg);
photon_std::condition_variable& cv, photon_std::mutex& mu, bool& got_msg);

int main() {
// Initialize Photon environment in current vcpu.
Expand Down Expand Up @@ -71,27 +71,27 @@ int main() {
DEFER(delete server);

// Photon's std is equivalent to the standard std, but specially working for coroutines
photon::std::mutex mu;
photon::std::condition_variable cv;
photon_std::mutex mu;
photon_std::condition_variable cv;
bool got_msg = false;
AlignedAlloc alloc(512);

// So the thread is actually a coroutine. Photon threads run on top of vcpu(native OS threads).
// We create a Photon thread to run socket server. Pass some local variables to the new thread as arguments.
auto server_thread = photon::std::thread(run_socket_server, server, file, alloc, cv, mu, got_msg);
auto server_thread = photon_std::thread(run_socket_server, server, file, alloc, cv, mu, got_msg);

// Create a watcher thread to wait the go_msg flag
auto watcher_thread = photon::std::thread([&] {
auto watcher_thread = photon_std::thread([&] {
LOG_INFO("Start to watch message");
photon::std::unique_lock<photon::std::mutex> lock(mu);
photon_std::unique_lock<photon_std::mutex> lock(mu);
while (!got_msg) {
cv.wait(lock);
}
LOG_INFO("Got message!");
});

// Wait server to be ready to accept
photon::std::this_thread::sleep_for(std::chrono::seconds(1));
photon_std::this_thread::sleep_for(std::chrono::seconds(1));

// Create socket client and connect
auto client = photon::net::new_tcp_socket_client();
Expand All @@ -116,7 +116,7 @@ int main() {
delete stream;

// Wait for a while and shutdown the server
photon::std::this_thread::sleep_for(std::chrono::seconds(1));
photon_std::this_thread::sleep_for(std::chrono::seconds(1));
server->terminate();

// Join other threads
Expand All @@ -125,7 +125,7 @@ int main() {
}

void run_socket_server(photon::net::ISocketServer* server, photon::fs::IFile* file, AlignedAlloc& alloc,
photon::std::condition_variable& cv, photon::std::mutex& mu, bool& got_msg) {
photon_std::condition_variable& cv, photon_std::mutex& mu, bool& got_msg) {
void* buf = alloc.alloc(1024);
DEFER(alloc.dealloc(buf));

Expand All @@ -151,7 +151,7 @@ void run_socket_server(photon::net::ISocketServer* server, photon::fs::IFile* fi

// Got message. Notify the watcher
{
photon::std::lock_guard<photon::std::mutex> lock(mu);
photon_std::lock_guard<photon_std::mutex> lock(mu);
got_msg = true;
cv.notify_one();
}
Expand Down
14 changes: 7 additions & 7 deletions io/events_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ template <EVENT_TYPE EV_READ_, EVENT_TYPE EV_WRITE_, EVENT_TYPE EV_ERROR_>
struct EVGroupBase {
static constexpr EVENT_TYPE EV_READ = EV_READ_;
static constexpr EVENT_TYPE EV_WRITE = EV_WRITE_;
static constexpr EVENT_TYPE EV_ERROR = EV_ERROR_;
static constexpr EVENT_TYPE EV_ERR = EV_ERROR_;

static_assert(EV_READ != EV_WRITE, "...");
static_assert(EV_READ != EV_ERROR, "...");
static_assert(EV_ERROR != EV_WRITE, "...");
static_assert(EV_READ != EV_ERR, "...");
static_assert(EV_ERR != EV_WRITE, "...");
static_assert(EV_READ, "...");
static_assert(EV_WRITE, "...");
static_assert(EV_ERROR, "...");
static_assert(EV_ERR, "...");
};

struct EVUBase {};
Expand All @@ -39,20 +39,20 @@ struct EventsMap {

static constexpr EVENT_TYPE UNDERLAY_EVENT_READ = EV_UNDERLAY::EV_READ;
static constexpr EVENT_TYPE UNDERLAY_EVENT_WRITE = EV_UNDERLAY::EV_WRITE;
static constexpr EVENT_TYPE UNDERLAY_EVENT_ERROR = EV_UNDERLAY::EV_ERROR;
static constexpr EVENT_TYPE UNDERLAY_EVENT_ERROR = EV_UNDERLAY::EV_ERR;

EVENT_TYPE translate_bitwisely(EVENT_TYPE events) const {
EVENT_TYPE ret = 0;
if (events & EV_KEY::EV_READ) ret |= EV_UNDERLAY::EV_READ;
if (events & EV_KEY::EV_WRITE) ret |= EV_UNDERLAY::EV_WRITE;
if (events & EV_KEY::EV_ERROR) ret |= EV_UNDERLAY::EV_ERROR;
if (events & EV_KEY::EV_ERR) ret |= EV_UNDERLAY::EV_ERR;
return ret;
}

EVENT_TYPE translate_byval(EVENT_TYPE event) const {
if (event == EV_KEY::EV_READ) return EV_UNDERLAY::EV_READ;
if (event == EV_KEY::EV_WRITE) return EV_UNDERLAY::EV_WRITE;
if (event == EV_KEY::EV_ERROR) return EV_UNDERLAY::EV_ERROR;
if (event == EV_KEY::EV_ERR) return EV_UNDERLAY::EV_ERR;
}
};

Expand Down
1 change: 0 additions & 1 deletion net/http/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ limitations under the License.
namespace photon {
namespace net {
namespace http {
static const uint64_t kMinimalStreamLife = 300UL * 1000 * 1000;
static const uint64_t kDNSCacheLife = 3600UL * 1000 * 1000;
static constexpr char USERAGENT[] = "EASE/0.21.6";

Expand Down
2 changes: 1 addition & 1 deletion net/http/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ ssize_t Message::resource_size() const {
return estring_view(ret).to_uint64();
}

uint64_t Message::body_size() const {
size_t Message::body_size() const {
if (m_verb == Verb::HEAD) return 0;
auto it = headers.find("Content-Length");
if (it != headers.end()) return estring_view(it.second()).to_uint64();
Expand Down
2 changes: 1 addition & 1 deletion net/http/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class Message : public IStream {
int append_bytes(uint16_t size);

int skip_remain();
int close() { return 0; }
int close() override { return 0; }

std::string_view partial_body() const {
return std::string_view{m_buf, m_buf_size} | m_body;
Expand Down
3 changes: 1 addition & 2 deletions net/http/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
#include <string>
#include <fcntl.h>
#include <vector>
#include <sys/stat.h>
#include <photon/net/socket.h>
#include <photon/common/alog-stdstring.h>
#include <photon/common/estring.h>
Expand Down Expand Up @@ -171,8 +172,6 @@ class HTTPServerImpl : public HTTPServer {
};


constexpr static uint64_t KminFileLife = 30 * 1000UL * 1000UL;

class FsHandler : public HTTPHandler {
public:
fs::IFileSystem* m_fs;
Expand Down
1 change: 0 additions & 1 deletion net/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ bool zerocopy_available() {

static const char base64_index_min = '+';
static const char base64_index_max = 'z';
static const unsigned char base64_index_count = 80; //= '+' - 'z' + 1
#define EI 255
static unsigned char base64_index_map[]= {
//'+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '<', '=', '>', '?', '@',
Expand Down
30 changes: 14 additions & 16 deletions rpc/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ limitations under the License.
using namespace std;

namespace photon {
namespace rpc
{
namespace rpc {

class StubImpl : public Stub
{
public:
Expand Down Expand Up @@ -116,6 +116,9 @@ namespace rpc
auto args = (OooArgs*)args_;
args->response->truncate(m_header.size);
auto iov = args->response;
if (iov->iovcnt() == 0) {
iov->malloc(m_header.size);
}
auto ret = m_stream->readv((const iovec*)iov->iovec(), iov->iovcnt());
// return 0 means it has been disconnected
// should take as fault
Expand Down Expand Up @@ -147,32 +150,27 @@ namespace rpc
do_collect.bind(stub, &StubImpl::do_recv_body);
}
};
virtual int do_call(FunctionID function, SerializerIOV& req_iov, SerializerIOV& resp_iov, uint64_t timeout) override
{
if (resp_iov.iovfull) {
LOG_ERRNO_RETURN(ENOBUFS, -1, "RPC: response iov is full")
}
auto request = &req_iov.iov;
auto response = &resp_iov.iov;

int do_call(FunctionID function, iovector* request, iovector* response, uint64_t timeout) override {
scoped_rwlock rl(m_rwlock, photon::RLOCK);
Timeout tmo(timeout);
// m_sem.wait(1);
// DEFER(m_sem.signal(1));
if (tmo.expire() < photon::now) {
LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timed out before rpc start", VALUE(timeout), VALUE(tmo.timeout()));
}
int ret = 0;
OooArgs args(this, function, request, response, tmo.timeout());
ret = ooo_issue_operation(args);
if (ret < 0) {
ERRNO err;
LOG_ERRNO_RETURN((err.no == ECONNRESET) ? ECONNRESET : EFAULT, -1, "failed to send request");
if (errno != ECONNRESET)
errno = EFAULT;
LOG_ERRNO_RETURN(0, -1, "failed to send request");
}
ret = ooo_wait_completion(args);
if (ret < 0) {
ERRNO err;
LOG_ERRNO_RETURN((err.no == ECONNRESET) ? ECONNRESET : EFAULT, -1, "failed to receive response ");
} else if (ret > (int) resp_iov.iov.sum()) {
if (errno != ECONNRESET)
errno = EFAULT;
LOG_ERRNO_RETURN(0, -1, "failed to receive response ");
} else if (ret > (int) response->sum()) {
LOG_ERROR_RETURN(0, -1, "RPC: response iov buffer is too small");
}

Expand Down
Loading

0 comments on commit 6a969a5

Please sign in to comment.