diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..3b7fecf --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "libs/libevpp"] + path = libs/libevpp + url = https://github.com/hamidr/libevpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b5bd49d..54e9dce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,26 +17,21 @@ endif() set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_BINARY_DIR ${CMAKE_SOURCE_DIR}/build) -set(PROJECT_SOURCE_DIR ${CMAKE_SOURCE_DIR}/src) -set(PROJECT_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/includes) + +set(PROJECT_SOURCE_DIR src) +set(PROJECT_INCLUDE_DIR includes) include_directories(${PROJECT_INCLUDE_DIR}) -include_directories("/usr/include/") -include_directories("/usr/local/include") +include_directories(/usr/include/) +include_directories(/usr/local/include) -link_directories("/usr/lib") -link_directories("/usr/local/lib") +link_directories(/usr/lib) +link_directories(/usr/local/lib) +add_subdirectory(libs/libevpp/) -add_library(event_loop - ${PROJECT_SOURCE_DIR}/event_loop/socket_watcher.cpp - ${PROJECT_SOURCE_DIR}/event_loop/event_loop_ev.cpp) +include_directories(libs/libevpp/includes) -add_library(network - ${PROJECT_SOURCE_DIR}/network/async_socket.cpp - ${PROJECT_SOURCE_DIR}/network/unix_socket.cpp - ${PROJECT_SOURCE_DIR}/network/tcp_socket.cpp) add_library(parser ${PROJECT_SOURCE_DIR}/parser/base_resp_parser.cpp @@ -59,19 +54,7 @@ if(CMAKE_COMPILER_IS_GNUCXX) set(CMAKE_EXE_LINKER_FLAGS "-s") ## Strip binary endif() -target_link_libraries(event_loop ev) -target_link_libraries(async_redis event_loop parser network) - -install(TARGETS event_loop - LIBRARY DESTINATION /usr/local/lib/ - ARCHIVE DESTINATION /usr/local/lib/) - -install(TARGETS parser - LIBRARY DESTINATION /usr/local/lib/ - ARCHIVE DESTINATION /usr/local/lib/ - ) - -install(DIRECTORY ${PROJECT_INCLUDE_DIR}/ DESTINATION /usr/local/include) +target_link_libraries(async_redis network parser) -add_executable (a1.out ${CMAKE_SOURCE_DIR}/test/main.cpp) -target_link_libraries(a1.out async_redis) +add_executable (a2.out test/main.cpp) +target_link_libraries(a2.out async_redis) diff --git a/examples/tcp_server.hpp b/examples/tcp_server.hpp index 77d6805..56a18df 100644 --- a/examples/tcp_server.hpp +++ b/examples/tcp_server.hpp @@ -8,13 +8,13 @@ #include namespace async_redis { -namespace tcp_server { +namespace examples { class tcp_server { public: - using tcp_socket = async_redis::network::tcp_socket; - using async_socket = async_redis::network::async_socket; + using tcp_socket = libevpp::network::tcp_socket; + using async_socket = libevpp::network::async_socket; tcp_server(event_loop::event_loop_ev &event_loop) : loop_(event_loop), listener_(event_loop) { diff --git a/includes/connection.hpp b/includes/async_redis/connection.hpp similarity index 77% rename from includes/connection.hpp rename to includes/async_redis/connection.hpp index f284d2c..772ea2b 100644 --- a/includes/connection.hpp +++ b/includes/async_redis/connection.hpp @@ -5,18 +5,21 @@ #include #include -#include -#include +#include +#include +#include + +using namespace libevpp; namespace async_redis { class connection { - using async_socket = network::async_socket; + using async_socket = libevpp::network::async_socket; public: using parser_t = parser::base_resp_parser::parser; - using reply_cb_t = std::function; + using reply_cb_t = std::function; connection(event_loop::event_loop_ev& event_loop); diff --git a/includes/monitor.hpp b/includes/async_redis/monitor.hpp similarity index 93% rename from includes/monitor.hpp rename to includes/async_redis/monitor.hpp index 23e9844..8868ff5 100644 --- a/includes/monitor.hpp +++ b/includes/async_redis/monitor.hpp @@ -1,13 +1,14 @@ #pragma once -#include -#include +#include +#include #include #include #include using std::string; +using namespace libevpp; namespace async_redis { @@ -24,7 +25,7 @@ namespace async_redis }; using parser_t = parser::base_resp_parser::parser; - using watcher_cb_t = std::function; + using watcher_cb_t = std::function; monitor(event_loop::event_loop_ev &event_loop); diff --git a/includes/parser/array_parser.h b/includes/async_redis/parser/array_parser.h similarity index 100% rename from includes/parser/array_parser.h rename to includes/async_redis/parser/array_parser.h diff --git a/includes/parser/base_resp_parser.h b/includes/async_redis/parser/base_resp_parser.h similarity index 83% rename from includes/parser/base_resp_parser.h rename to includes/async_redis/parser/base_resp_parser.h index 5394d01..572d88b 100644 --- a/includes/parser/base_resp_parser.h +++ b/includes/async_redis/parser/base_resp_parser.h @@ -30,6 +30,11 @@ namespace async_redis { virtual int parse_append(const char*, ssize_t, bool&) = 0; virtual std::string to_string() const = 0; virtual void map(const caller_t &fn); + bool is_array() const; + bool is_number() const; + bool is_string() const; + bool is_enum() const; + bool is_error() const; void print(); }; diff --git a/includes/parser/bulk_string_parser.h b/includes/async_redis/parser/bulk_string_parser.h similarity index 100% rename from includes/parser/bulk_string_parser.h rename to includes/async_redis/parser/bulk_string_parser.h diff --git a/includes/parser/error_parser.h b/includes/async_redis/parser/error_parser.h similarity index 100% rename from includes/parser/error_parser.h rename to includes/async_redis/parser/error_parser.h diff --git a/includes/parser/number_parser.h b/includes/async_redis/parser/number_parser.h similarity index 100% rename from includes/parser/number_parser.h rename to includes/async_redis/parser/number_parser.h diff --git a/includes/parser/simple_string_parser.h b/includes/async_redis/parser/simple_string_parser.h similarity index 100% rename from includes/parser/simple_string_parser.h rename to includes/async_redis/parser/simple_string_parser.h diff --git a/includes/redis_client.hpp b/includes/async_redis/redis_client.hpp similarity index 94% rename from includes/redis_client.hpp rename to includes/async_redis/redis_client.hpp index fd68805..aa5324a 100644 --- a/includes/redis_client.hpp +++ b/includes/async_redis/redis_client.hpp @@ -4,7 +4,7 @@ #include #include -#include "connection.hpp" +#include namespace async_redis { @@ -38,6 +38,7 @@ namespace async_redis void decr(const string& field, reply_cb_t reply); void ping(reply_cb_t reply); void publish(const string& channel, const string& msg, reply_cb_t&& reply); + void sort(const string& hash_name, std::vector&& fields, reply_cb_t&& reply); //TODO: wtf?! doesnt make sense with multiple connections! // void select(uint catalog, reply_cb_t&& reply) { diff --git a/includes/sentinel.hpp b/includes/async_redis/sentinel.hpp similarity index 84% rename from includes/sentinel.hpp rename to includes/async_redis/sentinel.hpp index 487efb7..cbfa04d 100644 --- a/includes/sentinel.hpp +++ b/includes/async_redis/sentinel.hpp @@ -1,16 +1,17 @@ #pragma once -#include -#include +#include +#include -#include +#include #include -#include -// #include +#include + +using namespace libevpp; namespace async_redis { class sentinel { - using socket_t = ::async_redis::network::async_socket; + using socket_t = libevpp::network::async_socket; using connect_cb_t = socket_t::connect_handler_t; public: @@ -54,7 +55,7 @@ namespace async_redis { private: int connected_ = 0; - std::unique_ptr stream_; - std::unique_ptr conn_; + monitor stream_; + connection conn_; }; } diff --git a/includes/event_loop/event_loop_ev.h b/includes/event_loop/event_loop_ev.h deleted file mode 100644 index cbe8284..0000000 --- a/includes/event_loop/event_loop_ev.h +++ /dev/null @@ -1,48 +0,0 @@ -#pragma once - -#include -#include - -namespace async_redis { - namespace event_loop - { - class event_loop_ev - { - public: - using socket_identifier_t = std::shared_ptr; - - private: - struct timer_watcher - { - ev_timer timer; - action timeout_cb; - - timer_watcher(double time, const action& cb) - : timeout_cb(cb) - { - ev_timer_init (&timer, &event_loop_ev::timer_handler, time, 0.); - } - }; - - - public: - event_loop_ev(); - event_loop_ev(struct ev_loop *); - - void run(); - - socket_identifier_t watch(int); - void unwatch(socket_identifier_t&); - - void async_write(socket_identifier_t& id, action&& cb); - void async_read(socket_identifier_t& id, action&& cb); - void async_timeout(double time, action&& cb ); - - private: - static void timer_handler(EV_P_ ev_timer* w, int revents); - - private: - struct ev_loop* loop_; - }; - } -} diff --git a/includes/event_loop/socket_watcher.h b/includes/event_loop/socket_watcher.h deleted file mode 100644 index 305a4a6..0000000 --- a/includes/event_loop/socket_watcher.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace async_redis { -namespace event_loop { - -typedef std::function action; - -class socket_watcher -{ - public: - socket_watcher(struct ev_loop* loop, int fd); - - void stop(); - - void start_reading_with(action&& cb); - void start_writing_with(action&& cb); - - private: - void call_read(); - void call_write(); - - static void read_handler(EV_P_ ev_io* w, int revents); - static void write_handler(EV_P_ ev_io* w, int revents); - - private: - void start_reading(); - void start_writing(); - - void stop_reading(); - void stop_writing(); - - private: - struct ev_loop* loop_; - - ev_io write_watcher_; - ev_io read_watcher_; - - std::queue write_handlers_; - std::queue read_handlers_; -}; - -} -} diff --git a/includes/network/async_socket.hpp b/includes/network/async_socket.hpp deleted file mode 100644 index 2b47a36..0000000 --- a/includes/network/async_socket.hpp +++ /dev/null @@ -1,75 +0,0 @@ -#pragma once - -#include - -#include -#include -#include - -namespace async_redis { - namespace network - { - using std::string; - - class socket_excetion : std::exception {}; - //TODO: NAMING? More of a permission socket issue than a connect one! - class connect_socket_exception : socket_excetion {}; - class nonblocking_socket_exception : socket_excetion {}; - - class async_socket - { - public: - using socket_t = struct sockaddr; - - using socket_identifier_t = event_loop::event_loop_ev::socket_identifier_t; - using recv_cb_t = std::function; - using ready_cb_t = std::function; - using connect_handler_t = std::function; - - async_socket(event_loop::event_loop_ev& io); - - ~async_socket(); - - bool is_valid(); - ssize_t send(const string& data); - ssize_t send(const char *data, size_t len); - ssize_t receive(char *data, size_t len); - bool listen(int backlog = 0); - int accept(); - bool close(); - bool async_write(const string& data, ready_cb_t cb); - bool async_read(char *buffer, int max_len, recv_cb_t cb); - void async_accept(const std::function)>& cb); - - bool is_connected() const; - - protected: - void set_fd_socket(int fd); - - template - void async_connect(int timeout, connect_handler_t handler, Args... args) - { - if (timeout == 10) // is equal to 1 second - return handler(false); - - io_.async_timeout(0.1, [this, timeout, args..., handler]() { - - if (-1 == static_cast(*this).connect(args...)) - return this->async_connect(timeout+1, handler, args...); - - handler(is_connected()); - }); - } - - void create_socket(int domain); - int connect_to(socket_t* socket_addr, int len); - int bind_to(socket_t* socket_addr, int len); - - private: - bool is_connected_ = false; - event_loop::event_loop_ev& io_; - socket_identifier_t id_; - int fd_ = -1; - }; - } -} diff --git a/includes/network/tcp_socket.hpp b/includes/network/tcp_socket.hpp deleted file mode 100644 index 0315783..0000000 --- a/includes/network/tcp_socket.hpp +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once - -#include "async_socket.hpp" - -namespace async_redis { - namespace network - { - class tcp_socket : public async_socket - { - public: - tcp_socket(event_loop::event_loop_ev& io); - - void async_connect(const string& ip, int port, connect_handler_t handler); - bool bind(const string& host, int port); - int connect(const string& host, int port); - }; - } -} diff --git a/includes/network/unix_socket.hpp b/includes/network/unix_socket.hpp deleted file mode 100644 index 82466cb..0000000 --- a/includes/network/unix_socket.hpp +++ /dev/null @@ -1,17 +0,0 @@ -#pragma once - -#include "async_socket.hpp" - -namespace async_redis { - namespace network - { - class unix_socket : public async_socket - { - public: - unix_socket(event_loop::event_loop_ev &io); - void async_connect(const string& path, connect_handler_t handler); - int connect(const string& path); - bool bind(const string& path); - }; - } -} diff --git a/libs/libevpp b/libs/libevpp new file mode 160000 index 0000000..906c1f6 --- /dev/null +++ b/libs/libevpp @@ -0,0 +1 @@ +Subproject commit 906c1f66767da69d6fa5cd3a76458f6a5472bdda diff --git a/src/connection.cpp b/src/connection.cpp index ec5c7d1..ce2c502 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -1,13 +1,13 @@ -#include "../includes/connection.hpp" +#include "../includes/async_redis/connection.hpp" #include #include #include #include -#include -#include -#include +#include +#include +#include namespace async_redis diff --git a/src/event_loop/event_loop_ev.cpp b/src/event_loop/event_loop_ev.cpp deleted file mode 100644 index a423c88..0000000 --- a/src/event_loop/event_loop_ev.cpp +++ /dev/null @@ -1,55 +0,0 @@ -#include "../../includes/event_loop/event_loop_ev.h" - -namespace async_redis { -namespace event_loop { - -event_loop_ev::event_loop_ev() - : loop_(EV_DEFAULT) -{ -} - -event_loop_ev::event_loop_ev(struct ev_loop* loop) - : loop_(loop) -{ -} - -void event_loop_ev::run() -{ - ev_run (loop_, 0); -} - -void event_loop_ev::async_write(socket_identifier_t& watcher, action&& cb) -{ - watcher->start_writing_with(std::move(cb)); -} - -void event_loop_ev::async_read(socket_identifier_t& watcher, action&& cb) -{ - watcher->start_reading_with(std::move(cb)); -} - -void event_loop_ev::async_timeout(double time, action&& cb ) -{ - timer_watcher *w = new timer_watcher(time, cb); - ev_timer_start (loop_, &w->timer); -} - -void event_loop_ev::timer_handler(EV_P_ ev_timer* w, int revents) -{ - timer_watcher *watcher = reinterpret_cast(w); - watcher->timeout_cb(); - delete watcher; -} - - -event_loop_ev::socket_identifier_t event_loop_ev::watch(int fd) -{ - return std::make_shared(loop_, fd); -} - -void event_loop_ev::unwatch(socket_identifier_t& id) -{ - id->stop(); -} - -}} diff --git a/src/event_loop/socket_watcher.cpp b/src/event_loop/socket_watcher.cpp deleted file mode 100644 index 8081f37..0000000 --- a/src/event_loop/socket_watcher.cpp +++ /dev/null @@ -1,111 +0,0 @@ -#include "../../includes/event_loop/socket_watcher.h" - -namespace async_redis { -namespace event_loop { - -socket_watcher::socket_watcher(struct ev_loop* loop, int fd) - : loop_(loop) -{ - ev_io_init(&read_watcher_, &socket_watcher::read_handler, fd, EV_READ); - ev_io_init(&write_watcher_, &socket_watcher::write_handler, fd, EV_WRITE); - - write_watcher_.data = this; - read_watcher_.data = this; -} - - void socket_watcher::start_writing_with(action&& cb) -{ - write_handlers_.push(std::move(cb)); - - if (write_handlers_.size() != 1) - return; - - start_writing(); -} - -void socket_watcher::start_reading_with(action&& cb) -{ - read_handlers_.push(std::move(cb)); - - if (read_handlers_.size() != 1) - return; - - start_reading(); -} - -void socket_watcher::call_read() -{ - if (read_handlers_.size()) - { - auto &action = read_handlers_.front(); - action(); - read_handlers_.pop(); - } - - if (!read_handlers_.size()) - stop_reading(); -} - - -void socket_watcher::call_write() -{ - if (write_handlers_.size()) - { - auto &action = write_handlers_.front(); - action(); - write_handlers_.pop(); - } - - if (!write_handlers_.size()) - stop_writing(); -} - -void socket_watcher::write_handler(EV_P_ ev_io* w, int revents) -{ - if (revents & EV_ERROR) - return; - - socket_watcher* sq = reinterpret_cast(w->data); - sq->call_write(); -} - -void socket_watcher::read_handler(EV_P_ ev_io* w, int revents) -{ - if (revents & EV_ERROR) - return; - - socket_watcher* sq = reinterpret_cast(w->data); - sq->call_read(); -} - - -void socket_watcher::stop_reading() -{ - ev_clear_pending(loop_, &read_watcher_); - ev_io_stop(loop_, &read_watcher_); -} - -void socket_watcher::stop_writing() -{ - ev_clear_pending(loop_, &write_watcher_); - ev_io_stop(loop_, &write_watcher_); -} - -void socket_watcher::start_writing() -{ - ev_io_start(loop_, &write_watcher_); -} - -void socket_watcher::start_reading() -{ - ev_io_start(loop_, &read_watcher_); -} - -void socket_watcher::stop() -{ - stop_writing(); - stop_reading(); -} - -} -} diff --git a/src/monitor.cpp b/src/monitor.cpp index a49f39f..cf9e347 100644 --- a/src/monitor.cpp +++ b/src/monitor.cpp @@ -1,9 +1,9 @@ -#include "../includes/monitor.hpp" +#include "../includes/async_redis/monitor.hpp" -#include +#include #include -#include "../includes/network/tcp_socket.hpp" -#include "../includes/network/unix_socket.hpp" +#include +#include namespace async_redis { @@ -206,12 +206,13 @@ void monitor::report_disconnect() t2.swap(pwatchers_); string str; + parser_t p; for(auto &w : t1) - w.second(str, nullptr, EventState::Disconnected); + w.second(str, p, EventState::Disconnected); for(auto &w : t2) - w.second(str, nullptr, EventState::Disconnected); + w.second(str, p, EventState::Disconnected); disconnect(); } diff --git a/src/network/async_socket.cpp b/src/network/async_socket.cpp deleted file mode 100644 index a1792f7..0000000 --- a/src/network/async_socket.cpp +++ /dev/null @@ -1,147 +0,0 @@ -#include "../../includes/network/async_socket.hpp" - -#include // fcntl -#include // close - -namespace async_redis { -namespace network { - -async_socket::async_socket(event_loop::event_loop_ev& io) - : io_(io) -{ } - -async_socket::~async_socket() { - close(); -} - -bool async_socket::is_valid() { - return fd_ != -1; -} - -ssize_t async_socket::send(const string& data) { - return send(data.data(), data.size()); -} - -ssize_t async_socket::send(const char *data, size_t len) { - return ::send(fd_, data, len, 0); -} - -ssize_t async_socket::receive(char *data, size_t len) { - return ::recv(fd_, data, len, 0); -} - -bool async_socket::listen(int backlog) { - return ::listen(fd_, backlog) == 0; -} - -int async_socket::accept() { - return ::accept(fd_, nullptr, nullptr); -} - -bool async_socket::close() -{ - if (!is_connected_) - return true; - - if(id_) - io_.unwatch(id_); - - auto res = ::close(fd_) == 0; - is_connected_ = false; - fd_ = -1; - return res; -} - -bool async_socket::async_write(const string& data, ready_cb_t fn) -{ - if (!is_connected() || !data.size()) - return false; - - io_.async_write(id_, [this, data, cb{std::move(fn)}]() -> void { - auto sent_chunk = send(data); - - if(sent_chunk == 0) - close(); - - if (sent_chunk < data.size() && sent_chunk != -1) { - async_write(data.substr(sent_chunk, data.size()), std::move(cb)); - return; - } - - cb(sent_chunk); - }); - - return true; -} - -bool async_socket::async_read(char *buffer, int max_len, recv_cb_t cb) -{ - if (!is_connected()) - return false; - - io_.async_read(id_, [&, buffer, max_len, cb{std::move(cb)}]() -> void { - auto l = receive(buffer, max_len); - if (l == 0) - close(); - - cb(l); - }); - - return true; -} - -void async_socket::async_accept(const std::function)>& cb) -{ - return io_.async_read(id_, - [this, cb]() - { - int fd = this->accept(); - auto s = std::make_shared(io_); - s->set_fd_socket(fd); - cb(s); - this->async_accept(cb); - } - ); -} - -bool async_socket::is_connected() const { - return is_connected_; -} - -void async_socket::set_fd_socket(int fd) -{ - fd_ = fd; - is_connected_ = true; - - id_ = io_.watch(fd_); -} - - -void async_socket::create_socket(int domain) { - if (-1 == (fd_ = socket(domain, SOCK_STREAM, 0))) - throw connect_socket_exception(); - - if (-1 == fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL) | O_NONBLOCK)) - throw nonblocking_socket_exception(); - - id_ = io_.watch(fd_); -} - -int async_socket::connect_to(async_socket::socket_t* socket_addr, int len) -{ - int ret = ::connect(fd_, socket_addr, len); - if (!ret) - is_connected_ = true; - - return ret; -} - -int async_socket::bind_to(async_socket::socket_t* socket_addr, int len) { - int b = ::bind(fd_, socket_addr, len); - if (!b) - is_connected_ = true; - - return b; -} - -}} diff --git a/src/network/tcp_socket.cpp b/src/network/tcp_socket.cpp deleted file mode 100644 index aba11c8..0000000 --- a/src/network/tcp_socket.cpp +++ /dev/null @@ -1,43 +0,0 @@ -#include "../../includes/network/tcp_socket.hpp" - -#include - -namespace async_redis { -namespace network { - -tcp_socket::tcp_socket(event_loop::event_loop_ev& io) - : async_socket(io) -{ - this->create_socket(AF_INET); -} - -void tcp_socket::async_connect(const string& ip, int port, connect_handler_t handler) -{ - async_socket::template async_connect(0, handler, ip, port); -} - -bool tcp_socket::bind(const string& host, int port) -{ - struct sockaddr_in addr = {0}; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(host.data()); - - return this->bind_to((socket_t *)&addr, sizeof(addr)) == 0; -} - -int tcp_socket::connect(const string& host, int port) -{ - //TODO: - // setsockopt (fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)); - - struct sockaddr_in addr = {0}; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(host.data()); - - return this->connect_to((socket_t *)&addr, sizeof(addr)); -} - -} -} diff --git a/src/network/unix_socket.cpp b/src/network/unix_socket.cpp deleted file mode 100644 index ee7de36..0000000 --- a/src/network/unix_socket.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include "../../includes/network/unix_socket.hpp" - -#include -#include // close -#include - -namespace async_redis { -namespace network { - -unix_socket::unix_socket(event_loop::event_loop_ev& io) - : async_socket(io) -{ - this->create_socket(AF_UNIX); -} - -void unix_socket::async_connect(const string& path, connect_handler_t handler) -{ - async_socket::template async_connect(0, handler, path); -} - -bool unix_socket::bind(const string& path) -{ - ::unlink(path.data()); - - struct sockaddr_un addr = {0}; - addr.sun_family = AF_UNIX; - strcpy(addr.sun_path, path.data()); - - auto len = strlen(addr.sun_path) + sizeof(addr.sun_family); - - return this->bind_to((socket_t *)&addr, sizeof(addr)) == 0; -} - -int unix_socket::connect(const string& path) -{ - struct sockaddr_un addr = {0}; - addr.sun_family = AF_UNIX; - strcpy(addr.sun_path, path.data()); - auto len = strlen(addr.sun_path) + sizeof(addr.sun_family); - - return this->connect_to((socket_t *)&addr, sizeof(addr)); -} - -} -} diff --git a/src/parser/array_parser.cpp b/src/parser/array_parser.cpp index d98d2e8..f39d17f 100644 --- a/src/parser/array_parser.cpp +++ b/src/parser/array_parser.cpp @@ -1,8 +1,7 @@ -#include "../../includes/parser/array_parser.h" +#include "../../includes/async_redis/parser/array_parser.h" - -#include "../../includes/parser/number_parser.h" -#include "../../includes/parser/bulk_string_parser.h" +#include "../../includes/async_redis/parser/number_parser.h" +#include "../../includes/async_redis/parser/bulk_string_parser.h" namespace async_redis { namespace parser { diff --git a/src/parser/base_resp_parser.cpp b/src/parser/base_resp_parser.cpp index c790203..e3e1af0 100644 --- a/src/parser/base_resp_parser.cpp +++ b/src/parser/base_resp_parser.cpp @@ -1,10 +1,10 @@ -#include "../../includes/parser/base_resp_parser.h" +#include "../../includes/async_redis/parser/base_resp_parser.h" -#include "../../includes/parser/number_parser.h" -#include "../../includes/parser/bulk_string_parser.h" -#include "../../includes/parser/array_parser.h" -#include "../../includes/parser/error_parser.h" -#include "../../includes/parser/simple_string_parser.h" +#include "../../includes/async_redis/parser/number_parser.h" +#include "../../includes/async_redis/parser/bulk_string_parser.h" +#include "../../includes/async_redis/parser/array_parser.h" +#include "../../includes/async_redis/parser/error_parser.h" +#include "../../includes/async_redis/parser/simple_string_parser.h" #include @@ -51,6 +51,36 @@ base_resp_parser::append_chunk(base_resp_parser::parser& data, const char* chunk return data->parse_append(chunk, length, is_finished); } +bool +base_resp_parser::is_array() const +{ + this->type() == RespType::Arr; +} + +bool +base_resp_parser::is_number() const +{ + this->type() == RespType::Num; +} + +bool +base_resp_parser::is_error() const +{ + this->type() == RespType::Err; +} + +bool +base_resp_parser::is_string() const +{ + this->type() == RespType::BulkStr; +} + +bool +base_resp_parser::is_enum() const +{ + this->type() == RespType::Str; +} + void base_resp_parser::print() { diff --git a/src/parser/bulk_string_parser.cpp b/src/parser/bulk_string_parser.cpp index e4a8a88..96859be 100644 --- a/src/parser/bulk_string_parser.cpp +++ b/src/parser/bulk_string_parser.cpp @@ -1,4 +1,4 @@ -#include "../../includes/parser/bulk_string_parser.h" +#include "../../includes/async_redis/parser/bulk_string_parser.h" namespace async_redis { diff --git a/src/parser/error_parser.cpp b/src/parser/error_parser.cpp index 0bf0bc2..72bc58a 100644 --- a/src/parser/error_parser.cpp +++ b/src/parser/error_parser.cpp @@ -1,4 +1,4 @@ -#include "../../includes/parser/error_parser.h" +#include "../../includes/async_redis/parser/error_parser.h" #include namespace async_redis { diff --git a/src/parser/number_parser.cpp b/src/parser/number_parser.cpp index 04b0f37..5f6be96 100644 --- a/src/parser/number_parser.cpp +++ b/src/parser/number_parser.cpp @@ -1,4 +1,4 @@ -#include "../../includes/parser/number_parser.h" +#include "../../includes/async_redis/parser/number_parser.h" namespace async_redis { namespace parser { diff --git a/src/parser/simple_string_parser.cpp b/src/parser/simple_string_parser.cpp index 85c178e..fafa083 100644 --- a/src/parser/simple_string_parser.cpp +++ b/src/parser/simple_string_parser.cpp @@ -1,4 +1,4 @@ -#include "../../includes/parser/simple_string_parser.h" +#include "../../includes/async_redis/parser/simple_string_parser.h" namespace async_redis { namespace parser { diff --git a/src/redis_client.cpp b/src/redis_client.cpp index 9eaa1dc..9e918c6 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -1,11 +1,9 @@ -#include "../includes/redis_client.hpp" +#include "../includes/async_redis/redis_client.hpp" #include #include #include -#include "connection.hpp" - namespace async_redis { @@ -71,6 +69,17 @@ void redis_client::publish(const string& channel, const string& msg, reply_cb_t& send({"publish", channel, msg}, reply); } +void +redis_client::sort(const string& hash_name, std::vector&& fields, reply_cb_t&& reply) +{ + std::string req; + for (auto &field : fields) + req += "get " + field + " "; + + send({"sort " + hash_name + " by nosort", req}, reply); +} + + void redis_client::commit_pipeline() { string buffer; std::swap(pipeline_buffer_, buffer); diff --git a/src/sentinel.cpp b/src/sentinel.cpp index ba38eba..60eef0f 100644 --- a/src/sentinel.cpp +++ b/src/sentinel.cpp @@ -1,4 +1,4 @@ -#include "../includes/sentinel.hpp" +#include "../includes/async_redis/sentinel.hpp" #include #include @@ -6,13 +6,13 @@ namespace async_redis { sentinel::sentinel(event_loop::event_loop_ev &event_loop) - : conn_(std::make_unique(event_loop)), - stream_(std::make_unique(event_loop)) + : conn_(event_loop), + stream_(event_loop) { } bool sentinel::is_connected() const { - return stream_->is_connected() && conn_->is_connected(); + return stream_.is_connected() && conn_.is_connected(); } bool sentinel::connect(const string& ip, int port, connect_cb_t&& connector) @@ -25,8 +25,8 @@ bool sentinel::connect(const string& ip, int port, connect_cb_t&& connector) } void sentinel::disconnect() { - stream_->disconnect(); - conn_->disconnect(); + stream_.disconnect(); + conn_.disconnect(); } bool sentinel::failover(const string& clustername, connection::reply_cb_t&& reply) @@ -53,7 +53,7 @@ bool sentinel::watch_master_change(cb_watch_master_change_t&& fn) [&]()-> bool { using State = monitor::EventState; - return stream_->subscribe({"+switch-master"}, + return stream_.subscribe({"+switch-master"}, [this, fn = std::move(fn)](const string& channel, parser_t event, State state) -> void { switch(state) @@ -147,8 +147,8 @@ void sentinel::connect_all(const string& ip, int port, const connect_cb_t& conne { auto cb = std::bind(&sentinel::check_connected, this, connector, std::placeholders::_1); - conn_->connect(cb, ip, port); - stream_->connect(cb, ip, port); + conn_.connect(cb, ip, port); + stream_.connect(cb, ip, port); } void sentinel::check_connected(const connect_cb_t& connector, bool res) @@ -171,7 +171,7 @@ bool sentinel::send(std::list&& words, connection::reply_cb_t&& reply) cmd += " " + w; cmd += "\r\n"; - if (!conn_->send(std::move(cmd), std::move(reply))) { + if (!conn_.send(std::move(cmd), std::move(reply))) { disconnect(); return false; } diff --git a/test/main.cpp b/test/main.cpp index 07fc251..076b62b 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -1,15 +1,15 @@ -#include - -#include -#include -#include -#include -#include -#include +#include + +#include +#include +#include +#include +#include +#include #include "../examples/tcp_server.hpp" -typedef async_redis::event_loop::event_loop_ev event_loop_ev; +typedef libevpp::event_loop::event_loop_ev event_loop_ev; struct monitor_test {