Skip to content

Commit

Permalink
enhance error handling for redis
Browse files Browse the repository at this point in the history
Signed-off-by: liulanzheng <[email protected]>
  • Loading branch information
liulanzheng committed Jan 17, 2025
1 parent eae4779 commit 0c3544d
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 25 deletions.
25 changes: 15 additions & 10 deletions ecosystem/redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ any _RedisClient::parse_response_item() {
auto x = get_integer();
return array_header{x};}
default:
LOG_ERROR("uncognized mark: ", mark);
return {};
LOG_ERROR("unrecognized mark: ", mark);
return error_message("unrecognized mark");
}
}

void _RedisClient::__refill(size_t atleast) {
ssize_t _RedisClient::__refill(size_t atleast) {
size_t room = _bufsize - _j;
if (!room || room < atleast) { if (_refcnt > 0) {
LOG_ERROR_RETURN(0, , "no enough buffer space");
LOG_ERROR_RETURN(0, -1 , "no enough buffer space");
} else {
size_t available = _j - _i;
memmove(ibuf(), ibuf() + _i, available);
Expand All @@ -56,8 +56,9 @@ void _RedisClient::__refill(size_t atleast) {
} }
ssize_t ret = _s->recv_at_least(ibuf() + _j, room, atleast);
if (ret < (ssize_t)atleast)
LOG_ERRNO_RETURN(0,, "failed to recv at least ` bytes", atleast);
LOG_ERRNO_RETURN(0, -1, "failed to recv at least ` bytes", atleast);
_j += ret;
return ret;
}

std::string_view _RedisClient::__getline() {
Expand All @@ -66,7 +67,9 @@ std::string_view _RedisClient::__getline() {
estring_view sv(ibuf() + _i, _j - _i);
while ((pos = sv.find('\n')) == sv.npos) {
size_t j = _j;
__refill(0);
if (__refill(0) < 0) {
return {};
}
assert(_j > j);
sv = {ibuf() + j, (uint32_t)(_j - j)};
}
Expand All @@ -88,8 +91,9 @@ std::string_view _RedisClient::__getline() {
std::string_view _RedisClient::__getstring(size_t length) {
assert(_i <= _j);
size_t available = _j - _i;
if (available < length + 2)
__refill(length + 2 - available);
if (available < length + 2 && __refill(length + 2 - available) < 0) {
return {};
}
auto begin = ibuf() + _i;
_i += length;
assert(_i + 2 <= _j);
Expand All @@ -101,7 +105,7 @@ std::string_view _RedisClient::__getstring(size_t length) {
return {begin, length};
}

void _RedisClient::flush(const void* extra_buffer, size_t size) {
ssize_t _RedisClient::flush(const void* extra_buffer, size_t size) {
iovec iov[2];
iov[0] = {obuf(), _o};
int iovcnt = 1;
Expand All @@ -112,8 +116,9 @@ void _RedisClient::flush(const void* extra_buffer, size_t size) {
}
ssize_t ret = _s->writev_mutable(iov, iovcnt);
if (ret < sum)
LOG_ERRNO_RETURN(0,, "failed to write to socket stream");
LOG_ERRNO_RETURN(0, -1, "failed to write to socket stream");
_o = 0;
return ret;
}

}
Expand Down
32 changes: 21 additions & 11 deletions ecosystem/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ using net::ISocketStream;
class _RedisClient {
protected:
ISocketStream* _s = nullptr;
bool _s_ownership = false;
uint32_t _i = 0, _j = 0, _o = 0, _bufsize = 0, _refcnt = 0;
char _xbuf[0];

Expand All @@ -196,11 +197,15 @@ class _RedisClient {
size_t __MAX_SIZE(_array_header x) { return 32; }
size_t __MAX_SIZE(_char x) { return 1; }

explicit _RedisClient(ISocketStream* s, uint32_t bufsize) :
_s(s), _bufsize(bufsize) { }
explicit _RedisClient(ISocketStream* s, bool s_ownership, uint32_t bufsize) :
_s(s), _s_ownership(s_ownership), _bufsize(bufsize) { }
~_RedisClient() {
if (_s_ownership)
delete _s;
}

public:
void flush(const void* extra_buffer = 0, size_t size = 0);
ssize_t flush(const void* extra_buffer = 0, size_t size = 0);
bool flush_if_low_space(size_t threshold, const void* ebuf = 0, size_t size = 0) {
return (_o + threshold < _bufsize) ? false :
(flush(ebuf, size), true);
Expand Down Expand Up @@ -272,7 +277,9 @@ class _RedisClient {
}

char get_char() {
ensure_input_data(__MAX_SIZE(_char{'c'}));
if (ensure_input_data(__MAX_SIZE(_char{'c'})) < 0) {
return '\0';
}
return ibuf()[_i++];
}
refstring getline() {
Expand All @@ -290,12 +297,14 @@ class _RedisClient {
return getstring((size_t)length);
return {};
}
void __refill(size_t atleast); // refill input buffer
void ensure_input_data(size_t min_available) {
ssize_t __refill(size_t atleast); // refill input buffer
int ensure_input_data(size_t min_available) {
assert(_j >= _i);
size_t available = _j - _i;
if (available < min_available)
__refill(min_available - available);
if (available < min_available && __refill(min_available - available) < 0) {
return -1;
}
return 0;
}

void write_items() { }
Expand All @@ -318,7 +327,9 @@ class _RedisClient {
template<typename...Args>
any execute(bulk_string cmd, const Args&...args) {
send_cmd_no_flush(cmd, args...);
flush();
if (flush() < 0) {
return error_message("flush failed");
}
return parse_response_item();
}

Expand Down Expand Up @@ -703,7 +714,7 @@ template<uint32_t BUF_SIZE = 16*1024UL>
class __RedisClient : public _RedisClient {
char _buf[BUF_SIZE * 2];
public:
__RedisClient(ISocketStream* s) : _RedisClient(s, BUF_SIZE) { }
__RedisClient(ISocketStream* s, bool s_ownership) : _RedisClient(s, s_ownership, BUF_SIZE) { }
};

using RedisClient = __RedisClient<16*1024UL>;
Expand All @@ -713,4 +724,3 @@ using RedisClient = __RedisClient<16*1024UL>;

}
}

8 changes: 4 additions & 4 deletions ecosystem/test/test_redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct __RC : public RedisClient {
TEST(redis, serialization) {
auto s = new_string_socket_stream();
DEFER(delete s);
RedisClient rc(s);
RedisClient rc(s, false);
rc << "asldkfjasfkd"
<< __RC::_strint{234}
<< "this-is-another-string"
Expand Down Expand Up @@ -75,7 +75,7 @@ TEST(redis, deserialization) {
ARRAY_HEADER(3) SSTR(asdf) INTEGER(75) BSTR(3,jkl) INTEGER(-1234234);
s->set_input(RESP, false);
print_resp(s->input());
RedisClient rc(s);
RedisClient rc(s, false);
auto a = rc.parse_response_item();
EXPECT_EQ(a.mark, simple_string::mark());
EXPECT_EQ(a.get<simple_string>(), "asldkfjasfkd");
Expand Down Expand Up @@ -118,7 +118,7 @@ void asdfjkl(RedisClient& bs) {
TEST(redis, cmd_serialization) {
auto s = new_string_socket_stream();
DEFER(delete s);
RedisClient rc(s);
RedisClient rc(s, false);
#define ERRMSG "ERR unknown command 'asdf'"
#define TEST_CMD(cmd, truth) { \
s->set_input("-" ERRMSG CRLF, false); \
Expand Down Expand Up @@ -191,7 +191,7 @@ TEST(redis, cmd) {
#endif
}
DEFER(delete s);
RedisClient rc(s);
RedisClient rc(s, false);
const char key[] = "zvxbhm";
rc.DEL(key);
DEFER(rc.DEL(key));
Expand Down

0 comments on commit 0c3544d

Please sign in to comment.