Skip to content

Commit

Permalink
Merge pull request #142 from golemfactory/am/fix-stdout-fragmentation
Browse files Browse the repository at this point in the history
Fix stdout fragmentation
  • Loading branch information
mfranciszkiewicz authored Oct 28, 2022
2 parents e38f167 + 6d3378f commit 725734a
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 168 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/target
*.swp
*.o
*.d
/.idea/
/.vscode/
18 changes: 12 additions & 6 deletions runtime/init-container/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CC := musl-gcc
CXX := /bin/false
CFLAGS := -std=c11 -O2 -Wall -Wextra -Werror -fPIE -pie -Iinclude/
# -MMD to create dependency files (*.d) on first compilation
CFLAGS := -MMD -std=c11 -O2 -Wall -Wextra -Werror -fPIE -pie -Iinclude/

ifneq ($(DEBUG), "")
CFLAGS += -DNDEBUG
Expand Down Expand Up @@ -28,6 +29,11 @@ TEST_DIR ?= tests
OBJECTS = $(addprefix $(SRC_DIR)/,init.o communication.o process_bookkeeping.o cyclic_buffer.o)
OBJECTS_EXT = $(addprefix $(SRC_DIR)/,network.o forward.o)

# Add headers to object dependencies for conditional recompilation on header change
SOURCES = $(wildcard $(SRC_DIR)/*.c)
DEPS = $(SOURCES:%.c=%.d)
-include $(DEPS)

# Below are the steps performed by this Makefile:
# - download the kernel && kernel headers apk packages
# - verify checksums of the downloaded packages
Expand All @@ -46,18 +52,18 @@ all: vmlinuz-virt initramfs.cpio.gz
$(SRC_DIR)/network.o: $(SRC_DIR)/network.c
$(QUIET_CC)$(CC) $(CFLAGS) \
-I"$(CURDIR)/$(UNPACKED_HEADERS)/usr/include" \
-o $@ -c $^
-o $@ -c $<

$(SRC_DIR)/forward.o: uring $(SRC_DIR)/forward.c
$(QUIET_CC)$(CC) -O2 -Wall -Wextra -Werror -fPIE -pie \
$(SRC_DIR)/forward.o: $(SRC_DIR)/forward.c uring
$(QUIET_CC)$(CC) -MMD -O2 -Wall -Wextra -Werror -fPIE -pie \
-I"$(CURDIR)/$(UNPACKED_HEADERS)/usr/include/" \
-I"$(CURDIR)/$(LIBURING_SUBMODULE)/src/include/" \
-Iinclude/ \
-lstdthreads \
-o $@ -c $(wordlist 2, $(words $^), $^)
-o $@ -c $<

%.o: %.c
$(QUIET_CC)$(CC) $(CFLAGS) -o $@ -c $^
$(QUIET_CC)$(CC) $(CFLAGS) -o $@ -c $<

init: $(UNPACKED_HEADERS) uring $(OBJECTS) $(OBJECTS_EXT)
@echo init
Expand Down
5 changes: 5 additions & 0 deletions runtime/init-container/include/process_bookkeeping.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ struct redir_fd_desc {
struct {
struct cyclic_buffer cb;
int fds[2];
bool closed;
// needed to send notification
uint64_t process_desc_id;
} buffer;
};
};
Expand All @@ -25,6 +28,8 @@ struct process_desc {
uint64_t id;
pid_t pid;
bool is_alive;
int32_t ssi_status;
int32_t ssi_code;
struct redir_fd_desc redirs[3];
struct process_desc* prev;
struct process_desc* next;
Expand Down
42 changes: 14 additions & 28 deletions runtime/init-container/src/cyclic_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <stdbool.h>
#include <sys/mman.h>
#include <unistd.h>
#include <stdio.h>

#include "cyclic_buffer.h"

Expand Down Expand Up @@ -56,16 +57,16 @@ static size_t min(size_t a, size_t b) {

ssize_t cyclic_buffer_read(int fd, struct cyclic_buffer* cb, size_t count) {
ssize_t got = 0;
size_t orig_cb_size = cyclic_buffer_data_size(cb);
size_t free_space = cyclic_buffer_free_size(cb);

while (count) {
while (count && free_space) {
bool fixup_end = false;
if (cb->end == cb->buf + cb->size) {
cb->end = cb->buf;
fixup_end = true;
}

size_t this_read_size = min(cb->buf + cb->size - cb->end, count);
size_t this_read_size = min(free_space, min(cb->buf + cb->size - cb->end, count));
ssize_t ret = read(fd, cb->end, this_read_size);
if (ret <= 0) {
if (fixup_end) {
Expand All @@ -89,37 +90,23 @@ ssize_t cyclic_buffer_read(int fd, struct cyclic_buffer* cb, size_t count) {
cb->end += ret;
count -= ret;
got += ret;
free_space = cyclic_buffer_free_size(cb);

if ((size_t)ret < this_read_size) {
/* Not enough data to fill the whole request. */
break;
}
}

if (got > 0) {
size_t x = orig_cb_size + got;
if (x > cb->size) {
x = cb->size;
}

if (cb->buf + x <= cb->end) {
cb->begin = cb->end - x;
} else {
cb->begin = cb->end + (cb->size - x);
}
}

return got;
}

ssize_t cyclic_buffer_write(int fd, struct cyclic_buffer* cb, size_t count) {
ssize_t wrote = 0;
size_t orig_cb_size = cyclic_buffer_data_size(cb);
size_t available_data = cyclic_buffer_data_size(cb);

count = min(count, orig_cb_size);

while (count) {
size_t this_write_size = min(cb->buf + cb->size - cb->begin, count);
while (count && available_data) {
size_t this_write_size = min(available_data, min(cb->buf + cb->size - cb->begin, count));
ssize_t ret = write(fd, cb->begin, this_write_size);
if (ret < 0) {
if (errno == EINTR) {
Expand All @@ -137,23 +124,22 @@ ssize_t cyclic_buffer_write(int fd, struct cyclic_buffer* cb, size_t count) {
}

cb->begin += ret;
if (cb->begin == cb->buf + cb->size) {
if (cb->begin == cb->end) {
// buffer is empty
cb->begin = cb->buf;
cb->end = cb->buf;
} else if (cb->begin == cb->buf + cb->size) {
cb->begin = cb->buf;
}
count -= ret;
wrote += ret;
available_data = cyclic_buffer_data_size(cb);

if ((size_t)ret < this_write_size) {
/* Short write. */
break;
}
}

if (wrote >= 0 && (size_t)wrote == orig_cb_size) {
/* The buffer is empty. */
cb->begin = cb->buf;
cb->end = cb->buf;
}

return wrote;
}
104 changes: 52 additions & 52 deletions runtime/init-container/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static bool redir_buffers_empty(struct redir_fd_desc *redirs, size_t len) {
break;
case REDIRECT_FD_PIPE_BLOCKING:
case REDIRECT_FD_PIPE_CYCLIC:
if (cyclic_buffer_data_size(&redirs[fd].buffer.cb) != 0) {
if (!redirs[fd].buffer.closed || cyclic_buffer_data_size(&redirs[fd].buffer.cb) != 0) {
return false;
}
break;
Expand All @@ -204,14 +204,6 @@ static bool redir_buffers_empty(struct redir_fd_desc *redirs, size_t len) {
return true;
}

__attribute__((unused)) static void delete_proc(struct process_desc* proc_desc) {
remove_process(proc_desc);
for (size_t fd = 0; fd < 3; ++fd) {
cleanup_fd_desc(&proc_desc->redirs[fd]);
}
free(proc_desc);
}

struct exit_reason {
uint8_t status;
uint8_t type;
Expand Down Expand Up @@ -252,6 +244,22 @@ static struct exit_reason encode_status(int status, int type) {
return exit_reason;
}

/*
 * Report the death of a process and dispose of its descriptor.
 *
 * Calls send_process_died() with the process id and the exit reason that
 * encode_status() builds from the ssi_status / ssi_code fields stored in the
 * descriptor (handle_sigchld() fills them in from the signalfd siginfo).
 *
 * If the descriptor is the entrypoint (g_entrypoint_desc), logs
 * "Entrypoint exited", issues kill(-1, SIGKILL) and calls die().
 * Otherwise calls remove_process(), runs cleanup_fd_desc() on each of the
 * three redirections and frees the descriptor, so proc_desc must not be
 * used after this call.
 *
 * Visible callers invoke it only when redir_buffers_empty() is true for the
 * process: handle_sigchld() right after marking it dead, and
 * handle_closing_pipe() when the process is already not alive.
 */
static void delete_proc(struct process_desc* proc_desc) {
// The notification is sent first, before the entrypoint check and the teardown below.
send_process_died(proc_desc->id, encode_status(proc_desc->ssi_status, proc_desc->ssi_code));

if (proc_desc == g_entrypoint_desc) {
fprintf(stderr, "Entrypoint exited\n");
// pid -1 addresses every process the caller is permitted to signal.
CHECK(kill(-1, SIGKILL));
// NOTE(review): die() presumably does not return, so the cleanup below is
// skipped for the entrypoint — confirm against its definition.
die();
}

remove_process(proc_desc);
// One redirection descriptor per standard stream slot (redirs[3]).
for (size_t fd = 0; fd < 3; ++fd) {
cleanup_fd_desc(&proc_desc->redirs[fd]);
}
free(proc_desc);
}

static void handle_sigchld(void) {
struct signalfd_siginfo siginfo = { 0 };

Expand Down Expand Up @@ -288,15 +296,8 @@ static void handle_sigchld(void) {
}

proc_desc->is_alive = false;

send_process_died(proc_desc->id, encode_status(siginfo.ssi_status,
siginfo.ssi_code));

if (proc_desc == g_entrypoint_desc) {
fprintf(stderr, "Entrypoint exited\n");
CHECK(kill(-1, SIGKILL));
die();
}
proc_desc->ssi_status = siginfo.ssi_status;
proc_desc->ssi_code = siginfo.ssi_code;

if (redir_buffers_empty(proc_desc->redirs, 3)) {
delete_proc(proc_desc);
Expand Down Expand Up @@ -756,6 +757,22 @@ static uint32_t spawn_new_process(struct new_process_args* new_proc_args,
ret = errno;
goto out_err;
}
proc_desc->redirs[fd].buffer.closed = false;
proc_desc->redirs[fd].buffer.process_desc_id = proc_desc->id;

if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[fd ? 0 : 1],
fd,
&epoll_fd_descs[fd]) < 0) {
if (errno == ENOMEM || errno == ENOSPC) {
ret = errno;
goto out_err;
}
CHECK(-1);
}

CHECK(make_nonblocking(epoll_fd_descs[fd]->fd));

break;
default:
break;
Expand Down Expand Up @@ -801,19 +818,6 @@ static uint32_t spawn_new_process(struct new_process_args* new_proc_args,
|| proc_desc->redirs[fd].type == REDIRECT_FD_PIPE_CYCLIC) {
CHECK(close(proc_desc->redirs[fd].buffer.fds[fd ? 1 : 0]));
proc_desc->redirs[fd].buffer.fds[fd ? 1 : 0] = -1;

if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[fd ? 0 : 1],
fd,
&epoll_fd_descs[fd]) < 0) {
if (errno == ENOMEM || errno == ENOSPC) {
ret = errno;
goto out_err;
}
CHECK(-1);
}

CHECK(make_nonblocking(epoll_fd_descs[fd]->fd));
}
}

Expand Down Expand Up @@ -1236,7 +1240,7 @@ static void handle_query_output(msg_id_t msg_id) {
}
bool was_full = cyclic_buffer_free_size(&proc_desc->redirs[fd].buffer.cb) == 0;
send_response_cyclic_buffer(msg_id, &proc_desc->redirs[fd].buffer.cb, len);
if (was_full && proc_desc->redirs[fd].type != REDIRECT_FD_PIPE_CYCLIC) {
if (was_full && !proc_desc->redirs[fd].buffer.closed) {
if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[0],
fd,
Expand Down Expand Up @@ -1272,47 +1276,42 @@ static void send_output_available_notification(uint64_t id, uint32_t fd) {
CHECK(writen(g_cmds_fd, &fd, sizeof(fd)));
}

static void handle_output_available(struct epoll_fd_desc** epoll_fd_desc_ptr) {
struct epoll_fd_desc* epoll_fd_desc = *epoll_fd_desc_ptr;
static void handle_output_available(struct epoll_fd_desc* epoll_fd_desc) {
struct cyclic_buffer* cb = &epoll_fd_desc->data->buffer.cb;
size_t to_read = cyclic_buffer_free_size(cb);
bool needs_notification = cyclic_buffer_data_size(cb) == 0;

if (epoll_fd_desc->data->type == REDIRECT_FD_PIPE_CYCLIC) {
/* Since fd is marked as non-blocking, it will return EAGAIN once we
* drain all available data. */
to_read = SIZE_MAX;
}

if (to_read == 0) {
/* Buffer is full, deregister `epoll_fd_desc` until it gets emptied. */
CHECK(del_epoll_fd_desc(epoll_fd_desc));
*epoll_fd_desc_ptr = NULL;
return;
}

ssize_t ret = cyclic_buffer_read(epoll_fd_desc->fd, cb, to_read);
if (ret < 0) {
if (errno == EAGAIN) {
/* This was a spurious wakeup. */
return;
} else {
fprintf(stderr, "Unexpected error while reading in handle_output_available: %m\n");
die();
}
} else if (ret == 0) {
/* EOF. This actually cannot happen, since if we came here, there must
* have been some output available. Maybe just print an error and die()
* here? */
* have been some output available and space in the buffer. Maybe just
* print an error and die() here? */
CHECK(del_epoll_fd_desc(epoll_fd_desc));
*epoll_fd_desc_ptr = NULL;
} else if (needs_notification) {
send_output_available_notification(epoll_fd_desc->data->buffer.process_desc_id, epoll_fd_desc->src_fd);
}
}

static void handle_closing_pipe(struct epoll_fd_desc* epoll_fd_desc) {
epoll_fd_desc->data->buffer.closed = true;
CHECK(del_epoll_fd_desc(epoll_fd_desc));
struct process_desc* proc_desc = find_process_by_id(epoll_fd_desc->data->buffer.process_desc_id);

if (needs_notification) {
/* XXX: this is ugly, but for now there is no other way of obtaining process id here. */
int fd = epoll_fd_desc->src_fd;
struct process_desc* process_desc = CONTAINER_OF(epoll_fd_desc->data, struct process_desc, redirs[fd]);
send_output_available_notification(process_desc->id, fd);
if (!proc_desc->is_alive && redir_buffers_empty(proc_desc->redirs, 3)) {
delete_proc(proc_desc);
}
}

Expand Down Expand Up @@ -1619,9 +1618,10 @@ static noreturn void main_loop(void) {
case EPOLL_FD_IN:
if (event.events & EPOLLIN) {
assert(epoll_fd_desc->data);
handle_output_available(&epoll_fd_desc);
handle_output_available(epoll_fd_desc);
} else if (event.events & EPOLLHUP) {
CHECK(del_epoll_fd_desc(epoll_fd_desc));
assert(epoll_fd_desc->data);
handle_closing_pipe(epoll_fd_desc);
}
break;
default:
Expand Down
Loading

0 comments on commit 725734a

Please sign in to comment.