Skip to content

Commit

Permalink
Merge pull request #150 from golemfactory/evik/revert-waiting-for-all…
Browse files Browse the repository at this point in the history
…-output

Revert "Wait for all output to be processed before sending process exit notification."
  • Loading branch information
evik42 authored Nov 8, 2022
2 parents 493f387 + 1967045 commit 8f4c5d3
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 55 deletions.
5 changes: 0 additions & 5 deletions runtime/init-container/include/process_bookkeeping.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ struct redir_fd_desc {
struct {
struct cyclic_buffer cb;
int fds[2];
bool closed;
// needed to send notification
uint64_t process_desc_id;
} buffer;
};
};
Expand All @@ -28,8 +25,6 @@ struct process_desc {
uint64_t id;
pid_t pid;
bool is_alive;
int32_t ssi_status;
int32_t ssi_code;
struct redir_fd_desc redirs[3];
struct process_desc* prev;
struct process_desc* next;
Expand Down
94 changes: 44 additions & 50 deletions runtime/init-container/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static bool redir_buffers_empty(struct redir_fd_desc *redirs, size_t len) {
break;
case REDIRECT_FD_PIPE_BLOCKING:
case REDIRECT_FD_PIPE_CYCLIC:
if (!redirs[fd].buffer.closed || cyclic_buffer_data_size(&redirs[fd].buffer.cb) != 0) {
if (cyclic_buffer_data_size(&redirs[fd].buffer.cb) != 0) {
return false;
}
break;
Expand All @@ -204,6 +204,14 @@ static bool redir_buffers_empty(struct redir_fd_desc *redirs, size_t len) {
return true;
}

__attribute__((unused)) static void delete_proc(struct process_desc* proc_desc) {
remove_process(proc_desc);
for (size_t fd = 0; fd < 3; ++fd) {
cleanup_fd_desc(&proc_desc->redirs[fd]);
}
free(proc_desc);
}

struct exit_reason {
uint8_t status;
uint8_t type;
Expand Down Expand Up @@ -244,22 +252,6 @@ static struct exit_reason encode_status(int status, int type) {
return exit_reason;
}

static void delete_proc(struct process_desc* proc_desc) {
send_process_died(proc_desc->id, encode_status(proc_desc->ssi_status, proc_desc->ssi_code));

if (proc_desc == g_entrypoint_desc) {
fprintf(stderr, "Entrypoint exited\n");
CHECK(kill(-1, SIGKILL));
die();
}

remove_process(proc_desc);
for (size_t fd = 0; fd < 3; ++fd) {
cleanup_fd_desc(&proc_desc->redirs[fd]);
}
free(proc_desc);
}

static void handle_sigchld(void) {
struct signalfd_siginfo siginfo = { 0 };

Expand Down Expand Up @@ -296,8 +288,15 @@ static void handle_sigchld(void) {
}

proc_desc->is_alive = false;
proc_desc->ssi_status = siginfo.ssi_status;
proc_desc->ssi_code = siginfo.ssi_code;

send_process_died(proc_desc->id, encode_status(siginfo.ssi_status,
siginfo.ssi_code));

if (proc_desc == g_entrypoint_desc) {
fprintf(stderr, "Entrypoint exited\n");
CHECK(kill(-1, SIGKILL));
die();
}

if (redir_buffers_empty(proc_desc->redirs, 3)) {
delete_proc(proc_desc);
Expand Down Expand Up @@ -757,22 +756,6 @@ static uint32_t spawn_new_process(struct new_process_args* new_proc_args,
ret = errno;
goto out_err;
}
proc_desc->redirs[fd].buffer.closed = false;
proc_desc->redirs[fd].buffer.process_desc_id = proc_desc->id;

if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[fd ? 0 : 1],
fd,
&epoll_fd_descs[fd]) < 0) {
if (errno == ENOMEM || errno == ENOSPC) {
ret = errno;
goto out_err;
}
CHECK(-1);
}

CHECK(make_nonblocking(epoll_fd_descs[fd]->fd));

break;
default:
break;
Expand Down Expand Up @@ -818,6 +801,19 @@ static uint32_t spawn_new_process(struct new_process_args* new_proc_args,
|| proc_desc->redirs[fd].type == REDIRECT_FD_PIPE_CYCLIC) {
CHECK(close(proc_desc->redirs[fd].buffer.fds[fd ? 1 : 0]));
proc_desc->redirs[fd].buffer.fds[fd ? 1 : 0] = -1;

if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[fd ? 0 : 1],
fd,
&epoll_fd_descs[fd]) < 0) {
if (errno == ENOMEM || errno == ENOSPC) {
ret = errno;
goto out_err;
}
CHECK(-1);
}

CHECK(make_nonblocking(epoll_fd_descs[fd]->fd));
}
}

Expand Down Expand Up @@ -1240,7 +1236,7 @@ static void handle_query_output(msg_id_t msg_id) {
}
bool was_full = cyclic_buffer_free_size(&proc_desc->redirs[fd].buffer.cb) == 0;
send_response_cyclic_buffer(msg_id, &proc_desc->redirs[fd].buffer.cb, len);
if (was_full && !proc_desc->redirs[fd].buffer.closed) {
if (was_full) {
if (add_epoll_fd_desc(&proc_desc->redirs[fd],
proc_desc->redirs[fd].buffer.fds[0],
fd,
Expand Down Expand Up @@ -1276,21 +1272,24 @@ static void send_output_available_notification(uint64_t id, uint32_t fd) {
CHECK(writen(g_cmds_fd, &fd, sizeof(fd)));
}

static void handle_output_available(struct epoll_fd_desc* epoll_fd_desc) {
static void handle_output_available(struct epoll_fd_desc** epoll_fd_desc_ptr) {
struct epoll_fd_desc* epoll_fd_desc = *epoll_fd_desc_ptr;
struct cyclic_buffer* cb = &epoll_fd_desc->data->buffer.cb;
size_t to_read = cyclic_buffer_free_size(cb);
bool needs_notification = cyclic_buffer_data_size(cb) == 0;

if (to_read == 0) {
/* Buffer is full, deregister `epoll_fd_desc` untill it get's emptied. */
CHECK(del_epoll_fd_desc(epoll_fd_desc));
*epoll_fd_desc_ptr = NULL;
return;
}

ssize_t ret = cyclic_buffer_read(epoll_fd_desc->fd, cb, to_read);
if (ret < 0) {
if (errno == EAGAIN) {
/* This was a spurious wakeup. */
return;
} else {
fprintf(stderr, "Unexpected error while reading in handle_output_available: %m\n");
die();
Expand All @@ -1300,18 +1299,14 @@ static void handle_output_available(struct epoll_fd_desc* epoll_fd_desc) {
* have been some output available and space in the buffer. Maybe just
* print an error and die() here? */
CHECK(del_epoll_fd_desc(epoll_fd_desc));
} else if (needs_notification) {
send_output_available_notification(epoll_fd_desc->data->buffer.process_desc_id, epoll_fd_desc->src_fd);
*epoll_fd_desc_ptr = NULL;
}
}

static void handle_closing_pipe(struct epoll_fd_desc* epoll_fd_desc) {
epoll_fd_desc->data->buffer.closed = true;
CHECK(del_epoll_fd_desc(epoll_fd_desc));
struct process_desc* proc_desc = find_process_by_id(epoll_fd_desc->data->buffer.process_desc_id);

if (!proc_desc->is_alive && redir_buffers_empty(proc_desc->redirs, 3)) {
delete_proc(proc_desc);
if (needs_notification) {
/* XXX: this is ugly, but for now there is no other way of obtaining process id here. */
int fd = epoll_fd_desc->src_fd;
struct process_desc* process_desc = CONTAINER_OF(epoll_fd_desc->data, struct process_desc, redirs[fd]);
send_output_available_notification(process_desc->id, fd);
}
}

Expand Down Expand Up @@ -1618,10 +1613,9 @@ static noreturn void main_loop(void) {
case EPOLL_FD_IN:
if (event.events & EPOLLIN) {
assert(epoll_fd_desc->data);
handle_output_available(epoll_fd_desc);
handle_output_available(&epoll_fd_desc);
} else if (event.events & EPOLLHUP) {
assert(epoll_fd_desc->data);
handle_closing_pipe(epoll_fd_desc);
CHECK(del_epoll_fd_desc(epoll_fd_desc));
}
break;
default:
Expand Down

0 comments on commit 8f4c5d3

Please sign in to comment.