Skip to content

Commit

Permalink
Wait for all output to be processed before sending process exit notif…
Browse files Browse the repository at this point in the history
…ication.
  • Loading branch information
evik42 committed Oct 28, 2022
1 parent dd8a3e3 commit 6d3378f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 44 deletions.
5 changes: 5 additions & 0 deletions runtime/init-container/include/process_bookkeeping.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ struct redir_fd_desc {
struct {
struct cyclic_buffer cb;
int fds[2];
bool closed;
// needed to send notification
uint64_t process_desc_id;
} buffer;
};
};
Expand All @@ -25,6 +28,8 @@ struct process_desc {
uint64_t id;
pid_t pid;
bool is_alive;
int32_t ssi_status;
int32_t ssi_code;
struct redir_fd_desc redirs[3];
struct process_desc* prev;
struct process_desc* next;
Expand Down
94 changes: 50 additions & 44 deletions runtime/init-container/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static bool redir_buffers_empty(struct redir_fd_desc *redirs, size_t len) {
break;
case REDIRECT_FD_PIPE_BLOCKING:
case REDIRECT_FD_PIPE_CYCLIC:
if (cyclic_buffer_data_size(&redirs[fd].buffer.cb) != 0) {
if (!redirs[fd].buffer.closed || cyclic_buffer_data_size(&redirs[fd].buffer.cb) != 0) {
return false;
}
break;
Expand All @@ -204,14 +204,6 @@ static bool redir_buffers_empty(struct redir_fd_desc *redirs, size_t len) {
return true;
}

__attribute__((unused)) static void delete_proc(struct process_desc* proc_desc) {
remove_process(proc_desc);
for (size_t fd = 0; fd < 3; ++fd) {
cleanup_fd_desc(&proc_desc->redirs[fd]);
}
free(proc_desc);
}

struct exit_reason {
uint8_t status;
uint8_t type;
Expand Down Expand Up @@ -252,6 +244,22 @@ static struct exit_reason encode_status(int status, int type) {
return exit_reason;
}

static void delete_proc(struct process_desc* proc_desc) {
send_process_died(proc_desc->id, encode_status(proc_desc->ssi_status, proc_desc->ssi_code));

if (proc_desc == g_entrypoint_desc) {
fprintf(stderr, "Entrypoint exited\n");
CHECK(kill(-1, SIGKILL));
die();
}

remove_process(proc_desc);
for (size_t fd = 0; fd < 3; ++fd) {
cleanup_fd_desc(&proc_desc->redirs[fd]);
}
free(proc_desc);
}

static void handle_sigchld(void) {
struct signalfd_siginfo siginfo = { 0 };

Expand Down Expand Up @@ -288,15 +296,8 @@ static void handle_sigchld(void) {
}

proc_desc->is_alive = false;

send_process_died(proc_desc->id, encode_status(siginfo.ssi_status,
siginfo.ssi_code));

if (proc_desc == g_entrypoint_desc) {
fprintf(stderr, "Entrypoint exited\n");
CHECK(kill(-1, SIGKILL));
die();
}
proc_desc->ssi_status = siginfo.ssi_status;
proc_desc->ssi_code = siginfo.ssi_code;

if (redir_buffers_empty(proc_desc->redirs, 3)) {
delete_proc(proc_desc);
Expand Down Expand Up @@ -756,6 +757,22 @@ static uint32_t spawn_new_process(struct new_process_args* new_proc_args,
ret = errno;
goto out_err;
}
proc_desc->redirs[fd].buffer.closed = false;
proc_desc->redirs[fd].buffer.process_desc_id = proc_desc->id;

if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[fd ? 0 : 1],
fd,
&epoll_fd_descs[fd]) < 0) {
if (errno == ENOMEM || errno == ENOSPC) {
ret = errno;
goto out_err;
}
CHECK(-1);
}

CHECK(make_nonblocking(epoll_fd_descs[fd]->fd));

break;
default:
break;
Expand Down Expand Up @@ -801,19 +818,6 @@ static uint32_t spawn_new_process(struct new_process_args* new_proc_args,
|| proc_desc->redirs[fd].type == REDIRECT_FD_PIPE_CYCLIC) {
CHECK(close(proc_desc->redirs[fd].buffer.fds[fd ? 1 : 0]));
proc_desc->redirs[fd].buffer.fds[fd ? 1 : 0] = -1;

if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[fd ? 0 : 1],
fd,
&epoll_fd_descs[fd]) < 0) {
if (errno == ENOMEM || errno == ENOSPC) {
ret = errno;
goto out_err;
}
CHECK(-1);
}

CHECK(make_nonblocking(epoll_fd_descs[fd]->fd));
}
}

Expand Down Expand Up @@ -1236,7 +1240,7 @@ static void handle_query_output(msg_id_t msg_id) {
}
bool was_full = cyclic_buffer_free_size(&proc_desc->redirs[fd].buffer.cb) == 0;
send_response_cyclic_buffer(msg_id, &proc_desc->redirs[fd].buffer.cb, len);
if (was_full) {
if (was_full && !proc_desc->redirs[fd].buffer.closed) {
if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[0],
fd,
Expand Down Expand Up @@ -1272,24 +1276,21 @@ static void send_output_available_notification(uint64_t id, uint32_t fd) {
CHECK(writen(g_cmds_fd, &fd, sizeof(fd)));
}

static void handle_output_available(struct epoll_fd_desc** epoll_fd_desc_ptr) {
struct epoll_fd_desc* epoll_fd_desc = *epoll_fd_desc_ptr;
static void handle_output_available(struct epoll_fd_desc* epoll_fd_desc) {
struct cyclic_buffer* cb = &epoll_fd_desc->data->buffer.cb;
size_t to_read = cyclic_buffer_free_size(cb);
bool needs_notification = cyclic_buffer_data_size(cb) == 0;

if (to_read == 0) {
/* Buffer is full, deregister `epoll_fd_desc` untill it get's emptied. */
CHECK(del_epoll_fd_desc(epoll_fd_desc));
*epoll_fd_desc_ptr = NULL;
return;
}

ssize_t ret = cyclic_buffer_read(epoll_fd_desc->fd, cb, to_read);
if (ret < 0) {
if (errno == EAGAIN) {
/* This was a spurious wakeup. */
return;
} else {
fprintf(stderr, "Unexpected error while reading in handle_output_available: %m\n");
die();
Expand All @@ -1299,14 +1300,18 @@ static void handle_output_available(struct epoll_fd_desc** epoll_fd_desc_ptr) {
* have been some output available and space in the buffer. Maybe just
* print an error and die() here? */
CHECK(del_epoll_fd_desc(epoll_fd_desc));
*epoll_fd_desc_ptr = NULL;
} else if (needs_notification) {
send_output_available_notification(epoll_fd_desc->data->buffer.process_desc_id, epoll_fd_desc->src_fd);
}
}

static void handle_closing_pipe(struct epoll_fd_desc* epoll_fd_desc) {
epoll_fd_desc->data->buffer.closed = true;
CHECK(del_epoll_fd_desc(epoll_fd_desc));
struct process_desc* proc_desc = find_process_by_id(epoll_fd_desc->data->buffer.process_desc_id);

if (needs_notification) {
/* XXX: this is ugly, but for now there is no other way of obtaining process id here. */
int fd = epoll_fd_desc->src_fd;
struct process_desc* process_desc = CONTAINER_OF(epoll_fd_desc->data, struct process_desc, redirs[fd]);
send_output_available_notification(process_desc->id, fd);
if (!proc_desc->is_alive && redir_buffers_empty(proc_desc->redirs, 3)) {
delete_proc(proc_desc);
}
}

Expand Down Expand Up @@ -1613,9 +1618,10 @@ static noreturn void main_loop(void) {
case EPOLL_FD_IN:
if (event.events & EPOLLIN) {
assert(epoll_fd_desc->data);
handle_output_available(&epoll_fd_desc);
handle_output_available(epoll_fd_desc);
} else if (event.events & EPOLLHUP) {
CHECK(del_epoll_fd_desc(epoll_fd_desc));
assert(epoll_fd_desc->data);
handle_closing_pipe(epoll_fd_desc);
}
break;
default:
Expand Down

0 comments on commit 6d3378f

Please sign in to comment.