Skip to content

Commit

Permalink
Allow zed to run as Windows service.
Browse files Browse the repository at this point in the history
We had to make some changes to upstream zed,
in particular to split the main loop out into
a function, as Windows services as called by
another thread.

Enhance the CreateProcess() to have similar wait()
logic in _reap_children(), and handle multiple
scripts executing.

Signed-off-by: Jorgen Lundman <[email protected]>
  • Loading branch information
lundman committed Nov 22, 2024
1 parent eda4275 commit f655c6e
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 136 deletions.
203 changes: 87 additions & 116 deletions cmd/zed/os/windows/zed_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@

struct launched_process_node {
avl_node_t node;
#ifdef _WIN32
HANDLE processHandle;
DWORD processId;
#else
pid_t pid;
#endif
uint64_t eid;
char *name;
};
Expand All @@ -50,8 +55,8 @@ _launched_process_node_compare(const void *x1, const void *x2)
assert(x1 != NULL);
assert(x2 != NULL);

p1 = ((const struct launched_process_node *) x1)->pid;
p2 = ((const struct launched_process_node *) x2)->pid;
p1 = ((const struct launched_process_node *) x1)->processId;
p2 = ((const struct launched_process_node *) x2)->processId;

if (p1 < p2)
return (-1);
Expand Down Expand Up @@ -189,7 +194,8 @@ env_get_merge_sort(char *unix[])
}

void
launch_process(const char *path, const char *prog, char *env[])
launch_process(const char *path, const char *prog, char *env[],
struct launched_process_node *node)
{
STARTUPINFO si;
PROCESS_INFORMATION pi;
Expand Down Expand Up @@ -227,12 +233,8 @@ launch_process(const char *path, const char *prog, char *env[])
if (sortedMergedEnv)
free(sortedMergedEnv);

// While we debug
WaitForSingleObject(pi.hProcess, INFINITE);

// Close process and thread handles
CloseHandle(pi.hProcess);
CloseHandle(pi.hThread);
node->processHandle = pi.hProcess;
node->processId = pi.dwProcessId;
}

/*
Expand Down Expand Up @@ -272,48 +274,20 @@ _zed_exec_fork_child(uint64_t eid, const char *dir, const char *prog,
return;
}
(void) pthread_mutex_lock(&_launched_processes_lock);
pid = fork();
if (pid < 0) {
(void) pthread_mutex_unlock(&_launched_processes_lock);
zed_log_msg(LOG_WARNING,
"Failed to fork \"%s\" for eid=%llu: %s",
prog, eid, strerror(errno));
return;
} else if (pid == 0) {
(void) sigemptyset(&mask);
(void) sigprocmask(SIG_SETMASK, &mask, NULL);

(void) umask(022);
if (in_foreground && /* we're already devnulled if daemonised */
(fd = open("/dev/null", O_RDWR | O_CLOEXEC)) != -1) {
(void) dup2(fd, STDIN_FILENO);
(void) dup2(fd, STDOUT_FILENO);
(void) dup2(fd, STDERR_FILENO);
}
(void) dup2(zfd, ZEVENT_FILENO);
execle(path, prog, NULL, env);
_exit(127);
}

#ifdef _WIN32
/*
* fork() always returns 1 (parent) above, so here we can spawn process
*/
launch_process(path, prog, env);
#endif

/* parent process */

node = calloc(1, sizeof (*node));

if (node) {
node->pid = pid;
// fills in node
launch_process(path, prog, env, node);

node->eid = eid;
node->name = strdup(prog);
if (node->name == NULL) {
perror("strdup");
exit(EXIT_FAILURE);
exit(EXIT_FAILURE); // eeeewwww
}

avl_add(&_launched_processes, node);
}
(void) pthread_mutex_unlock(&_launched_processes_lock);
Expand All @@ -333,85 +307,72 @@ static void *
_reap_children(void *arg)
{
(void) arg;
#if 0
struct launched_process_node node, *pnode;
pid_t pid;
int status;
struct rusage usage;
struct sigaction sa = {};

(void) sigfillset(&sa.sa_mask);
(void) sigdelset(&sa.sa_mask, SIGCHLD);
(void) pthread_sigmask(SIG_SETMASK, &sa.sa_mask, NULL);

(void) sigemptyset(&sa.sa_mask);
sa.sa_handler = _nop;
sa.sa_flags = SA_NOCLDSTOP;
(void) sigaction(SIGCHLD, &sa, NULL);

for (_reap_children_stop = B_FALSE; !_reap_children_stop; ) {
(void) pthread_mutex_lock(&_launched_processes_lock);
pid = wait4(0, &status, WNOHANG, &usage);

if (pid == 0 || pid == (pid_t)-1) {
(void) pthread_mutex_unlock(&_launched_processes_lock);
if (pid == 0 || errno == ECHILD)
pause();
else if (errno != EINTR)

#define MAX_CHILD_PROCESSES 64
struct launched_process_node *node, *finishedNode;
DWORD waitResult, exitCode;
HANDLE handles[MAX_CHILD_PROCESSES];
struct launched_process_node *nodes[MAX_CHILD_PROCESSES];
size_t handleCount;

while (!_reap_children_stop) {
// Build the list of handles
handleCount = 0;
pthread_mutex_lock(&_launched_processes_lock);
for (node = avl_first(&_launched_processes); node != NULL;
node = AVL_NEXT(&_launched_processes, node)) {
if (handleCount < MAX_CHILD_PROCESSES) {
handles[handleCount] = node->processHandle;
nodes[handleCount] = node;
handleCount++;
} else {
// Log warning if too many child processes
zed_log_msg(LOG_WARNING,
"Failed to wait for children: %s",
strerror(errno));
} else {
memset(&node, 0, sizeof (node));
node.pid = pid;
pnode = avl_find(&_launched_processes, &node, NULL);
if (pnode) {
memcpy(&node, pnode, sizeof (node));

avl_remove(&_launched_processes, pnode);
free(pnode);
"Too many child processes to track");
break;
}
(void) pthread_mutex_unlock(&_launched_processes_lock);
__atomic_add_fetch(&_launched_processes_limit, 1,
__ATOMIC_SEQ_CST);
}
pthread_mutex_unlock(&_launched_processes_lock);

usage.ru_utime.tv_sec += usage.ru_stime.tv_sec;
usage.ru_utime.tv_usec += usage.ru_stime.tv_usec;
usage.ru_utime.tv_sec +=
usage.ru_utime.tv_usec / (1000 * 1000);
usage.ru_utime.tv_usec %= 1000 * 1000;
if (handleCount == 0) {
Sleep(1000); // No child processes to wait for
continue;
}

if (WIFEXITED(status)) {
// Wait for any process to finish
waitResult = WaitForMultipleObjects((DWORD)handleCount,
handles, FALSE, INFINITE);
if (waitResult >= WAIT_OBJECT_0 && waitResult <
WAIT_OBJECT_0 + handleCount) {
size_t index = waitResult - WAIT_OBJECT_0;
finishedNode = nodes[index];

// Get process exit code
if (GetExitCodeProcess(finishedNode->processHandle,
&exitCode)) {
zed_log_msg(LOG_INFO,
"Finished \"%s\" eid=%llu pid=%d "
"time=%llu.%06us exit=%d",
node.name, node.eid, pid,
(unsigned long long) usage.ru_utime.tv_sec,
(unsigned int) usage.ru_utime.tv_usec,
WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
zed_log_msg(LOG_INFO,
"Finished \"%s\" eid=%llu pid=%d "
"time=%llu.%06us sig=%d/%s",
node.name, node.eid, pid,
(unsigned long long) usage.ru_utime.tv_sec,
(unsigned int) usage.ru_utime.tv_usec,
WTERMSIG(status),
strsignal(WTERMSIG(status)));
} else {
zed_log_msg(LOG_INFO,
"Finished \"%s\" eid=%llu pid=%d "
"time=%llu.%06us status=0x%X",
node.name, node.eid, pid,
(unsigned long long) usage.ru_utime.tv_sec,
(unsigned int) usage.ru_utime.tv_usec,
(unsigned int) status);
"Process finished: %s, exitCode=%lu",
finishedNode->name, exitCode);
}

free(node.name);
// Remove node from AVL tree
pthread_mutex_lock(&_launched_processes_lock);
avl_remove(&_launched_processes, finishedNode);
pthread_mutex_unlock(&_launched_processes_lock);
__atomic_add_fetch(&_launched_processes_limit, 1,
__ATOMIC_SEQ_CST);

// Cleanup
CloseHandle(finishedNode->processHandle);
free(finishedNode->name);
free(finishedNode);
} else if (waitResult == WAIT_FAILED) {
zed_log_msg(LOG_ERR,
"WaitForMultipleObjects failed: %lu",
GetLastError());
}
}
#endif

return (NULL);
}

Expand Down Expand Up @@ -476,6 +437,7 @@ zed_exec_process(uint64_t eid, const char *class, const char *subclass,
if (pthread_create(&_reap_children_tid, NULL,
_reap_children, NULL) != 0)
return (-1);

pthread_setname_np(_reap_children_tid, "reap ZEDLETs");

avl_create(&_launched_processes, _launched_process_node_compare,
Expand Down Expand Up @@ -511,7 +473,8 @@ zed_exec_process(uint64_t eid, const char *class, const char *subclass,
return (0);
}

void main_loop(void);
static struct zed_conf *_zcp = NULL;
void main_loop(struct zed_conf *);

static SERVICE_STATUS ServiceStatus;
static SERVICE_STATUS_HANDLE hStatus;
Expand All @@ -535,6 +498,7 @@ ServiceControlHandler(DWORD controlCode)
static void WINAPI
ServiceMain(DWORD argc, LPTSTR *argv)
{

ServiceStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS;
ServiceStatus.dwCurrentState = SERVICE_START_PENDING;
ServiceStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP;
Expand All @@ -548,18 +512,25 @@ ServiceMain(DWORD argc, LPTSTR *argv)

// Service logic here, e.g., initializing zed's main functionality
// while (ServiceStatus.dwCurrentState == SERVICE_RUNNING) {
main_loop();
main_loop(_zcp);
// }
}

void
win_run_loop(void)
win_run_loop(struct zed_conf *zcp)
{
SERVICE_TABLE_ENTRY ServiceTable[] = {
{ TEXT("OpenZFS_zed"), (LPSERVICE_MAIN_FUNCTION) ServiceMain },
{ NULL, NULL }
};

/* Weirdly, Windows has no way to pass a void * */
_zcp = zcp;

/* Blocks until Service is stopped */
StartServiceCtrlDispatcher(ServiceTable);
if (!StartServiceCtrlDispatcher(ServiceTable)) {
fprintf(stderr, "error %d\r\n", GetLastError()); fflush(stderr);
}

exit(EXIT_SUCCESS);
}
37 changes: 19 additions & 18 deletions cmd/zed/zed.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,30 +210,24 @@ _finish_daemonize(void)

/* Notify parent that daemonization is complete. */
zed_log_pipe_close_writes();

#ifdef _WIN32
void win_run_loop(void);
win_run_loop();
#endif
}

/*
* Main loop for ZED.
*/
void
main_loop(void)
main_loop(struct zed_conf *zcp)
{
struct zed_conf zcp;
uint64_t saved_eid;
int64_t saved_etime[2];
zed_log_msg(LOG_NOTICE,
"ZFS Event Daemon %s-%s (PID %d)",
ZFS_META_VERSION, ZFS_META_RELEASE, (int)getpid());

if (zed_conf_open_state(&zcp) < 0)
if (zed_conf_open_state(zcp) < 0)
exit(EXIT_FAILURE);

if (zed_conf_read_state(&zcp, &saved_eid, saved_etime) < 0)
if (zed_conf_read_state(zcp, &saved_eid, saved_etime) < 0)
exit(EXIT_FAILURE);

idle:
Expand All @@ -242,38 +236,39 @@ main_loop(void)
* successful.
*/
do {
if (!zed_event_init(&zcp))
if (!zed_event_init(zcp))
break;
/* Wait for some time and try again. tunable? */
sleep(30);
} while (!_got_exit && zcp.do_idle);
} while (!_got_exit && zcp->do_idle);

if (_got_exit)
goto out;

zed_event_seek(&zcp, saved_eid, saved_etime);
zed_event_seek(zcp, saved_eid, saved_etime);

while (!_got_exit) {
int rv;
if (_got_hup) {
_got_hup = 0;
(void) zed_conf_scan_dir(&zcp);
(void) zed_conf_scan_dir(zcp);

}
rv = zed_event_service(&zcp);
rv = zed_event_service(zcp);

/* ENODEV: When kernel module is unloaded (osx) */
if (rv != 0)
break;
}

zed_log_msg(LOG_NOTICE, "Exiting");
zed_event_fini(&zcp);
zed_event_fini(zcp);

if (zcp.do_idle && !_got_exit)
if (zcp->do_idle && !_got_exit)
goto idle;

out:
zed_conf_destroy(&zcp);
zed_conf_destroy(zcp);
zed_log_fini();
}

Expand Down Expand Up @@ -322,9 +317,15 @@ main(int argc, char *argv[])
if (!zcp.do_foreground)
_finish_daemonize();

#ifdef _WIN32
void win_run_loop(struct zed_conf *);
if (!zcp.do_foreground)
win_run_loop(&zcp);
#endif

/* Windows daemonize needs to specify function for thread */

main_loop();
main_loop(&zcp);

exit(EXIT_SUCCESS);
}
Loading

0 comments on commit f655c6e

Please sign in to comment.