Skip to content

Commit

Permalink
Fix cyclic buffer to respect buffered data.
Browse files Browse the repository at this point in the history
  • Loading branch information
evik42 committed Oct 28, 2022
1 parent fb46cca commit dd8a3e3
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 119 deletions.
42 changes: 14 additions & 28 deletions runtime/init-container/src/cyclic_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <stdbool.h>
#include <sys/mman.h>
#include <unistd.h>
#include <stdio.h>

#include "cyclic_buffer.h"

Expand Down Expand Up @@ -56,16 +57,16 @@ static size_t min(size_t a, size_t b) {

ssize_t cyclic_buffer_read(int fd, struct cyclic_buffer* cb, size_t count) {
ssize_t got = 0;
size_t orig_cb_size = cyclic_buffer_data_size(cb);
size_t free_space = cyclic_buffer_free_size(cb);

while (count) {
while (count && free_space) {
bool fixup_end = false;
if (cb->end == cb->buf + cb->size) {
cb->end = cb->buf;
fixup_end = true;
}

size_t this_read_size = min(cb->buf + cb->size - cb->end, count);
size_t this_read_size = min(free_space, min(cb->buf + cb->size - cb->end, count));
ssize_t ret = read(fd, cb->end, this_read_size);
if (ret <= 0) {
if (fixup_end) {
Expand All @@ -89,37 +90,23 @@ ssize_t cyclic_buffer_read(int fd, struct cyclic_buffer* cb, size_t count) {
cb->end += ret;
count -= ret;
got += ret;
free_space = cyclic_buffer_free_size(cb);

if ((size_t)ret < this_read_size) {
/* Not enough data to fill the whole request. */
break;
}
}

if (got > 0) {
size_t x = orig_cb_size + got;
if (x > cb->size) {
x = cb->size;
}

if (cb->buf + x <= cb->end) {
cb->begin = cb->end - x;
} else {
cb->begin = cb->end + (cb->size - x);
}
}

return got;
}

ssize_t cyclic_buffer_write(int fd, struct cyclic_buffer* cb, size_t count) {
ssize_t wrote = 0;
size_t orig_cb_size = cyclic_buffer_data_size(cb);
size_t available_data = cyclic_buffer_data_size(cb);

count = min(count, orig_cb_size);

while (count) {
size_t this_write_size = min(cb->buf + cb->size - cb->begin, count);
while (count && available_data) {
size_t this_write_size = min(available_data, min(cb->buf + cb->size - cb->begin, count));
ssize_t ret = write(fd, cb->begin, this_write_size);
if (ret < 0) {
if (errno == EINTR) {
Expand All @@ -137,23 +124,22 @@ ssize_t cyclic_buffer_write(int fd, struct cyclic_buffer* cb, size_t count) {
}

cb->begin += ret;
if (cb->begin == cb->buf + cb->size) {
if (cb->begin == cb->end) {
// buffer is empty
cb->begin = cb->buf;
cb->end = cb->buf;
} else if (cb->begin == cb->buf + cb->size) {
cb->begin = cb->buf;
}
count -= ret;
wrote += ret;
available_data = cyclic_buffer_data_size(cb);

if ((size_t)ret < this_write_size) {
/* Short write. */
break;
}
}

if (wrote >= 0 && (size_t)wrote == orig_cb_size) {
/* The buffer is empty. */
cb->begin = cb->buf;
cb->end = cb->buf;
}

return wrote;
}
12 changes: 3 additions & 9 deletions runtime/init-container/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -1236,7 +1236,7 @@ static void handle_query_output(msg_id_t msg_id) {
}
bool was_full = cyclic_buffer_free_size(&proc_desc->redirs[fd].buffer.cb) == 0;
send_response_cyclic_buffer(msg_id, &proc_desc->redirs[fd].buffer.cb, len);
if (was_full && proc_desc->redirs[fd].type != REDIRECT_FD_PIPE_CYCLIC) {
if (was_full) {
if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[0],
fd,
Expand Down Expand Up @@ -1278,12 +1278,6 @@ static void handle_output_available(struct epoll_fd_desc** epoll_fd_desc_ptr) {
size_t to_read = cyclic_buffer_free_size(cb);
bool needs_notification = cyclic_buffer_data_size(cb) == 0;

if (epoll_fd_desc->data->type == REDIRECT_FD_PIPE_CYCLIC) {
/* Since fd is marked as non-blocking, it will return EAGAIN once we
* drain all available data. */
to_read = SIZE_MAX;
}

if (to_read == 0) {
/* Buffer is full, deregister `epoll_fd_desc` untill it get's emptied. */
CHECK(del_epoll_fd_desc(epoll_fd_desc));
Expand All @@ -1302,8 +1296,8 @@ static void handle_output_available(struct epoll_fd_desc** epoll_fd_desc_ptr) {
}
} else if (ret == 0) {
/* EOF. This actually cannot happen, since if we came here, there must
* have been some output available. Maybe just print an error and die()
* here? */
* have been some output available and space in the buffer. Maybe just
* print an error and die() here? */
CHECK(del_epoll_fd_desc(epoll_fd_desc));
*epoll_fd_desc_ptr = NULL;
}
Expand Down
Loading

0 comments on commit dd8a3e3

Please sign in to comment.