Skip to content

Commit

Permalink
Update thread + worker!
Browse files Browse the repository at this point in the history
  • Loading branch information
VennDev authored Sep 2, 2024
1 parent 3af15b4 commit fea88a0
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 48 deletions.
24 changes: 16 additions & 8 deletions src/vennv/vapm/ClosureThread.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

namespace vennv\vapm;

use Generator;
use function call_user_func;
use function is_array;
use function is_bool;
Expand All @@ -49,31 +50,38 @@ final class ClosureThread extends Thread implements ClosureThreadInterface

private mixed $callback;

public function __construct(callable $callback)
/**
* @var array<int, mixed>
* @phpstan-var array<int, mixed>
*/
private array $argsCallback = [];

public function __construct(callable $callback, array $args = [])

Check failure on line 59 in src/vennv/vapm/ClosureThread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Method vennv\vapm\ClosureThread::__construct() has parameter $args with no value type specified in iterable type array.
{
$this->callback = $callback;
parent::__construct($callback);
$this->argsCallback = $args;
parent::__construct($callback, $args);
}

public function onRun(): void
{
if (is_callable($this->callback)) {
$callback = call_user_func($this->callback);
if ($callback instanceof \Generator) {
$callback = function () use ($callback): \Generator {
yield from $callback;
$callback = call_user_func($this->callback, ...$this->argsCallback);
if ($callback instanceof Generator) {
$callback = function () use ($callback): Generator {
yield from $callback(...$this->argsCallback);

Check failure on line 72 in src/vennv/vapm/ClosureThread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Trying to invoke Generator but it's not a callable.
};
$callback = call_user_func($callback);
}
if (is_array($callback)) {
$callback = json_encode($callback);
} elseif (is_object($callback) && !$callback instanceof \Generator) {
} elseif (is_object($callback) && !$callback instanceof Generator) {
$callback = json_encode($callback);
} elseif (is_bool($callback)) {
$callback = $callback ? 'true' : 'false';
} elseif (is_null($callback)) {
$callback = 'null';
} elseif ($callback instanceof \Generator) {
} elseif ($callback instanceof Generator) {
$callback = json_encode(iterator_to_array($callback));
} else {
$callback = (string)$callback;
Expand Down
1 change: 0 additions & 1 deletion src/vennv/vapm/CoroutineGen.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public static function runNonBlocking(mixed ...$coroutines): void
public static function runBlocking(mixed ...$coroutines): void
{
self::runNonBlocking(...$coroutines);

$gc = new GarbageCollection();
while (!self::$taskQueue?->isEmpty()) {
self::run();
Expand Down
90 changes: 60 additions & 30 deletions src/vennv/vapm/Thread.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
use ReflectionException;
use RuntimeException;
use Throwable;
use function explode;
use function strlen;
use function rtrim;
use function fclose;
use function fwrite;
use function get_called_class;
Expand Down Expand Up @@ -212,14 +213,26 @@ abstract class Thread implements ThreadInterface, ThreadedInterface
*/
private static array $inputs = [];

public function __construct(mixed $input = '')
/**
* @var array<int, mixed>
* @phpstan-var array<int, mixed>
*/
private static array $args = [];

public function __construct(mixed $input = '', array $args = [])

Check failure on line 222 in src/vennv/vapm/Thread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Method vennv\vapm\Thread::__construct() has parameter $args with no value type specified in iterable type array.
{
self::$inputs[get_called_class()] = $input;
self::$inputs[$this->getCalledClassId()] = $input;

Check failure on line 224 in src/vennv/vapm/Thread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Static property vennv\vapm\Thread::$inputs (array<string, mixed>) does not accept array<int|string, mixed>.
self::$args[$this->getCalledClassId()] = $args;
}

private function getCalledClassId(): int
{
return spl_object_id($this);
}

public function getInput(): mixed
{
return self::$inputs[get_called_class()];
return self::$inputs[$this->getCalledClassId()];

Check failure on line 235 in src/vennv/vapm/Thread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Offset int does not exist on array<string, mixed>.
}

public function getPid(): int
Expand Down Expand Up @@ -371,6 +384,8 @@ abstract public function onRun(): void;
public function start(array $mode = DescriptorSpec::BASIC): Promise
{
return new Promise(function ($resolve, $reject) use ($mode): mixed {
$idCall = $this->getCalledClassId();

$className = get_called_class();

$reflection = new ReflectionClass($className);
Expand All @@ -384,44 +399,60 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise
$pathAutoLoad
);

$input = self::$inputs[get_called_class()];
$input = self::$inputs[$idCall];

Check failure on line 402 in src/vennv/vapm/Thread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Offset int does not exist on array<string, mixed>.

if (is_string($input)) $input = '\'' . self::$inputs[get_called_class()] . '\'';
if (is_string($input)) $input = '\'' . self::$inputs[$idCall] . '\'';

Check failure on line 404 in src/vennv/vapm/Thread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Offset int does not exist on array<string, mixed>.

if (is_callable($input) && $input instanceof Closure) {
$input = Utils::closureToString($input);
$input = Utils::removeComments($input);

if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));

$input = Utils::outlineToInline($input);
try {
$input = Utils::closureToStringSafe($input);
} catch (Throwable $e) {
return $reject(new ThreadException($e->getMessage()));
}
}

if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));
if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));

$input = Utils::fixInputCommand($input);
$args = self::$args[$idCall];

if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));
foreach ($args as $key => $arg) {

Check failure on line 418 in src/vennv/vapm/Thread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Argument of an invalid type mixed supplied for foreach, only iterables are supported.
$tryToString = Utils::toStringAny($arg);
$args[$key] = array_values($tryToString)[0];

Check failure on line 420 in src/vennv/vapm/Thread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Cannot access offset mixed on mixed.
FiberManager::wait();
}

if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));
$args = '[' . implode(', ', array_map(function($item) { return '' . $item . ''; }, $args)) . ']';

Check failure on line 424 in src/vennv/vapm/Thread.php

View workflow job for this annotation

GitHub Actions / Jobs (8.2)

Parameter #2 $array of function array_map expects array, mixed given.
$args = str_replace('"', '\'', $args);

$command = PHP_BINARY . ' -r "require_once \'' . $pathAutoLoad . '\'; include \'' . $class . '\'; $input = ' . $input . '; $class = new ' . static::class . '($input); $class->onRun();"';

unset(self::$inputs[get_called_class()]);
$command = PHP_BINARY . ' -r "require_once \'' . $pathAutoLoad . '\'; include \'' . $class . '\'; $input = ' . $input . '; $args = ' . $args . '; $class = new ' . static::class . '($input, $args); $class->onRun();"';

unset(self::$inputs[$idCall]);
unset(self::$args[$idCall]);

$process = proc_open($command, $mode, $pipes);

$timeStart = microtime(true);
while (microtime(true) - $timeStart <= 1) {
FiberManager::wait(); // wait for 1 second
}

$output = '';
$error = '';
if (is_resource($process)) {
$data = json_encode(self::getDataMainThread());

if (is_string($data)) fwrite($pipes[0], $data);
fclose($pipes[0]);

if (is_resource($process)) {
stream_set_blocking($pipes[0], false);
stream_set_blocking($pipes[1], false);
stream_set_blocking($pipes[2], false);

stream_set_write_buffer($pipes[0], 0);
stream_set_write_buffer($pipes[1], 0);
stream_set_write_buffer($pipes[2], 0);

$data = json_encode(self::getDataMainThread());

if (is_string($data)) @fwrite($pipes[0], $data);
@fclose($pipes[0]);

$status = proc_get_status($process);
$pid = $status['pid'];
if (!isset(self::$threads[$pid])) {
Expand All @@ -446,16 +477,15 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise
$write = null;
$except = null;
$timeout = 0;

$n = stream_select($read, $write, $except, $timeout);
if ($n === false) break;
if ($n > 0) {
foreach ($read as $stream) {
if (!feof($stream)) {
stream_set_blocking($stream, false);
$data = stream_get_contents($stream, 1024);
if ($data === false || $data === '') continue;
$stream === $pipes[1] ? $output .= $data : $error .= $data;
$data = stream_get_contents($stream);
if ($data !== '') {
$stream === $pipes[1] ? $output .= $data : $error .= $data;
}
}
FiberManager::wait();
}
Expand Down Expand Up @@ -483,7 +513,7 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise
} else {
if ($output !== '' && self::isPostMainThread($output)) self::loadSharedData($output);
elseif ($output !== '' && self::isPostThread($output)) {
$output = Utils::getStringAfterSign($output, self::POST_THREAD . '=>');
$output = rtrim(Utils::getStringAfterSign($output, self::POST_THREAD . '=>'));
}
}
} else {
Expand Down
7 changes: 4 additions & 3 deletions src/vennv/vapm/Work.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ public function __construct()
$this->queue = new SplQueue();
}

public function add(callable $work): void
public function add(callable $work, array $args = []): void
{
$this->queue->enqueue($work);
$this->queue->enqueue(new ClosureThread($work, $args));
}

public function remove(int $index): void
Expand Down Expand Up @@ -151,8 +151,9 @@ public function run(): void
{
$gc = new GarbageCollection();
while (!$this->queue->isEmpty()) {
/** @var ClosureThread $work */
$work = $this->queue->dequeue();
if (is_callable($work)) $work();
$work->start();
$gc->collectWL();
}
}
Expand Down
8 changes: 2 additions & 6 deletions src/vennv/vapm/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ public function __construct(Work $work, array $options = ["threads" => 4])
$this->work = $work;
$this->options = $options;
$this->id = $this->generateId();

self::$workers[$this->id] = [];
}

Expand Down Expand Up @@ -230,12 +229,9 @@ public function run(callable $callback): Async
while ($this->isLocked() || $totalCountWorks > 0) {
if (!$this->isLocked()) {
if (count($promises) < $threads && $work->count() > 0) {
/** @var ClosureThread $callbackQueue */
$callbackQueue = $work->dequeue();

if (!is_callable($callbackQueue)) continue;

$thread = new ClosureThread($callbackQueue);
$promises[] = $thread->start();
$promises[] = $callbackQueue->start();
} else {
/** @var Promise $promise */
foreach ($promises as $index => $promise) {
Expand Down
99 changes: 99 additions & 0 deletions src/vennv/vapm/utils/Utils.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ public static function milliSecsToSecs(float $milliSecs): float;
*/
public static function closureToString(Closure $closure): string;

/**
* @throws RuntimeException
* @return string
*
* Transform a closure or callable to string
*/
public static function closureToStringSafe(Closure $closure): string;

/**
* Get all Dot files in a directory
*/
Expand Down Expand Up @@ -143,6 +151,22 @@ public static function isClass(string $class): bool;
*/
public static function getStringAfterSign(string $string, string $sign): string;

/**
* @param mixed $data
* @return array<string, string>
*
* Convert data to string
*/
public static function toStringAny(mixed $data): array;

/**
* @param array<string, string> $data
* @return mixed
*
* Convert data to real it's type
*/
public static function fromStringToAny(array $data): mixed;

}

final class Utils implements UtilsInterface
Expand Down Expand Up @@ -181,6 +205,28 @@ public static function closureToString(Closure $closure): string
return substr($result, $startPos, $endBracketPos - $startPos + 1);
}

/**
* @throws RuntimeException
* @return string
*/
public static function closureToStringSafe(Closure $closure): string
{
$input = self::closureToString($closure);
$input = self::removeComments($input);

if (!is_string($input)) throw new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE);

$input = self::outlineToInline($input);

if (!is_string($input)) throw new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE);

$input = self::fixInputCommand($input);

if (!is_string($input)) throw new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE);

return $input;
}

public static function getAllByDotFile(string $path, string $dotFile): Generator
{
$dir = new RecursiveDirectoryIterator($path);
Expand Down Expand Up @@ -336,4 +382,57 @@ public static function getStringAfterSign(string $string, string $sign): string
return '';
}

/**
* @param mixed $data
* @return array<string, string>
*
* Convert data to string
*/
public static function toStringAny(mixed $data): array
{
/**
* @phpstan-ignore-next-line
*/
$type = gettype($data);
if (!is_callable($data) && (is_array($data) || is_object($data))) {
return [$type => json_encode($data)];
} elseif (is_bool($data)) {
$data = $data ? 'true' : 'false';
return [$type => $data];
} elseif (is_resource($data)) {
return [$type => get_resource_type($data)];
} elseif (is_null($data)) {
return [$type => 'null'];
} elseif (is_callable($data)) {
return ['callable' => self::closureToStringSafe($data)];
} elseif (is_string($data)) {
return [$type => '\'' . $data . '\''];
}
return [$type => (string) $data];
}

/**
* @param array<string, string> $data
* @return mixed
*
* Convert data to real it's type
*/
public static function fromStringToAny(array $data): mixed
{
$type = array_keys($data)[0];
$value = array_values($data)[0];
return match ($type) {
'boolean' => $value === 'true',
'integer' => (int) $value,
'float' => (float) $value,
'double' => (float) $value,
'string' => $value,
'array' => json_decode($value, true),
'object' => json_decode($value),
'callable' => eval('return ' . $value . ';'),
'null' => null,
default => $value,
};
}

}

0 comments on commit fea88a0

Please sign in to comment.