From df513f3fd274811ebb8358d05d7cec19ee8bd3e1 Mon Sep 17 00:00:00 2001 From: walkor Date: Thu, 7 Nov 2024 16:31:33 +0800 Subject: [PATCH] Fix swoole --- Events/Swoole.php | 61 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 4 deletions(-) diff --git a/Events/Swoole.php b/Events/Swoole.php index fcd747238..8f0264c34 100644 --- a/Events/Swoole.php +++ b/Events/Swoole.php @@ -16,6 +16,7 @@ use Workerman\Worker; use Swoole\Event; use Swoole\Timer; +use Swoole\Coroutine; class Swoole implements EventInterface { @@ -33,6 +34,10 @@ class Swoole implements EventInterface protected $_hasSignal = false; + protected $_readEvents = array(); + + protected $_writeEvents = array(); + /** * * {@inheritdoc} @@ -61,7 +66,7 @@ function () { $mapId = $this->mapId++; $t = (int)($fd * 1000); if ($t < 1) { - $t = 1; + $t = 1; } $timer_id = Timer::$method($t, function ($timer_id = null) use ($func, $args, $mapId) { @@ -92,9 +97,14 @@ function ($timer_id = null) use ($func, $args, $mapId) { case self::EV_READ: case self::EV_WRITE: $fd_key = (int) $fd; - if (! isset($this->_fd[$fd_key])) { + if ($flag === self::EV_READ) { + $this->_readEvents[$fd_key] = $func; + } else { + $this->_writeEvents[$fd_key] = $func; + } + if (!isset($this->_fd[$fd_key])) { if ($flag === self::EV_READ) { - $res = Event::add($fd, $func, null, SWOOLE_EVENT_READ); + $res = Event::add($fd, [$this, 'callRead'], null, SWOOLE_EVENT_READ); $fd_type = SWOOLE_EVENT_READ; } else { $res = Event::add($fd, null, $func, SWOOLE_EVENT_WRITE); @@ -124,6 +134,42 @@ function ($timer_id = null) use ($func, $args, $mapId) { } } + /** + * @param $fd + * @return void + */ + protected function callRead($stream) + { + $fd = (int) $stream; + if (isset($this->_readEvents[$fd])) { + try { + \call_user_func($this->_readEvents[$fd], $stream); + } catch (\Exception $e) { + Worker::stopAll(250, $e); + } catch (\Error $e) { + Worker::stopAll(250, $e); + } + } + } + + /** + * @param $fd + * @return void + */ + protected function callWrite($stream) + { + $fd = (int) $stream; + if (isset($this->_writeEvents[$fd])) { + try { + \call_user_func($this->_writeEvents[$fd], $stream); + } catch (\Exception $e) { + Worker::stopAll(250, $e); + } catch (\Error $e) { + Worker::stopAll(250, $e); + } + } + } + /** * * {@inheritdoc} @@ -153,6 +199,11 @@ public function del($fd, $flag) case self::EV_READ: case self::EV_WRITE: $fd_key = (int) $fd; + if ($flag === self::EV_READ) { + unset($this->_readEvents[$fd_key]); + } elseif ($flag === self::EV_WRITE) { + unset($this->_writeEvents[$fd_key]); + } if (isset($this->_fd[$fd_key])) { $fd_val = $this->_fd[$fd_key]; if ($flag === self::EV_READ) { @@ -213,8 +264,10 @@ public function loop() */ public function destroy() { + foreach (Coroutine::listCoroutines() as $coroutine) { + Coroutine::cancel($coroutine); + } Event::exit(); - posix_kill(posix_getpid(), SIGINT); } /**