Skip to content

Commit

Permalink
Merge pull request #4 from sergkash7/amphp_pipeline
Browse files Browse the repository at this point in the history
Added use of amphp/pipeline instead of amphp/sync
  • Loading branch information
xtrime-ru authored Jun 27, 2023
2 parents 56cd1e7 + 8bde2dc commit 0ebaadd
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 91 deletions.
5 changes: 2 additions & 3 deletions app/Commands/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Zoon\PyroSpy\Commands;

use InvalidArgumentException;
use Revolt\EventLoop;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputDefinition;
use Symfony\Component\Console\Input\InputInterface;
Expand Down Expand Up @@ -141,12 +140,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$processor = new Processor(
$interval,
$batch,
new Sender($pyroscope, $app, $rateHz, $tags, $concurrentRequestLimit),
new Sender($pyroscope, $app, $rateHz, $tags),
array_values(array_filter($plugins)),
$sendSampleFutureLimit,
$concurrentRequestLimit,
);
$processor->process();
EventLoop::run();
return Command::SUCCESS;
}

Expand Down
119 changes: 70 additions & 49 deletions app/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

namespace Zoon\PyroSpy;

use Amp\Sync\LocalSemaphore;
use Amp\Sync\Semaphore;
use Amp\Future;
use Amp\Pipeline\Queue;
use Generator;
use InvalidArgumentException;
use Throwable;
use Zoon\PyroSpy\Plugins\PluginInterface;
use function Amp\async;
use function Amp\ByteStream\getStdin;
use function Amp\ByteStream\splitLines;

Expand All @@ -19,7 +20,8 @@ final class Processor {
* @var array<string, array<string, int>>
*/
private array $results;
private readonly Semaphore $sendSampleFutureLimit;
/** @var Queue<Sample> */
private readonly Queue $queue;

/**
* @param list<PluginInterface> $plugins
Expand All @@ -30,9 +32,10 @@ public function __construct(
private readonly Sender $sender,
private readonly array $plugins,
int $sendSampleFutureLimit,
private readonly int $concurrentRequestLimit,
) {
$this->init();
$this->sendSampleFutureLimit = new LocalSemaphore($sendSampleFutureLimit);
$this->queue = new Queue($sendSampleFutureLimit);
}

private function init(): void {
Expand All @@ -42,39 +45,78 @@ private function init(): void {
}

public function process(): void {
$sample = [];
Future\await([
$this->runProducer(),
$this->runConsumer(),
]);
}

foreach (self::getLine() as $line) {
$isEndOfTrace = $line === '';
private function runProducer(): Future {
return async(function (): void {
$sample = [];

if (!$isEndOfTrace) {
$sample[] = $line;
}
foreach (self::getLine() as $line) {
$isEndOfTrace = $line === '';

if ($isEndOfTrace && count($sample) > 0) {
try {
$tags = self::extractTags($sample);
$samplePrepared = self::prepareSample($sample);
self::checkSample($samplePrepared);
} catch (Throwable $e) {
echo $e->getMessage() . PHP_EOL;
var_dump($sample);
$sample = [];
continue;
if (!$isEndOfTrace) {
$sample[] = $line;
}

foreach ($this->plugins as $plugin) {
[$tags, $samplePrepared] = $plugin->process($tags, $samplePrepared);
if ($isEndOfTrace && count($sample) > 0) {
try {
try {
$tags = self::extractTags($sample);
$samplePrepared = self::prepareSample($sample);
self::checkSample($samplePrepared);
} catch (Throwable $e) {
echo $e->getMessage() . PHP_EOL;
var_dump($sample);
continue;
}

foreach ($this->plugins as $plugin) {
[$tags, $samplePrepared] = $plugin->process($tags, $samplePrepared);
}
$key = self::stringifyTrace($samplePrepared);
$this->groupTrace($tags, $key);

$currentTime = time();

if ($currentTime < $this->tsEnd && $this->countResults() < $this->batchLimit) {
continue;
}

foreach ($this->results as $tagSerialized => $results) {
$this->queue->push(new Sample($this->tsStart, $currentTime, $results, unserialize($tagSerialized)));
}

$this->init();
} finally {
$sample = [];
}
}
$key = self::stringifyTrace($samplePrepared);
$this->groupTrace($tags, $key);
$this->sendResults();
}

$sample = [];
$currentTime = time();
foreach ($this->results as $tagSerialized => $results) {
$this->queue->push(new Sample($this->tsStart, $currentTime, $results, unserialize($tagSerialized)));
}
}

$this->sendResults(true);
$this->queue->complete();
});
}

private function runConsumer(): Future {
return async(function (): void {
$this
->queue
->pipe()
->unordered()
->concurrent($this->concurrentRequestLimit)
->forEach(function (Sample $sample): void {
$this->sender->sendSample($sample);
});
});
}

/**
Expand Down Expand Up @@ -174,27 +216,6 @@ private function groupTrace(array $tags, string $key): void {
$this->results[$tagsKey][$key]++;
}

private function sendResults(bool $force = false): void {
$currentTime = time();
if (!$force && $currentTime < $this->tsEnd && $this->countResults() < $this->batchLimit) {
return;
}

$tsStart = $this->tsStart;
foreach ($this->results as $tagSerialized => $results) {
$futureLock = $this->sendSampleFutureLimit->acquire();
$this
->sender
->sendSample($tsStart, $currentTime, $results, unserialize($tagSerialized))
->finally(static function () use ($futureLock): void {
$futureLock->release();
})
;
}

$this->init();
}

private function countResults(): int {
$count = 0;
foreach ($this->results as $tagResuts) {
Expand Down
20 changes: 20 additions & 0 deletions app/Sample.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Zoon\PyroSpy;

final class Sample {

/**
* @param array<string, int> $samples
* @param array<string, string> $tags
*/
public function __construct(
public readonly int $fromTs,
public readonly int $toTs,
public readonly array $samples,
public readonly array $tags,
) {
}
}
55 changes: 18 additions & 37 deletions app/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@

namespace Zoon\PyroSpy;

use Amp\Future;
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Amp\Sync\LocalSemaphore;
use function Amp\async;
use function Amp\delay;

final class Sender {

private readonly HttpClient $client;
private readonly LocalSemaphore $concurrentRequestLimit;

/**
* @param array<string, string> $tags
Expand All @@ -23,47 +18,33 @@ public function __construct(
private readonly string $appName,
private readonly int $rateHz,
private readonly array $tags,
int $concurrentRequestLimit,
) {
$this->client = (new HttpClientBuilder())
->retry(0)
->followRedirects(0)
->build()
;
$this->concurrentRequestLimit = new LocalSemaphore($concurrentRequestLimit);
}

/**
* @param array<string, int> $samples
* @param array<string, string> $tags
* @return Future<bool>
*/
public function sendSample(int $fromTs, int $toTs, array $samples, array $tags): Future {
return async(function () use ($fromTs, $toTs, $samples, $tags) {
$lock = $this->concurrentRequestLimit->acquire();
try {
$url = $this->getUrl($tags, $fromTs, $toTs);
try {
$request = new Request($url, 'POST', self::prepareBody($samples));
$request->setTcpConnectTimeout(5 * 60);
$request->setTlsHandshakeTimeout(5 * 60);
$request->setTransferTimeout(60 * 60);
$request->setInactivityTimeout(60 * 60);
$response = $this->client->request($request);
if ($response->getStatus() === 200) {
return true;
} else {
printf("\nerror on request to url '%s', status code: %s", $url, $response->getStatus());
return false;
}
} catch (\Throwable $exception) {
printf("\nerror on request to url '%s', exception message: %s", $url, $exception->getMessage());
return false;
}
} finally {
$lock->release();
public function sendSample(Sample $sample): bool {
$url = $this->getUrl($sample->tags, $sample->fromTs, $sample->toTs);
try {
$request = new Request($url, 'POST', self::prepareBody($sample->samples));
$request->setTcpConnectTimeout(5 * 60);
$request->setTlsHandshakeTimeout(5 * 60);
$request->setTransferTimeout(60 * 60);
$request->setInactivityTimeout(60 * 60);
$response = $this->client->request($request);
if ($response->getStatus() === 200) {
return true;
} else {
printf("\nerror on request to url '%s', status code: %s", $url, $response->getStatus());
return false;
}
});
} catch (\Throwable $exception) {
printf("\nerror on request to url '%s', exception message: %s", $url, $exception->getMessage());
return false;
}
}

/**
Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
"amphp/amp": "^3.0",
"revolt/event-loop": "^1.0",
"amphp/http-client": "^5",
"amphp/sync": "^2.0",
"amphp/byte-stream": "^2.0",
"symfony/console": "^5|^6"
"symfony/console": "^5|^6",
"amphp/pipeline": "^1.0"
},
"autoload": {
"psr-4": {
Expand Down

0 comments on commit 0ebaadd

Please sign in to comment.