Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve process pool #5604

Merged
merged 5 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ext-src/php_swoole_cxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,12 @@ static inline void array_unset(zval *arg, const char *key, size_t l_key) {
zend_hash_str_del(Z_ARRVAL_P(arg), key, l_key);
}

static inline zend_long object_get_long(zval *obj, zend_string *key) {
static zval rv;
zval *property = zend_read_property_ex(Z_OBJCE_P(obj), Z_OBJ_P(obj), key, 1, &rv);
return property ? zval_get_long(property) : 0;
}

static inline zend_long object_get_long(zval *obj, const char *key, size_t l_key) {
static zval rv;
zval *property = zend_read_property(Z_OBJCE_P(obj), Z_OBJ_P(obj), key, l_key, 1, &rv);
Expand Down
143 changes: 117 additions & 26 deletions ext-src/swoole_process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ static Worker *current_worker = nullptr;
struct ProcessPoolObject {
ProcessPool *pool;
zend::Callable *onStart;
zend::Callable *onShutdown;
zend::Callable *onWorkerStart;
zend::Callable *onWorkerStop;
zend::Callable *onWorkerExit;
zend::Callable *onMessage;
zend_bool enable_coroutine;
zend_bool enable_message_bus;
Expand Down Expand Up @@ -86,6 +88,12 @@ static void process_pool_free_object(zend_object *object) {
if (pp->onStart) {
sw_callable_free(pp->onStart);
}
if (pp->onWorkerExit) {
sw_callable_free(pp->onWorkerExit);
}
if (pp->onShutdown) {
sw_callable_free(pp->onShutdown);
}

zend_object_std_dtor(object);
}
Expand Down Expand Up @@ -141,7 +149,11 @@ void php_swoole_process_pool_minit(int module_number) {
swoole_process_pool, process_pool_create_object, process_pool_free_object, ProcessPoolObject, std);

zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("master_pid"), -1, ZEND_ACC_PUBLIC);
zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("workerPid"), -1, ZEND_ACC_PUBLIC);
zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("workerId"), -1, ZEND_ACC_PUBLIC);
zend_declare_property_null(swoole_process_pool_ce, ZEND_STRL("workers"), ZEND_ACC_PUBLIC);
zend_declare_property_bool(swoole_process_pool_ce, ZEND_STRL("workerRunning"), -1, ZEND_ACC_PUBLIC);
zend_declare_property_bool(swoole_process_pool_ce, ZEND_STRL("running"), -1, ZEND_ACC_PUBLIC);
}

static void process_pool_onWorkerStart(ProcessPool *pool, Worker *worker) {
Expand All @@ -152,19 +164,22 @@ static void process_pool_onWorkerStart(ProcessPool *pool, Worker *worker) {
current_pool = pool;
current_worker = worker;

if (pp->onMessage) {
swoole_signal_set(SIGTERM, process_pool_signal_handler);
}
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), true);
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), true);
zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerPid"), getpid());
zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerId"), worker->id);

if (!pp->onWorkerStart) {
return;
if (pp->onWorkerStart) {
zval args[2];
args[0] = *zobject;
ZVAL_LONG(&args[1], worker->id);
if (UNEXPECTED(!zend::function::call(pp->onWorkerStart->ptr(), 2, args, nullptr, pp->enable_coroutine))) {
php_swoole_error(E_WARNING, "%s->onWorkerStart handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
}
}

zval args[2];
args[0] = *zobject;
ZVAL_LONG(&args[1], worker->id);
if (UNEXPECTED(!zend::function::call(pp->onWorkerStart->ptr(), 2, args, nullptr, pp->enable_coroutine))) {
php_swoole_error(E_WARNING, "%s->onWorkerStart handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
if (!swoole_signal_isset(SIGTERM) && (pp->onMessage || pp->enable_coroutine)) {
swoole_signal_set(SIGTERM, process_pool_signal_handler);
}
}

Expand Down Expand Up @@ -201,6 +216,9 @@ static void process_pool_onWorkerStop(ProcessPool *pool, Worker *worker) {
ProcessPoolObject *pp = process_pool_fetch_object(zobject);
zval args[2];

zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), false);
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), false);

if (pp->onWorkerStop == nullptr) {
return;
}
Expand All @@ -213,6 +231,63 @@ static void process_pool_onWorkerStop(ProcessPool *pool, Worker *worker) {
}
}

static void process_pool_onWorkerExit(ProcessPool *pool, Worker *worker) {
zval *zobject = (zval *) pool->ptr;
ProcessPoolObject *pp = process_pool_fetch_object(zobject);
zval args[2];

zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), false);
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), false);

if (pp->onWorkerExit == nullptr) {
return;
}

args[0] = *zobject;
ZVAL_LONG(&args[1], worker->id);

if (UNEXPECTED(!zend::function::call(pp->onWorkerExit->ptr(), 2, args, nullptr, false))) {
php_swoole_error(E_WARNING, "%s->onWorkerExit handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
}
}

static void process_pool_onStart(ProcessPool *pool) {
zval *zobject = (zval *) pool->ptr;
ProcessPoolObject *pp = process_pool_fetch_object(zobject);
zval args[1];

zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("master_pid"), getpid());
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), true);

if (pp->onStart == nullptr) {
return;
}

args[0] = *zobject;
if (UNEXPECTED(!zend::function::call(pp->onStart->ptr(), 1, args, nullptr, false))) {
php_swoole_error(E_WARNING, "%s->onStart handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
}
}

static void process_pool_onShutdown(ProcessPool *pool) {
zval *zobject = (zval *) pool->ptr;
ProcessPoolObject *pp = process_pool_fetch_object(zobject);
zval args[1];

zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), false);
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), false);

if (pp->onShutdown == nullptr) {
return;
}

args[0] = *zobject;

if (UNEXPECTED(!zend::function::call(pp->onShutdown->ptr(), 1, args, nullptr, false))) {
php_swoole_error(E_WARNING, "%s->onShutdown handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
}
}

static void process_pool_signal_handler(int sig) {
if (!current_pool) {
return;
Expand Down Expand Up @@ -343,7 +418,8 @@ static PHP_METHOD(swoole_process_pool, on) {
pp->onWorkerStart = sw_callable_create(zfn);
} else if (SW_STRCASEEQ(name, l_name, "Message")) {
if (pool->ipc_mode == SW_IPC_NONE) {
php_swoole_fatal_error(E_WARNING, "cannot set onMessage event with ipc_type=0");
zend_throw_exception(
swoole_exception_ce, "cannot set `onMessage` event with ipc_type=0", SW_ERROR_INVALID_PARAMS);
RETURN_FALSE;
}
if (pp->onMessage) {
Expand All @@ -355,11 +431,21 @@ static PHP_METHOD(swoole_process_pool, on) {
sw_callable_free(pp->onWorkerStop);
}
pp->onWorkerStop = sw_callable_create(zfn);
} else if (SW_STRCASEEQ(name, l_name, "WorkerExit")) {
if (pp->onWorkerExit) {
sw_callable_free(pp->onWorkerExit);
}
pp->onWorkerExit = sw_callable_create(zfn);
} else if (SW_STRCASEEQ(name, l_name, "Start")) {
if (pp->onStart) {
sw_callable_free(pp->onStart);
}
pp->onStart = sw_callable_create(zfn);
} else if (SW_STRCASEEQ(name, l_name, "Shutdown")) {
if (pp->onShutdown) {
sw_callable_free(pp->onShutdown);
}
pp->onShutdown = sw_callable_create(zfn);
} else {
php_swoole_error(E_WARNING, "unknown event type[%s]", name);
RETURN_FALSE;
Expand Down Expand Up @@ -489,31 +575,33 @@ static PHP_METHOD(swoole_process_pool, start) {
}
}

if (pp->onWorkerExit && !pp->enable_coroutine) {
zend_throw_exception(
swoole_exception_ce, "cannot set `onWorkerExit` without enable_coroutine", SW_ERROR_INVALID_PARAMS);
RETURN_FALSE;
}

if (pp->onMessage) {
pool->onMessage = process_pool_onMessage;
} else {
pool->main_loop = nullptr;
}

current_pool = pool;

pool->onStart = process_pool_onStart;
pool->onShutdown = process_pool_onShutdown;
pool->onWorkerStart = process_pool_onWorkerStart;
pool->onWorkerStop = process_pool_onWorkerStop;

zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("master_pid"), getpid());
if (pp->enable_coroutine && pp->onWorkerExit) {
pool->onWorkerExit = process_pool_onWorkerExit;
}

if (pool->start() < 0) {
RETURN_FALSE;
}

current_pool = pool;

if (pp->onStart) {
zval args[1];
args[0] = *ZEND_THIS;
if (UNEXPECTED(!zend::function::call(pp->onStart->ptr(), 1, args, nullptr, 0))) {
php_swoole_error(E_WARNING, "%s->onStart handler error", SW_Z_OBJCE_NAME_VAL_P(ZEND_THIS));
}
}

pool->wait();
pool->shutdown();

Expand Down Expand Up @@ -618,10 +706,13 @@ static PHP_METHOD(swoole_process_pool, stop) {
}

static PHP_METHOD(swoole_process_pool, shutdown) {
zval *retval =
sw_zend_read_property_ex(swoole_process_pool_ce, ZEND_THIS, SW_ZSTR_KNOWN(SW_ZEND_STR_MASTER_PID), 0);
long pid = zval_get_long(retval);
RETURN_BOOL(swoole_kill(pid, SIGTERM) == 0);
long pid = zend::object_get_long(ZEND_THIS, SW_ZSTR_KNOWN(SW_ZEND_STR_MASTER_PID));
if (pid > 0) {
RETURN_BOOL(swoole_kill(pid, SIGTERM) == 0);
} else {
zend_throw_exception(swoole_exception_ce, "invalid master pid", SW_ERROR_INVALID_PARAMS);
RETURN_FALSE;
}
}

static PHP_METHOD(swoole_process_pool, __destruct) {}
4 changes: 3 additions & 1 deletion include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ struct ProcessPool {
uint8_t scheduler_warning;
time_t warning_time;

void (*onStart)(ProcessPool *pool);
void (*onShutdown)(ProcessPool *pool);
int (*onTask)(ProcessPool *pool, Worker *worker, EventData *task);
void (*onWorkerStart)(ProcessPool *pool, Worker *worker);
void (*onMessage)(ProcessPool *pool, RecvData *msg);
void (*onWorkerExit)(ProcessPool *pool, Worker *worker);
void (*onWorkerStop)(ProcessPool *pool, Worker *worker);
void (*onWorkerError)(ProcessPool *pool, Worker *worker, const ExitStatus &exit_status);
void (*onWorkerMessage)(ProcessPool *pool, EventData *msg);
Expand All @@ -277,7 +280,6 @@ struct ProcessPool {
Worker *workers;
std::vector<std::shared_ptr<UnixSocket>> *pipes;
std::unordered_map<pid_t, Worker *> *map_;
Reactor *reactor;
MsgQueue *queue;
StreamInfo *stream_info_;
Channel *message_box = nullptr;
Expand Down
1 change: 1 addition & 0 deletions include/swoole_signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void swoole_signalfd_init();
#endif

SW_API swSignalHandler swoole_signal_set(int signo, swSignalHandler func);
SW_API bool swoole_signal_isset(int signo);
SW_API swSignalHandler swoole_signal_set(int signo, swSignalHandler func, int restart, int mask);
SW_API swSignalHandler swoole_signal_get_handler(int signo);

Expand Down
28 changes: 27 additions & 1 deletion src/os/process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,17 @@
if (start_check() < 0) {
return SW_ERR;
}

if (onStart) {
onStart(this);

Check warning on line 270 in src/os/process_pool.cc

View check run for this annotation

Codecov / codecov/patch

src/os/process_pool.cc#L270

Added line #L270 was not covered by tests
}

SW_LOOP_N(worker_num) {
if (spawn(&(workers[i])) < 0) {
return SW_ERR;
}
}

return SW_OK;
}

Expand Down Expand Up @@ -417,9 +423,25 @@
}

void ProcessPool::stop(Worker *worker) {
if (async && worker->pipe_worker) {
worker->shutdown();

Check warning on line 426 in src/os/process_pool.cc

View check run for this annotation

Codecov / codecov/patch

src/os/process_pool.cc#L426

Added line #L426 was not covered by tests

if (!swoole_event_is_available()) {
return;

Check warning on line 429 in src/os/process_pool.cc

View check run for this annotation

Codecov / codecov/patch

src/os/process_pool.cc#L429

Added line #L429 was not covered by tests
}

auto reactor = sw_reactor();

Check warning on line 432 in src/os/process_pool.cc

View check run for this annotation

Codecov / codecov/patch

src/os/process_pool.cc#L432

Added line #L432 was not covered by tests
if (worker->pipe_worker) {
swoole_event_del(worker->pipe_worker);
}

if (onWorkerExit) {
reactor->set_end_callback(Reactor::PRIORITY_TRY_EXIT, [this, worker](Reactor *reactor) {
onWorkerExit(this, worker);

Check warning on line 439 in src/os/process_pool.cc

View check run for this annotation

Codecov / codecov/patch

src/os/process_pool.cc#L438-L439

Added lines #L438 - L439 were not covered by tests
if (reactor->if_exit()) {
reactor->running = false;

Check warning on line 441 in src/os/process_pool.cc

View check run for this annotation

Codecov / codecov/patch

src/os/process_pool.cc#L441

Added line #L441 was not covered by tests
}
});

Check warning on line 443 in src/os/process_pool.cc

View check run for this annotation

Codecov / codecov/patch

src/os/process_pool.cc#L443

Added line #L443 was not covered by tests
}
}

void ProcessPool::shutdown() {
Expand All @@ -428,6 +450,10 @@
Worker *worker;
running = 0;

if (onShutdown) {
onShutdown(this);

Check warning on line 454 in src/os/process_pool.cc

View check run for this annotation

Codecov / codecov/patch

src/os/process_pool.cc#L454

Added line #L454 was not covered by tests
}

// concurrent kill
for (i = 0; i < worker_num; i++) {
worker = &workers[i];
Expand Down
4 changes: 4 additions & 0 deletions src/os/signal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@
return oact.sa_handler;
}

SW_API bool swoole_signal_isset(int signo) {
return signals[signo].handler && signals[signo].activated;

Check warning on line 126 in src/os/signal.cc

View check run for this annotation

Codecov / codecov/patch

src/os/signal.cc#L125-L126

Added lines #L125 - L126 were not covered by tests
}

/**
* set new signal handler and return origin signal handler
*/
Expand Down
2 changes: 1 addition & 1 deletion tests/include/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
define('REDIS_SERVER_PWD', getenv('REDIS_SERVER_PWD') ?: 'root');
define('REDIS_SERVER_DB', (int)(getenv('REDIS_SERVER_DB') ?: 0));

if (!getenv('SWOOLE_TEST_NO_DOCKER')) {
if (getenv('SWOOLE_TEST_IN_DOCKER')) {
if (!empty($info = `docker ps 2>&1 | grep httpbin 2>&1`) &&
preg_match('/\s+?[^:]+:(\d+)->\d+\/tcp\s+/', $info, $matches) &&
is_numeric($matches[1])) {
Expand Down
Loading
Loading