Skip to content

Commit

Permalink
Feature: use simdjson, jchash for shard detection and tune core proce…
Browse files Browse the repository at this point in the history
…ss logic with WaitGroup
  • Loading branch information
donhardman committed Jan 16, 2025
1 parent 7e8be60 commit 0151554
Show file tree
Hide file tree
Showing 85 changed files with 68 additions and 64 deletions.
2 changes: 1 addition & 1 deletion src/Lib/Metric.php
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ protected function getVolumeMetrics(): array {
}
// Decode config and get all path for size calculation
/** @var array{indexes?:array<array{path:string}>} $jsonConfig */
$jsonConfig = json_decode($jsonContent, true);
$jsonConfig = simdjson_decode($jsonContent, true);
$indexes = $jsonConfig['indexes'] ?? [];
$paths = array_column($indexes, 'path');
$size = 0;
Expand Down
28 changes: 16 additions & 12 deletions src/Network/EventHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
use Manticoresearch\Buddy\Core\Task\TaskResult;
use Manticoresearch\Buddy\Core\Tool\Buddy;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\WaitGroup;
use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use Throwable;
Expand Down Expand Up @@ -63,18 +63,23 @@ public static function request(SwooleRequest $request, SwooleResponse $response)
$startTime = hrtime(true);
$requestId = $request->header['Request-ID'] ?? uniqid(more_entropy: true);
$body = $request->rawContent() ?: '';
$channel = new Channel(1);
$waitGroup = new WaitGroup();
$waitGroup->add(1);
$result = '';
Coroutine::create(
static function () use ($requestId, $body, $channel, $startTime) {
Buddy::debug("[$requestId] request data: $body");
$result = (string)static::process($requestId, $body);
Buddy::debug("[$requestId] response data: $result");
Buddy::debug("[$requestId] response time: " . round((hrtime(true) - $startTime) / 1e6, 3) . ' ms');
$channel->push($result);
static function () use ($requestId, $body, $startTime, $waitGroup, &$result) {
try {
Buddy::debug("[$requestId] request data: $body");
$result = (string)static::process($requestId, $body);
Buddy::debug("[$requestId] response data: $result");
Buddy::debug("[$requestId] response time: " . round((hrtime(true) - $startTime) / 1e6, 3) . ' ms');
} finally {
$waitGroup->done();
}
}
);
/** @var string $result */
$result = $channel->pop();

$waitGroup->wait();
$response->header('Content-Type', 'application/json');
$response->status(200);
$response->end($result);
Expand All @@ -92,7 +97,6 @@ public static function process(string $id, string $payload): Response {
$startTime = hrtime(true);
$request = Request::fromString($payload, $id);
$handler = QueryProcessor::process($request)->run();

// In case deferred we return the ID of the task not the request
if ($handler->isDeferred()) {
$doneFn = TaskPool::add($id, $request->payload);
Expand Down Expand Up @@ -124,7 +128,7 @@ public static function process(string $id, string $payload): Response {
$originalErrorBody = $request->errorBody;
} else {
/** @var array{error?:array{message:string,body?:array{error:string}}} $payloadInfo */
$payloadInfo = (array)json_decode($payload, true);
$payloadInfo = (array)simdjson_decode($payload, true);
$originalError = $payloadInfo['error']['message'] ?? '';
$originalErrorBody = $payloadInfo['error']['body'] ?? [];
}
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/Autocomplete/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected static function fromJsonRequest(Request $request): static {
preserve?: int
}
} $payload */
$payload = json_decode($request->payload, true);
$payload = simdjson_decode($request->payload, true);
if (!isset($payload['query']) || !is_string($payload['query'])) {
throw QueryParseError::create('Failed to parse query: make sure you have query and it is a string');
}
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/DistributedInsert/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected function processBatch(
'cluster' => $cluster,
];

$shard = hexdec(substr(md5($idStr), 0, 8)) % $shardCount;
$shard = jchash($idStr, $shardCount);
$info = $shards[$shard];
$shardName = $info['name'];
$this->assignTable($struct, $cluster, $shardName);
Expand Down
10 changes: 5 additions & 5 deletions src/Plugin/DistributedInsert/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,14 @@ protected static function parseBulkPayload(Request $request): array {
continue;
}

/** @var Struct<int|string,array<string,mixed>> $struct */
$struct = Struct::fromJson($rows[$i]);
[$cluster, $table] = static::processBulkRow($struct, $batch, $tableMap, $cluster, $table);
[$cluster, $table] = static::processBulkRow($rows[$i], $batch, $tableMap, $cluster, $table);
}
/** @var Batch $batch */
return $batch;
}

/**
* @param Struct<int|string,array<string,mixed>> $struct
* @param string $row
* @param Batch &$batch
* @param array<string,array{0:string,1:string}> &$tableMap
* @param string $cluster
Expand All @@ -161,12 +159,14 @@ protected static function parseBulkPayload(Request $request): array {
* @throws QueryParseError
*/
protected static function processBulkRow(
Struct $struct,
string $row,
array &$batch,
array &$tableMap,
string $cluster,
string $table
): array {
/** @var Struct<int|string,array<string,mixed>> $struct */
$struct = Struct::fromJson($row);
if (isset($struct['index']['_index'])) { // _bulk
/** @var string $table */
$table = $struct['index']['_index'];
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/EmulateElastic/AddAliasHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public function __construct(public Payload $payload) {
public function run(): Task {
$taskFn = static function (Payload $payload, HTTPClient $manticoreClient): TaskResult {
/** @var array{actions:array<mixed>} */
$request = json_decode($payload->body, true);
$request = simdjson_decode($payload->body, true);
if (!is_array($request)) {
throw new \Exception('Cannot parse request');
}
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/EmulateElastic/AddTemplateHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function __construct(public Payload $payload) {
public function run(): Task {
$taskFn = static function (Payload $payload, HTTPClient $manticoreClient): TaskResult {
/** @var array<string,mixed> */
$request = json_decode($payload->body, true);
$request = simdjson_decode($payload->body, true);
if (!is_array($request) || !isset($request['index_patterns'])) {
throw new \Exception('Cannot parse request');
}
Expand Down
4 changes: 2 additions & 2 deletions src/Plugin/EmulateElastic/CatHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public function run(): Task {
$catInfo[] = [
'name' => $entityInfo['name'],
'order' => 0,
'index_patterns' => json_decode($entityInfo['patterns'], true),
] + json_decode($entityInfo['content'], true);
'index_patterns' => simdjson_decode($entityInfo['patterns'], true),
] + simdjson_decode($entityInfo['content'], true);
}

return TaskResult::raw($catInfo);
Expand Down
Empty file modified src/Plugin/EmulateElastic/ClusterKibanaHandler.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/CountInfoKibanaHandler.php
100755 → 100644
Empty file.
2 changes: 1 addition & 1 deletion src/Plugin/EmulateElastic/FieldCapsHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public function run(): Task {
$requestTables = self::getRequestTables($payload, $manticoreClient);

/** @var array<string,mixed> $request */
$request = json_decode($payload->body, true);
$request = simdjson_decode($payload->body, true);
$isEmptyRequest = !$request;
if (!$isEmptyRequest && !isset($request['fields'])) {
throw new \Exception('Cannot parse request');
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/EmulateElastic/FindEntityHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public function run(): Task {
'_index' => $entityIndex,
'_primary_term' => 1,
'_seq_no' => 0,
'_source' => json_decode($queryResult[0]['data'][0]['_source'], true),
'_source' => simdjson_decode($queryResult[0]['data'][0]['_source'], true),
'_type' => '_doc',
'_version' => 1,
'found' => true,
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/EmulateElastic/GetEntityHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static function get(string $entityId, string $entityIndex, HTTPClient $ma
'_index' => $entityIndex,
'_primary_term' => 1,
'_seq_no' => 0,
'_source' => (array)json_decode($queryResult[0]['data'][0]['_source'], true),
'_source' => (array)simdjson_decode($queryResult[0]['data'][0]['_source'], true),
'_type' => '_doc',
'_version' => 1,
'found' => true,
Expand Down
8 changes: 4 additions & 4 deletions src/Plugin/EmulateElastic/ImportKibanaHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public function run(): Task {
*/
protected static function importFromQuery(string $query, HTTPClient $manticoreClient): int {
// Checking if the entity can be imported
$queryEntity = (array)json_decode($query, true);
$queryEntity = (array)simdjson_decode($query, true);
if (!array_key_exists('type', $queryEntity) || !in_array($queryEntity['type'], self::IMPORT_ENTITY_TYPES)) {
return 0;
}
Expand Down Expand Up @@ -168,7 +168,7 @@ public static function checkEntityPatterns(
HTTPClient $manticoreClient
): string {
/** @var array{index-pattern?:array{title:string},attributes:array{title:string}} $pattern */
$pattern = json_decode($patternSource, true);
$pattern = simdjson_decode($patternSource, true);
$indexName = isset($pattern['index-pattern'])
? $pattern['index-pattern']['title'] : $pattern['attributes']['title'];
$query = 'SELECT _id, _source FROM ' . parent::ENTITY_TABLE . " WHERE _type='{$indexType}'";
Expand All @@ -179,7 +179,7 @@ public static function checkEntityPatterns(
}
foreach ($queryResult[0]['data'] as $patternInfo) {
/** @var array{index-pattern?:array{title:string},attributes:array{title:string}} $savedPattern */
$savedPattern = json_decode($patternInfo['_source'], true);
$savedPattern = simdjson_decode($patternInfo['_source'], true);
$savedIndexName = isset($savedPattern['index-pattern'])
? $savedPattern['index-pattern']['title'] : $savedPattern['attributes']['title'];
if ($savedIndexName === $indexName) {
Expand Down Expand Up @@ -215,7 +215,7 @@ public static function updateEntityPatterns(
}
foreach ($queryResult[0]['data'] as $entityInfo) {
/** @var array{references:array{0:array{id:string}}} $entitySource */
$entitySource = json_decode($entityInfo['_source'], true);
$entitySource = simdjson_decode($entityInfo['_source'], true);
if ($entitySource['references'][0]['id'] !== $importedId) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/EmulateElastic/InitKibanaHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public function run(): Task {
'aliases' => [
$alias => [],
],
] + json_decode($entity['_source'], true);
] + simdjson_decode($entity['_source'], true);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/EmulateElastic/InvalidSourceKibanaHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public function __construct(public Payload $payload) {
*/
public function run(): Task {
$taskFn = static function (Payload $payload, HTTPClient $manticoreClient): TaskResult {
$request = json_decode($payload->body, true);
$request = simdjson_decode($payload->body, true);
if (!is_array($request)) {
throw new Exception("Invalid request passed: {$payload->body}");
}
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/EmulateElastic/KibanaSearch/Handler.php
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public function __construct(public Payload $payload) {
*/
public function run(): Task {
$taskFn = static function (Payload $payload, Client $manticoreClient): TaskResult {
$request = json_decode($payload->body, true);
$request = simdjson_decode($payload->body, true);
if (!is_array($request)) {
throw new \Exception('Cannot parse Kibana request');
}
Expand Down
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/NodeSet.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/RequestNode/AggNode.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/RequestNode/BaseNode.php
100755 → 100644
Empty file.
Empty file.
Empty file.
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/RequestNode/ExprNode.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/RequestNode/Factory.php
100755 → 100644
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/RequestNode/Metric.php
100755 → 100644
Empty file.
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/RequestNode/Range.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/RequestNode/Term.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/RequestParser.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/Response.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/SphinxQLRequest.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/KibanaSearch/TableFieldInfo.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/ManagerSettingsKibanaHandler.php
100755 → 100644
Empty file.
Empty file modified src/Plugin/EmulateElastic/MetricKibanaHandler.php
100755 → 100644
Empty file.
4 changes: 2 additions & 2 deletions src/Plugin/EmulateElastic/MgetKibanaHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function __construct(public Payload $payload) {
public function run(): Task {
$taskFn = static function (Payload $payload, HTTPClient $manticoreClient): TaskResult {
/** @var array{docs:array<array<string,string>>} $payloadBody */
$payloadBody = json_decode($payload->body, true);
$payloadBody = simdjson_decode($payload->body, true);
$entityInfo = $payloadBody['docs'];
$getEntitiesCond = self::buildEntitiesCond($entityInfo);
$query = 'SELECT _id, _index, _source FROM `' . self::ENTITY_TABLE . "` WHERE {$getEntitiesCond}";
Expand All @@ -58,7 +58,7 @@ public function run(): Task {
'_index' => $entity['_index'],
'_primary_term' => 1,
'_seq_no' => 0,
'_source' => json_decode($entity['_source'], true),
'_source' => simdjson_decode($entity['_source'], true),
'_type' => '_doc',
'_version' => 1,
'found' => true,
Expand Down
Empty file modified src/Plugin/EmulateElastic/NodesInfoKibanaHandler.php
100755 → 100644
Empty file.
2 changes: 1 addition & 1 deletion src/Plugin/EmulateElastic/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static function fromRequest(Request $request): static {
* properties?:array<string,array{properties?:array<mixed>,type?:string,fields?:array<mixed>}>
* } $requestBody
*/
$requestBody = (array)json_decode($request->payload, true);
$requestBody = (array)simdjson_decode($request->payload, true);
if ($requestBody === [] || !isset($requestBody['properties'])) {
throw new Exception("Unvalid request body in {$request->path}: $request->payload");
}
Expand Down
Empty file modified src/Plugin/EmulateElastic/SettingsKibanaHandler.php
100755 → 100644
Empty file.
4 changes: 2 additions & 2 deletions src/Plugin/EmulateElastic/TableKibanaHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public function __construct(public Payload $payload) {
public function run(): Task {
$taskFn = static function (Payload $payload, HTTPClient $manticoreClient): TaskResult {
/** @var array{aggs:array<string,mixed>} $payloadBody */
$payloadBody = json_decode($payload->body, true);
$payloadBody = simdjson_decode($payload->body, true);
$reqType = self::detectRequestType($payloadBody);
$aggNames = [];
if ($reqType === self::AGG_REQUEST_TYPE) {
Expand Down Expand Up @@ -109,7 +109,7 @@ protected static function postprocessQueryResult(Struct $queryResult, array $sea
'_seq_no' => 1,
'_type' => '_doc',
];
$hit['_source'] = json_decode($hit['_source'], true);
$hit['_source'] = simdjson_decode($hit['_source'], true);
if (!array_key_exists('filter', $searchConds)) {
continue;
}
Expand Down
Empty file modified src/Plugin/EmulateElastic/TelemetryKibanaHandler.php
100755 → 100644
Empty file.
10 changes: 5 additions & 5 deletions src/Plugin/EmulateElastic/UpdateEntityHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function run(): Task {
}
if (!$queryResult[0]['data']) {
/** @var array{doc?:array<mixed>} $payloadObj */
$payloadObj = json_decode($payload->body, true);
$payloadObj = simdjson_decode($payload->body, true);
$indexType = explode(':', $entityId)[0];
$entitySource = array_key_exists('doc', $payloadObj) ? $payloadObj['doc'] : $payloadObj;
AddEntityHandler::add(
Expand Down Expand Up @@ -86,18 +86,18 @@ public static function update(
HTTPClient $manticoreClient
): void {
/** @var array{doc?:array<mixed>} $sourceObj */
$sourceObj = json_decode($source, true);
$sourceObj = simdjson_decode($source, true);
if (!$sourceObj) {
$source = str_replace('\\"', '\\\\"', $source);
/** @var array{doc?:array<mixed>} $sourceObj */
$sourceObj = json_decode($source, true);
$sourceObj = simdjson_decode($source, true);
}
if (array_key_exists('doc', $sourceObj)) {
$sourceObj = $sourceObj['doc'];
}
if ($prevSource) {
/** @var array<int|string,mixed> $prevSourceObj */
$prevSourceObj = json_decode($prevSource, true);
$prevSourceObj = simdjson_decode($prevSource, true);
} else {
$prevSourceObj = [];
}
Expand All @@ -123,7 +123,7 @@ public static function update(
* @throws \Exception
*/
protected static function buildUpdateData(string $source): array {
$sourceData = (array)json_decode($source, true);
$sourceData = (array)simdjson_decode($source, true);
$entityKey = array_key_first($sourceData);
if ($entityKey === null) {
throw new \Exception('Unknown error on Kibana entity update');
Expand Down
Empty file modified src/Plugin/EmulateElastic/XpackInfoKibanaHandler.php
100755 → 100644
Empty file.
2 changes: 1 addition & 1 deletion src/Plugin/Fuzzy/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static function fromRequest(Request $request): static {
*/
protected static function fromJsonRequest(Request $request): static {
/** @var array{index:string,table?:string,query:array{match:array{'*'?:string}},options:array{fuzzy?:bool,distance?:int,layouts?:string,preserve?:bool}} $payload */
$payload = json_decode($request->payload, true);
$payload = simdjson_decode($request->payload, true);
$self = new static();
$self->path = $request->path;
$self->table = $payload['table'] ?? $payload['index'];
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/Insert/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected static function preprocessElasticLikeRequest(string &$path, string $pa
$path = 'insert';
$query = [
'table' => $table,
'doc' => (array)json_decode($payload, true),
'doc' => (array)simdjson_decode($payload, true),
];
if (isset($pathParts[2]) && $pathParts[2]) {
$query['id'] = (int)$pathParts[2];
Expand Down
4 changes: 2 additions & 2 deletions src/Plugin/Knn/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static function fromRequest(Request $request): static {
private static function parseHttpRequest(self $payload, Request $request): void {
$payload->select = [];

$parsedPayload = json_decode($request->payload, true);
$parsedPayload = simdjson_decode($request->payload, true);
if (!is_array($parsedPayload)) {
return;
}
Expand Down Expand Up @@ -120,7 +120,7 @@ private static function parseSqlRequest(self $payload): void {
*/
public static function hasMatch(Request $request): bool {
if ($request->endpointBundle === Endpoint::Search) {
$payload = json_decode($request->payload, true);
$payload = simdjson_decode($request->payload, true);
if (is_array($payload) && isset($payload['knn']['doc_id'])) {
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/Queue/StringFunctionsTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected function morphValuesByFieldType(
*/
private function prepareMvaField(string $fieldValue): string {
if (isset($fieldValue[0]) && $fieldValue[0] === '[') {
$fieldValue = json_decode($fieldValue, true);
$fieldValue = simdjson_decode($fieldValue, true);
}


Expand Down
6 changes: 3 additions & 3 deletions src/Plugin/Queue/Workers/Kafka/KafkaWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ public function __construct(
) {

/** @var array{group:string, broker:string, topic:string, batch:string } $attrs */
$attrs = json_decode($instance['attrs'], true);
$attrs = simdjson_decode($instance['attrs'], true);

$this->client = $client;
$this->consumerGroup = $attrs['group'];
$this->brokerList = $attrs['broker'];

$decodedMapping = json_decode($instance['custom_mapping'], true);
$decodedMapping = simdjson_decode($instance['custom_mapping'], true);
if ($decodedMapping === false) {
GenericError::throw(
'Custom mapping decoding error: '.json_last_error_msg()
Expand Down Expand Up @@ -199,7 +199,7 @@ public function processBatch(array $batch): bool {
private function mapMessages(array $batch): array {
$results = [];
foreach ($batch as $message) {
$parsedMessage = json_decode($message, true);
$parsedMessage = simdjson_decode($message, true);
if (is_array($parsedMessage)) {
$message = array_change_key_case($parsedMessage);
} else {
Expand Down
Loading

0 comments on commit 0151554

Please sign in to comment.