diff --git a/src/vennv/vapm/ClosureThread.php b/src/vennv/vapm/ClosureThread.php index ca5ec3b2e..08881cb03 100644 --- a/src/vennv/vapm/ClosureThread.php +++ b/src/vennv/vapm/ClosureThread.php @@ -23,6 +23,7 @@ namespace vennv\vapm; +use Generator; use function call_user_func; use function is_array; use function is_bool; @@ -49,31 +50,38 @@ final class ClosureThread extends Thread implements ClosureThreadInterface private mixed $callback; - public function __construct(callable $callback) + /** + * @var array + * @phpstan-var array + */ + private array $argsCallback = []; + + public function __construct(callable $callback, array $args = []) { $this->callback = $callback; - parent::__construct($callback); + $this->argsCallback = $args; + parent::__construct($callback, $args); } public function onRun(): void { if (is_callable($this->callback)) { - $callback = call_user_func($this->callback); - if ($callback instanceof \Generator) { - $callback = function () use ($callback): \Generator { - yield from $callback; + $callback = call_user_func($this->callback, ...$this->argsCallback); + if ($callback instanceof Generator) { + $callback = function () use ($callback): Generator { + yield from $callback(...$this->argsCallback); }; $callback = call_user_func($callback); } if (is_array($callback)) { $callback = json_encode($callback); - } elseif (is_object($callback) && !$callback instanceof \Generator) { + } elseif (is_object($callback) && !$callback instanceof Generator) { $callback = json_encode($callback); } elseif (is_bool($callback)) { $callback = $callback ? 'true' : 'false'; } elseif (is_null($callback)) { $callback = 'null'; - } elseif ($callback instanceof \Generator) { + } elseif ($callback instanceof Generator) { $callback = json_encode(iterator_to_array($callback)); } else { $callback = (string)$callback; diff --git a/src/vennv/vapm/CoroutineGen.php b/src/vennv/vapm/CoroutineGen.php index f67fc01b3..50dd4cd06 100644 --- a/src/vennv/vapm/CoroutineGen.php +++ b/src/vennv/vapm/CoroutineGen.php @@ -117,7 +117,6 @@ public static function runNonBlocking(mixed ...$coroutines): void public static function runBlocking(mixed ...$coroutines): void { self::runNonBlocking(...$coroutines); - $gc = new GarbageCollection(); while (!self::$taskQueue?->isEmpty()) { self::run(); diff --git a/src/vennv/vapm/Thread.php b/src/vennv/vapm/Thread.php index ed6d6208c..4094442ae 100644 --- a/src/vennv/vapm/Thread.php +++ b/src/vennv/vapm/Thread.php @@ -29,7 +29,8 @@ use ReflectionException; use RuntimeException; use Throwable; -use function explode; +use function strlen; +use function rtrim; use function fclose; use function fwrite; use function get_called_class; @@ -212,14 +213,26 @@ abstract class Thread implements ThreadInterface, ThreadedInterface */ private static array $inputs = []; - public function __construct(mixed $input = '') + /** + * @var array + * @phpstan-var array + */ + private static array $args = []; + + public function __construct(mixed $input = '', array $args = []) { - self::$inputs[get_called_class()] = $input; + self::$inputs[$this->getCalledClassId()] = $input; + self::$args[$this->getCalledClassId()] = $args; + } + + private function getCalledClassId(): int + { + return spl_object_id($this); } public function getInput(): mixed { - return self::$inputs[get_called_class()]; + return self::$inputs[$this->getCalledClassId()]; } public function getPid(): int @@ -371,6 +384,8 @@ abstract public function onRun(): void; public function start(array $mode = DescriptorSpec::BASIC): Promise { return new Promise(function ($resolve, $reject) use ($mode): mixed { + $idCall = $this->getCalledClassId(); + $className = get_called_class(); $reflection = new ReflectionClass($className); @@ -384,44 +399,60 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise $pathAutoLoad ); - $input = self::$inputs[get_called_class()]; + $input = self::$inputs[$idCall]; - if (is_string($input)) $input = '\'' . self::$inputs[get_called_class()] . '\''; + if (is_string($input)) $input = '\'' . self::$inputs[$idCall] . '\''; if (is_callable($input) && $input instanceof Closure) { - $input = Utils::closureToString($input); - $input = Utils::removeComments($input); - - if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE)); - - $input = Utils::outlineToInline($input); + try { + $input = Utils::closureToStringSafe($input); + } catch (Throwable $e) { + return $reject(new ThreadException($e->getMessage())); + } + } - if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE)); + if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE)); - $input = Utils::fixInputCommand($input); + $args = self::$args[$idCall]; - if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE)); + foreach ($args as $key => $arg) { + $tryToString = Utils::toStringAny($arg); + $args[$key] = array_values($tryToString)[0]; + FiberManager::wait(); } - if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE)); + $args = '[' . implode(', ', array_map(function($item) { return '' . $item . ''; }, $args)) . ']'; + $args = str_replace('"', '\'', $args); - $command = PHP_BINARY . ' -r "require_once \'' . $pathAutoLoad . '\'; include \'' . $class . '\'; $input = ' . $input . '; $class = new ' . static::class . '($input); $class->onRun();"'; - - unset(self::$inputs[get_called_class()]); + $command = PHP_BINARY . ' -r "require_once \'' . $pathAutoLoad . '\'; include \'' . $class . '\'; $input = ' . $input . '; $args = ' . $args . '; $class = new ' . static::class . '($input, $args); $class->onRun();"'; + + unset(self::$inputs[$idCall]); + unset(self::$args[$idCall]); $process = proc_open($command, $mode, $pipes); + $timeStart = microtime(true); + while (microtime(true) - $timeStart <= 1) { + FiberManager::wait(); // wait for 1 second + } + $output = ''; $error = ''; - if (is_resource($process)) { - $data = json_encode(self::getDataMainThread()); - - if (is_string($data)) fwrite($pipes[0], $data); - fclose($pipes[0]); + if (is_resource($process)) { + stream_set_blocking($pipes[0], false); stream_set_blocking($pipes[1], false); stream_set_blocking($pipes[2], false); + stream_set_write_buffer($pipes[0], 0); + stream_set_write_buffer($pipes[1], 0); + stream_set_write_buffer($pipes[2], 0); + + $data = json_encode(self::getDataMainThread()); + + if (is_string($data)) @fwrite($pipes[0], $data); + @fclose($pipes[0]); + $status = proc_get_status($process); $pid = $status['pid']; if (!isset(self::$threads[$pid])) { @@ -446,16 +477,15 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise $write = null; $except = null; $timeout = 0; - $n = stream_select($read, $write, $except, $timeout); if ($n === false) break; if ($n > 0) { foreach ($read as $stream) { if (!feof($stream)) { - stream_set_blocking($stream, false); - $data = stream_get_contents($stream, 1024); - if ($data === false || $data === '') continue; - $stream === $pipes[1] ? $output .= $data : $error .= $data; + $data = stream_get_contents($stream); + if ($data !== '') { + $stream === $pipes[1] ? $output .= $data : $error .= $data; + } } FiberManager::wait(); } @@ -483,7 +513,7 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise } else { if ($output !== '' && self::isPostMainThread($output)) self::loadSharedData($output); elseif ($output !== '' && self::isPostThread($output)) { - $output = Utils::getStringAfterSign($output, self::POST_THREAD . '=>'); + $output = rtrim(Utils::getStringAfterSign($output, self::POST_THREAD . '=>')); } } } else { diff --git a/src/vennv/vapm/Work.php b/src/vennv/vapm/Work.php index e00903f3f..8837b07fe 100644 --- a/src/vennv/vapm/Work.php +++ b/src/vennv/vapm/Work.php @@ -107,9 +107,9 @@ public function __construct() $this->queue = new SplQueue(); } - public function add(callable $work): void + public function add(callable $work, array $args = []): void { - $this->queue->enqueue($work); + $this->queue->enqueue(new ClosureThread($work, $args)); } public function remove(int $index): void @@ -151,8 +151,9 @@ public function run(): void { $gc = new GarbageCollection(); while (!$this->queue->isEmpty()) { + /** @var ClosureThread $work */ $work = $this->queue->dequeue(); - if (is_callable($work)) $work(); + $work->start(); $gc->collectWL(); } } diff --git a/src/vennv/vapm/Worker.php b/src/vennv/vapm/Worker.php index 362eb5904..2b2d340db 100644 --- a/src/vennv/vapm/Worker.php +++ b/src/vennv/vapm/Worker.php @@ -151,7 +151,6 @@ public function __construct(Work $work, array $options = ["threads" => 4]) $this->work = $work; $this->options = $options; $this->id = $this->generateId(); - self::$workers[$this->id] = []; } @@ -230,12 +229,9 @@ public function run(callable $callback): Async while ($this->isLocked() || $totalCountWorks > 0) { if (!$this->isLocked()) { if (count($promises) < $threads && $work->count() > 0) { + /** @var ClosureThread $callbackQueue */ $callbackQueue = $work->dequeue(); - - if (!is_callable($callbackQueue)) continue; - - $thread = new ClosureThread($callbackQueue); - $promises[] = $thread->start(); + $promises[] = $callbackQueue->start(); } else { /** @var Promise $promise */ foreach ($promises as $index => $promise) { diff --git a/src/vennv/vapm/utils/Utils.php b/src/vennv/vapm/utils/Utils.php index b8f17b7db..f79cea62e 100644 --- a/src/vennv/vapm/utils/Utils.php +++ b/src/vennv/vapm/utils/Utils.php @@ -59,6 +59,14 @@ public static function milliSecsToSecs(float $milliSecs): float; */ public static function closureToString(Closure $closure): string; + /** + * @throws RuntimeException + * @return string + * + * Transform a closure or callable to string + */ + public static function closureToStringSafe(Closure $closure): string; + /** * Get all Dot files in a directory */ @@ -143,6 +151,22 @@ public static function isClass(string $class): bool; */ public static function getStringAfterSign(string $string, string $sign): string; + /** + * @param mixed $data + * @return array + * + * Convert data to string + */ + public static function toStringAny(mixed $data): array; + + /** + * @param array $data + * @return mixed + * + * Convert data to real it's type + */ + public static function fromStringToAny(array $data): mixed; + } final class Utils implements UtilsInterface @@ -181,6 +205,28 @@ public static function closureToString(Closure $closure): string return substr($result, $startPos, $endBracketPos - $startPos + 1); } + /** + * @throws RuntimeException + * @return string + */ + public static function closureToStringSafe(Closure $closure): string + { + $input = self::closureToString($closure); + $input = self::removeComments($input); + + if (!is_string($input)) throw new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE); + + $input = self::outlineToInline($input); + + if (!is_string($input)) throw new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE); + + $input = self::fixInputCommand($input); + + if (!is_string($input)) throw new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE); + + return $input; + } + public static function getAllByDotFile(string $path, string $dotFile): Generator { $dir = new RecursiveDirectoryIterator($path); @@ -336,4 +382,57 @@ public static function getStringAfterSign(string $string, string $sign): string return ''; } + /** + * @param mixed $data + * @return array + * + * Convert data to string + */ + public static function toStringAny(mixed $data): array + { + /** + * @phpstan-ignore-next-line + */ + $type = gettype($data); + if (!is_callable($data) && (is_array($data) || is_object($data))) { + return [$type => json_encode($data)]; + } elseif (is_bool($data)) { + $data = $data ? 'true' : 'false'; + return [$type => $data]; + } elseif (is_resource($data)) { + return [$type => get_resource_type($data)]; + } elseif (is_null($data)) { + return [$type => 'null']; + } elseif (is_callable($data)) { + return ['callable' => self::closureToStringSafe($data)]; + } elseif (is_string($data)) { + return [$type => '\'' . $data . '\'']; + } + return [$type => (string) $data]; + } + + /** + * @param array $data + * @return mixed + * + * Convert data to real it's type + */ + public static function fromStringToAny(array $data): mixed + { + $type = array_keys($data)[0]; + $value = array_values($data)[0]; + return match ($type) { + 'boolean' => $value === 'true', + 'integer' => (int) $value, + 'float' => (float) $value, + 'double' => (float) $value, + 'string' => $value, + 'array' => json_decode($value, true), + 'object' => json_decode($value), + 'callable' => eval('return ' . $value . ';'), + 'null' => null, + default => $value, + }; + } + } \ No newline at end of file