Skip to content

Commit

Permalink
Merge pull request #3 from sergkash7/async
Browse files Browse the repository at this point in the history
Async
  • Loading branch information
xtrime-ru authored Apr 27, 2023
2 parents 9e6d3bd + d5e4a94 commit 56cd1e7
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 149 deletions.
83 changes: 54 additions & 29 deletions app/Commands/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Zoon\PyroSpy\Commands;

use InvalidArgumentException;
use Revolt\EventLoop;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputDefinition;
use Symfony\Component\Console\Input\InputInterface;
Expand Down Expand Up @@ -39,6 +40,20 @@ protected function configure(): void {
'Sample rate in Hz. Used to convert number of samples to CPU time',
100
),
new InputOption(
'concurrentRequestLimit',
'c',
InputOption::VALUE_OPTIONAL,
'Limiting the HTTP client to N concurrent requests, so the HTTP pyroscope server doesn\'t get overwhelmed',
10,
),
new InputOption(
'sendSampleFutureLimit',
'f',
InputOption::VALUE_OPTIONAL,
'Limiting the Send Sample futures buffer to N so as not to get a memory overflow',
10_000,
),
new InputOption(
'interval',
'i',
Expand All @@ -60,13 +75,13 @@ protected function configure(): void {
'Add tags to samples. Example: host=server1, role=cli',
[]
),
new InputOption(
'plugins',
'p',
InputOption::VALUE_IS_ARRAY|InputOption::VALUE_REQUIRED,
'Process trace and phpspy comments/tags with custom class. Can be class or folder with classes',
[]
),
new InputOption(
'plugins',
'p',
InputOption::VALUE_IS_ARRAY|InputOption::VALUE_REQUIRED,
'Process trace and phpspy comments/tags with custom class. Can be class or folder with classes',
[]
),
]))
;
}
Expand All @@ -93,6 +108,14 @@ protected function execute(InputInterface $input, OutputInterface $output): int
if ($rateHz <= 0) {
throw new InvalidArgumentException('rateHz must be positive value');
}
$concurrentRequestLimit = (int)$input->getOption('concurrentRequestLimit');
if ($concurrentRequestLimit <= 0) {
throw new InvalidArgumentException('concurrentRequestLimit must be positive value');
}
$sendSampleFutureLimit = (int)$input->getOption('sendSampleFutureLimit');
if ($sendSampleFutureLimit <= 0) {
throw new InvalidArgumentException('sendSampleFutureLimit must be positive value');
}

$tags = [];
foreach ((array) $input->getOption('tags') as $tag) {
Expand All @@ -103,39 +126,41 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$tags[$name] = $value;
}

$plugins = [];
foreach ((array) $input->getOption('plugins') as $pluginPath) {
if (is_dir($pluginPath)) {
$globPath = str_replace('//', '/', $pluginPath . '/*.php');
foreach (glob($globPath) as $file) {
$plugins[] = self::getClassFromPath($file);
}
} else {
$plugins[] = self::getClassFromPath($pluginPath);
}
}
$plugins = [];
foreach ((array) $input->getOption('plugins') as $pluginPath) {
if (is_dir($pluginPath)) {
$globPath = str_replace('//', '/', $pluginPath . '/*.php');
foreach (glob($globPath) as $file) {
$plugins[] = self::getClassFromPath($file);
}
} else {
$plugins[] = self::getClassFromPath($pluginPath);
}
}

$processor = new Processor(
$interval,
$batch,
new Sender($pyroscope, $app, $rateHz, $tags),
array_values(array_filter($plugins))
new Sender($pyroscope, $app, $rateHz, $tags, $concurrentRequestLimit),
array_values(array_filter($plugins)),
$sendSampleFutureLimit,
);
$processor->process();
EventLoop::run();
return Command::SUCCESS;
}

private static function getClassFromPath(string $path): ?PluginInterface {
if (substr($path, -4, 4 ) !== '.php') {
throw new InvalidArgumentException('Plugin must be php file');
}
require_once $path;
$pathArray = explode('/', $path);
$class = str_replace('.php', '', array_pop($pathArray));
private static function getClassFromPath(string $path): ?PluginInterface {
if (substr($path, -4, 4 ) !== '.php') {
throw new InvalidArgumentException('Plugin must be php file');
}
require_once $path;
$pathArray = explode('/', $path);
$class = str_replace('.php', '', array_pop($pathArray));
if (!$class) {
return null;
}
$class = "Zoon\PyroSpy\Plugins\\$class";
return new $class();
}
}
}
}
20 changes: 10 additions & 10 deletions app/Plugins/ClearEmptyTags.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

class ClearEmptyTags implements PluginInterface {

public function process(array $tags, array $trace): array {
unset($tags['ts']);
public function process(array $tags, array $trace): array {
unset($tags['ts']);

foreach ($tags as $name => $value) {
$value = trim($value);
if (!$value || $value === '-') {
unset($tags[$name]);
}
}
return [$tags, $trace];
}
foreach ($tags as $name => $value) {
$value = trim($value);
if (!$value || $value === '-') {
unset($tags[$name]);
}
}
return [$tags, $trace];
}
}
12 changes: 6 additions & 6 deletions app/Plugins/PluginInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
namespace Zoon\PyroSpy\Plugins;

interface PluginInterface {
/**
* @param array<string, mixed> $tags
* @param array<int|string, array{0:string, 1:string}> $trace
* @return array{0:array<string, mixed>, 1:array<int|string, array{0:string, 1:string}>}
*/
public function process(array $tags, array $trace): array;
/**
* @param array<string, mixed> $tags
* @param array<int|string, array{0:string, 1:string}> $trace
* @return array{0:array<string, mixed>, 1:array<int|string, array{0:string, 1:string}>}
*/
public function process(array $tags, array $trace): array;
}
136 changes: 70 additions & 66 deletions app/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,37 @@

namespace Zoon\PyroSpy;

use Amp\Sync\LocalSemaphore;
use Amp\Sync\Semaphore;
use Generator;
use InvalidArgumentException;
use RuntimeException;
use Throwable;
use Zoon\PyroSpy\Plugins\PluginInterface;
use function Amp\ByteStream\getStdin;
use function Amp\ByteStream\splitLines;

final class Processor {

private int $interval;
private int $batchLimit;

private int $tsStart = 0;
private int $tsEnd;
/**
* @var array<string, array<string, int>>
*/
private array $results;
private readonly Semaphore $sendSampleFutureLimit;

private Sender $sender;
/** @var list<PluginInterface> */
private array $plugins = [];


public function __construct(int $interval, int $batchLimit, Sender $sender, array $plugins = []) {
$this->interval = $interval;
$this->sender = $sender;
/**
* @param list<PluginInterface> $plugins
*/
public function __construct(
private readonly int $interval,
private readonly int $batchLimit,
private readonly Sender $sender,
private readonly array $plugins,
int $sendSampleFutureLimit,
) {
$this->init();
$this->batchLimit = $batchLimit;
$this->plugins = $plugins;
}

public function __destruct() {
$this->sendResults(true);
$this->sendSampleFutureLimit = new LocalSemaphore($sendSampleFutureLimit);
}

private function init(): void {
Expand All @@ -55,7 +53,7 @@ public function process(): void {

if ($isEndOfTrace && count($sample) > 0) {
try {
$tags = self::extractTags($sample);
$tags = self::extractTags($sample);
$samplePrepared = self::prepareSample($sample);
self::checkSample($samplePrepared);
} catch (Throwable $e) {
Expand All @@ -65,16 +63,18 @@ public function process(): void {
continue;
}

foreach ($this->plugins as $plugin) {
[$tags, $samplePrepared] = $plugin->process($tags, $samplePrepared);
}
foreach ($this->plugins as $plugin) {
[$tags, $samplePrepared] = $plugin->process($tags, $samplePrepared);
}
$key = self::stringifyTrace($samplePrepared);
$this->groupTrace($tags, $key);
$this->sendResults();

$sample = [];
}
}

$this->sendResults(true);
}

/**
Expand All @@ -84,9 +84,9 @@ public function process(): void {
private static function prepareSample(array $sample): array {
$samplePrepared = [];
foreach ($sample as $item) {
if (!is_numeric(substr($item, 0, 1))) {
continue;
}
if (!is_numeric(substr($item, 0, 1))) {
continue;
}
$item = explode(' ', $item);
if (count($item) !== 3) {
throw new InvalidArgumentException('Invalid sample shape');
Expand All @@ -97,25 +97,25 @@ private static function prepareSample(array $sample): array {
return $samplePrepared;
}

/**
* @param list<string> $sample
* @return
*/
private static function extractTags(array $sample): array {
$tags = [];
foreach ($sample as $item) {
if (!is_string($item) || substr($item, 0, 1) !== '#') {
continue;
}
$item = explode(' ', $item);
if (count($item) !== 4) {
continue;
}
[$hashtag, $tag, $equalsing, $value] = $item;
$tags[$tag] = $value;
}
return $tags;
}
/**
* @param list<string> $sample
* @return
*/
private static function extractTags(array $sample): array {
$tags = [];
foreach ($sample as $item) {
if (!is_string($item) || substr($item, 0, 1) !== '#') {
continue;
}
$item = explode(' ', $item);
if (count($item) !== 4) {
continue;
}
[$hashtag, $tag, $equalsing, $value] = $item;
$tags[$tag] = $value;
}
return $tags;
}

/**
* @param array<int|string, array{0:string, 1:string}> $samplePrepared
Expand Down Expand Up @@ -150,12 +150,8 @@ private static function stringifyTrace(array $samplePrepared): string {
* @return Generator<string>
*/
private static function getLine(): Generator {
if (STDIN === false) {
throw new RuntimeException('Can\'t open STDIN');
}

while (!feof(STDIN)) {
$line = trim(fgets(STDIN));
foreach (splitLines(getStdin()) as $line) {
$line = trim($line);

//fix trace with eval
if (strpos($line, ' : eval()\'d code:') !== false) {
Expand All @@ -164,38 +160,46 @@ private static function getLine(): Generator {

yield $line;
}

}

private function groupTrace(array $tags, string $key): void {
ksort($tags);
$tagsKey = serialize($tags);
if (!array_key_exists($tagsKey, $this->results)) {
$this->results[$tagsKey] = [];
}
ksort($tags);
$tagsKey = serialize($tags);
if (!array_key_exists($tagsKey, $this->results)) {
$this->results[$tagsKey] = [];
}
if (!array_key_exists($key, $this->results[$tagsKey])) {
$this->results[$tagsKey][$key] = 0;
}
$this->results[$tagsKey][$key]++;
}

private function sendResults(bool $force = false): void {
if (!$force && time() < $this->tsEnd && $this->countResults() < $this->batchLimit) {
$currentTime = time();
if (!$force && $currentTime < $this->tsEnd && $this->countResults() < $this->batchLimit) {
return;
}

foreach ($this->results as $tagSerialized => $results) {
$this->sender->sendSample($this->tsStart, time(), $results, unserialize($tagSerialized));
$tsStart = $this->tsStart;
foreach ($this->results as $tagSerialized => $results) {
$futureLock = $this->sendSampleFutureLimit->acquire();
$this
->sender
->sendSample($tsStart, $currentTime, $results, unserialize($tagSerialized))
->finally(static function () use ($futureLock): void {
$futureLock->release();
})
;
}

$this->init();
}

private function countResults(): int {
$count = 0;
foreach ($this->results as $tagResuts) {
$count += count($tagResuts);
}
return $count;
}
private function countResults(): int {
$count = 0;
foreach ($this->results as $tagResuts) {
$count += count($tagResuts);
}
return $count;
}
}
Loading

0 comments on commit 56cd1e7

Please sign in to comment.