Skip to content

Commit

Permalink
Refactor TaskPool and clean up mess in Client a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
donhardman committed Oct 24, 2023
1 parent 6e5e666 commit 63b7763
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 82 deletions.
23 changes: 8 additions & 15 deletions src/ManticoreSearch/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ class Client {
/** @var string $url */
protected string $url;

/** @var string $path */
protected string $path;

/** @var string $header */
protected string $header;

/** @var string $buddyVersion */
protected string $buddyVersion;

Expand All @@ -49,22 +43,18 @@ class Client {
* Initialize the Client that will use provided
* @param ?Response $responseBuilder
* @param ?string $url
* @param Endpoint $endpointBundle
* @return void
*/
public function __construct(
protected ?Response $responseBuilder = null,
?string $url = null,
Endpoint $endpointBundle = Endpoint::Sql
?string $url = null
) {
// If no url passed, set default one
if (!$url) {
$url = static::DEFAULT_URL;
}
$this->path = $endpointBundle->value;
$this->setServerUrl($url);
$this->buddyVersion = Buddy::getVersion();
$this->header = static::CONTENT_TYPE_HEADER;
}

/**
Expand Down Expand Up @@ -95,14 +85,12 @@ public function setServerUrl($url): static {
* Send the request where request represents the SQL query to be send
* @param string $request
* @param ?string $path
* @param ?string $header
* @param bool $disableAgentHeader
* @return Response
*/
public function sendRequest(
string $request,
?string $path = null,
?string $header = null,
bool $disableAgentHeader = false,
): Response {
$t = microtime(true);
Expand All @@ -112,15 +100,20 @@ public function sendRequest(
if ($request === '') {
throw new ManticoreSearchClientError('Empty request passed');
}
$path ??= $this->path;
if (!$path) {
$path = Endpoint::Sql->value;
}

if (str_ends_with($path, 'bulk')) {
$header = "Content-Type: application/x-ndjson\n";
}
// We urlencode all the requests to the /sql endpoint
if (str_starts_with($path, 'sql')) {
$request = 'query=' . urlencode($request);
}
$fullReqUrl = "{$this->url}/$path";
$agentHeader = $disableAgentHeader ? '' : "User-Agent: Manticore Buddy/{$this->buddyVersion}\n";
$header = ($header ?? "Content-Type: application/x-www-form-urlencoded\n");
$header ??= "Content-Type: application/x-www-form-urlencoded\n";
$opts = [
'http' => [
'method' => 'POST',
Expand Down
14 changes: 7 additions & 7 deletions src/Network/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,19 @@ protected function parseOrFail(array $payload): static {
// Checking if request format and endpoint are supported
/** @var array{path:string,query?:string} $urlInfo */
$urlInfo = parse_url($payload['message']['path_query']);
$this->path = ltrim($urlInfo['path'], '/');
if ($this->path === 'sql' && isset($urlInfo['query'])) {
$path = ltrim($urlInfo['path'], '/');
if ($path === 'sql' && isset($urlInfo['query'])) {
// We need to keep the query parameters part in the sql queries
// as it's required for the following requests to Manticore
$this->path .= '?' . $urlInfo['query'];
$path .= '?' . $urlInfo['query'];
}
if (str_contains($this->path, '/_doc/') || str_contains($this->path, '/_create/')
|| str_ends_with($this->path, '/_doc') || str_ends_with($this->path, '/_create')) {
if (str_contains($path, '/_doc/') || str_contains($path, '/_create/')
|| str_ends_with($path, '/_doc') || str_ends_with($path, '/_create')) {
// We don't differentiate elastic-like insert and replace queries here
// since this is irrelevant for the following Buddy processing logic
$endpointBundle = ManticoreEndpoint::Insert;
} else {
$endpointBundle = match ($this->path) {
$endpointBundle = match ($path) {
'bulk', '_bulk' => ManticoreEndpoint::Bulk,
'cli' => ManticoreEndpoint::Cli,
'cli_json' => ManticoreEndpoint::CliJson,
Expand All @@ -180,7 +180,7 @@ protected function parseOrFail(array $payload): static {
'unknown sql request' => RequestFormat::SQL,
default => throw new InvalidNetworkRequestError("Do not know how to handle '{$payload['type']}' type"),
};

$this->path = $path;
$this->format = $format;
$this->endpointBundle = $endpointBundle;
$this->payload = static::removeComments($payload['message']['body']);
Expand Down
45 changes: 4 additions & 41 deletions src/Task/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ final class Task {
protected GenericError $error;
protected TaskResult $result;

// Extended properties for make things simpler
protected string $host = '';
protected string $body = '';

/** @var ?Settings $settings */
protected static ?Settings $settings = null;

Expand Down Expand Up @@ -121,8 +117,9 @@ public function run(): static {
$this->processCallbacks('run');
$this->status = TaskStatus::Running;
$this->future = go(
function (Closure $fn, array $argv): void {
function (): void {
try {
[$fn, $argv] = $this->argv;
$this->result = $fn(...$argv);
} catch (Throwable $t) {
[$errorClass, $errorMessage] = [$t::class, $t->getMessage()];
Expand All @@ -133,8 +130,9 @@ function (Closure $fn, array $argv): void {
$this->error = $e;
} finally {
$this->status = TaskStatus::Finished;
$this->processCallbacks();
}
}, ...$this->argv
}
);

if (!$this->future) {
Expand Down Expand Up @@ -244,39 +242,4 @@ public function getResult(): TaskResult {

return $this->result;
}

/**
* Set host that we received from the HTTP header – Host
* @param string $host
* @return static
*/
public function setHost(string $host): static {
$this->host = $host;
return $this;
}

/**
* Get host property
* @return string
*/
public function getHost(): string {
return $this->host;
}

// Now setter and getter for body property
/**
* @param string $body
* return static
*/
public function setBody(string $body): static {
$this->body = $body;
return $this;
}

/**
* @return string
*/
public function getBody(): string {
return $this->body;
}
}
64 changes: 45 additions & 19 deletions src/Task/TaskPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,74 @@

namespace Manticoresearch\Buddy\Core\Task;

use Closure;
use RuntimeException;
use Swoole\Table;

/**
* Simple container for running tasks
*/
final class TaskPool {
/** @var array<int,Task> */
protected static array $pool = [];
/** @var Table */
protected static Table $pool;

/**
* Get current pool
* @return Table
*/
public static function pool(): Table {
if (!isset(static::$pool)) {
static::$pool = new Table(1024);
static::$pool->column('id', Table::TYPE_STRING, 24);
static::$pool->column('host', Table::TYPE_STRING, 24);
static::$pool->column('body', Table::TYPE_STRING, 64);
static::$pool->create();
}
return static::$pool;
}

/**
* Add new task to the pool, so we can understand what is running now
* @param Task $task
* @return void
* @param string $id
* @param string $body
* @param string $host
* @return Closure
*/
public static function add(Task $task): void {
$taskId = $task->getId();
if (isset(static::$pool[$taskId])) {
throw new RuntimeException("Task {$taskId} already exists");
public static function add(string $id, string $body, string $host = 'localhost'): Closure {
if (static::pool()->exists($id)) {
throw new RuntimeException("Task {$id} already exists");
}
static::$pool[$taskId] = $task;

static::pool()->set($id, [
'id' => substr($id, 0, 24),
'host' => substr($host, 0, 24),
'body' => substr($body, 0, 64),
]);

return static function () use ($id) {
static::remove($id);
};
}

/**
* Remove the specified task from the pool, so we will not count it when it's done
* @param Task $task
* @param string $id
* @return void
*/
public static function remove(Task $task): void {
$taskId = $task->getId();
if (!isset(static::$pool[$taskId])) {
throw new RuntimeException("Task {$taskId} does not exist");
public static function remove(string $id): void {
if (!static::pool()->exists($id)) {
throw new RuntimeException("Task {$id} does not exist");
}
unset(static::$pool[$taskId]);
static::pool()->delete($id);
}

/**
* Get all active tasks in the pool
*
* @return array<int,Task>
* @return Table
*/
public static function getList(): array {
return static::$pool;
public static function getList(): Table {
return static::pool();
}

/**
Expand All @@ -61,6 +87,6 @@ public static function getList(): array {
* @return int
*/
public static function getCount(): int {
return sizeof(static::$pool);
return static::pool()->count();
}
}

0 comments on commit 63b7763

Please sign in to comment.