From d8c6a4082572ed28715c2e3f4a23a0325618bf4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Sun, 14 Jul 2024 13:32:00 +0200 Subject: [PATCH] Event loop improvements --- http_api.php | 126 ++------------- src/WordPress/AsyncHttp/Client.php | 179 +++++++++++++--------- src/WordPress/AsyncHttp/Request.php | 8 +- src/WordPress/AsyncHttp/Response.php | 6 +- src/WordPress/AsyncHttp/StreamWrapper.php | 40 +---- 5 files changed, 136 insertions(+), 223 deletions(-) diff --git a/http_api.php b/http_api.php index 32124329..b20fe4e0 100644 --- a/http_api.php +++ b/http_api.php @@ -11,125 +11,31 @@ require_once __DIR__ . '/src/WordPress/AsyncHttp/InflateStreamWrapperData.php'; $requests = [ - // new Request("https://playground.internal"), - // (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.1'), - // (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.0'), - (new Request("http://127.0.0.1:3000/")) //->set_http_version('1.0'), + new Request("https://playground.internal"), + (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.1'), + (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.0'), + (new Request("http://127.0.0.1:3000/"))->set_http_version('1.0'), ]; -// list($streams, $headers, $errors) = streams_send_http_requests($requests); -// print_r($streams); -// print_r($errors); // var_dump(streams_http_response_read_bytes($streams, 1024)); // Enqueuing another request here is instant and won't start the download yet. $client = new Client(); -$queue = $client->enqueue( $requests ); -var_dump($client->read_bytes($requests[0], 10, Client::READ_POLL_ANY)); -var_dump($client->read_bytes($requests[0], 1024, Client::READ_NON_BLOCKING)); -var_dump($client->read_bytes($requests[0], 1024, Client::READ_POLL_ANY)); -// var_dump($client->read_bytes($requests[0], 1024)); - -die(); - -// @TODO: handle wait_for_all_requested_bytes for more than content-length bytes -var_dump(stream_get_contents($requests[1]->get_response()->body_stream)); -// var_dump($client->read_bytes($requests[1], 359, [ -// 'mode' => 'poll_once', -// ])); -// var_dump($client->read_bytes($requests[1], 359, [ -// 'mode' => 'poll_once', -// ])); -// var_dump($client->read_bytes($requests[1], 359, [ -// 'mode' => 'poll_once', -// ])); -// @TODO: poll_once should eventully mark the request as finished -var_dump("----"); -var_dump($client->read_bytes($requests[2], 1024, [ - 'mode' => 'return', -])); -var_dump($client->read_bytes($requests[2], 1024, [ - 'mode' => 'poll_once', -])); -// var_dump($queue); -// var_dump($queue[0]); -// var_dump($client->read_bytes($requests[0], 1024, [ -// 'mode' => 'return', -// ])); -// var_dump(fread($queue[0]->get_body_stream(), 1)); -// var_dump(fread($queue[0]->get_body_stream(), 1)); -// var_dump(fread($queue[0]->get_body_stream(), 1)); -die(); -// var_dump($queue[0]); -var_dump($client->read_bytes($requests[0], 186, [ - 'mode' => 'return', -])); -var_dump($client->read_bytes($requests[0], 186, [ - 'mode' => 'return', -])); -// var_dump($queue[0]->get_status_code()); -// var_dump($queue[0]->get_headers()); - -// var_dump(stream_get_contents($queue[0]->response->body_stream)); -die(); -$client = new Client(); $client->set_progress_callback( function ( Request $request, $downloaded, $total ) { - echo "$request->url – Downloaded: $downloaded / $total\n"; + // echo "$request->url – Downloaded: $downloaded / $total\n"; } ); -$requests = [ - new Request("https://anglesharp.azurewebsites.net/Chunked") - // new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ), - // new Request( "https://downloads.wordpress.org/theme/pendant.zip" ), -]; -$queue = $client->enqueue( $requests ); -var_dump($queue[0]); -die(); -// Enqueuing another request here is instant and won't start the download yet. -//$streams2 = $client->enqueue( [ -// new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ), -//] ); +$client->enqueue( $requests ); -try { - $client->read_bytes($requests[0], 4096); - // var_dump(stream_get_contents($streams1[0])); -} catch (Exception $e) { - echo $e->getMessage(); -} -print_r($client); -print_r(stream_context_get_options($streams1[0])); -// Stream a single file, while streaming all the files -// file_put_contents( 'output-round1-0.zip', stream_get_contents( $streams1[0] ) ); -//file_put_contents( 'output-round1-1.zip', stream_get_contents( $streams1[1] ) ); -die(); -// Initiate more HTTPS requests -$streams3 = $client->enqueue( [ - new Request( "https://downloads.wordpress.org/plugin/akismet.4.1.12.zip" ), - new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ), - new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ), -] ); -// Download the rest of the files. Foreach() seems like downloading things -// sequentially, but we're actually streaming all the files in parallel. -$streams = array_merge( $streams2, $streams3 ); -foreach ( $streams as $k => $stream ) { - file_put_contents( 'output-round2-' . $k . '.zip', stream_get_contents( $stream ) ); +while(true) { + $request = $client->next_response_chunk(); + if(false === $request) { + break; + } + echo "GOT DATA CHUNK ON REQUEST $request->id:\n"; + echo $request->get_response()->consume_buffer(1024); + echo "----------------\n\n"; } -echo "Done! :)"; - -// ---------------------------- -// -// Previous explorations: - -// Non-blocking parallel processing – the fastest method. -//while ( $results = sockets_http_response_read_bytes( $streams, 8096 ) ) { -// foreach ( $results as $k => $chunk ) { -// file_put_contents( 'output' . $k . '.zip', $chunk, FILE_APPEND ); -// } -//} - -// Blocking sequential processing – the slowest method. -//foreach ( $streams as $k => $stream ) { -// stream_set_blocking( $stream, 1 ); -// file_put_contents( 'output' . $k . '.zip', stream_get_contents( $stream ) ); -//} +// $client->wait_for_headers($requests[3]); +// var_dump($requests[3]->get_response()->get_headers()); diff --git a/src/WordPress/AsyncHttp/Client.php b/src/WordPress/AsyncHttp/Client.php index 5b7998b2..e3a99967 100644 --- a/src/WordPress/AsyncHttp/Client.php +++ b/src/WordPress/AsyncHttp/Client.php @@ -87,10 +87,13 @@ * **Supports custom request headers and body** */ class Client { - protected $concurrency = 2; - protected $requests; - protected $onProgress; - protected $is_processing_queue = false; + + const STREAM_SELECT_READ = 1; + const STREAM_SELECT_WRITE = 2; + + const READ_NON_BLOCKING = 'READ_NON_BLOCKING'; + const READ_POLL_ANY = 'READ_POLL_ANY'; + const READ_POLL_ALL = 'READ_POLL_ALL'; /** * Microsecond is 1 millionth of a second. @@ -104,6 +107,11 @@ class Client { */ const NONBLOCKING_TIMEOUT_MICROSECONDS = 0.05 * self::MICROSECONDS_TO_SECONDS; + protected $concurrency = 2; + protected $requests; + protected $onProgress; + protected $is_processing_queue = false; + public function __construct() { $this->requests = []; $this->onProgress = function () { @@ -137,57 +145,18 @@ public function set_progress_callback( $onProgress ) { * streams is read from. * * @param Request|Request[] $requests The HTTP request(s) to enqueue. Can be a single request or an array of requests. - * - * @return Response[]|Response|array The enqueued streams. */ public function enqueue( $requests ) { if ( ! is_array( $requests ) ) { - return $this->enqueue_request( $requests ); + $this->requests[] = $requests; + return; } - $enqueued_streams = array(); foreach ( $requests as $request ) { - $enqueued_streams[] = $this->enqueue_request( $request ); + $this->requests[] = $request; } - - return $enqueued_streams; - } - - /** - * Returns the response stream associated with the given Request object. - * Reading from that stream also runs this Client's event loop. - * - * @param Request $request - * - * @return resource - */ - public function get_stream( $request ) { - throw new Exception('Not implemented yet'); - // if ( ! isset( $this->requests[ $request ] ) ) { - // $this->enqueue_request( $request ); - // } - - // if ( $this->queue_needs_processing ) { - // $this->process_queue(); - // } - - // StreamWrapper::create_resource( - // new StreamData($request, $client) - // ) } - /** - * @param \WordPress\AsyncHttp\Request $request - */ - protected function enqueue_request( $request ) { - $this->requests[] = $request; - return $request->get_response(); - } - - - const READ_NON_BLOCKING = 'READ_NON_BLOCKING'; - const READ_POLL_ANY = 'READ_POLL_ANY'; - const READ_POLL_ALL = 'READ_POLL_ALL'; /** * Reads $length bytes from the given request while also running * non-blocking event loop operations. @@ -218,21 +187,87 @@ public function read_bytes( $request, $length, $mode = self::READ_NON_BLOCKING ) ) { break; } - } while ($this->event_loop_pass()); + } while ($this->event_loop_tick()); return $buffered; } - public function event_loop_pass() + public function next_response_chunk() { - if(count($this->get_concurrent_requests()) === 0) { + do { + foreach($this->requests as $request) { + if ( strlen($request->get_response()->buffer) > 0 ) { + return $request; + } + } + } while($this->event_loop_tick()); + + return false; + } + + public function wait_for_headers( $request ) + { + if(!in_array($request, $this->requests, true)) { + trigger_error('Request not found in the client', E_USER_WARNING); return false; } - echo "event_loop_pass\n"; - foreach($this->requests as $request) { - echo "request state: $request->state\n"; + + do { + if ($request->get_response()->get_headers()) { + return true; + } + } while ($this->event_loop_tick() && $request->state !== Request::STATE_FAILED); + + return false; + } + + public function wait_for_response_body_stream( $request ) + { + if(!in_array($request, $this->requests, true)) { + trigger_error('Request not found in the client', E_USER_WARNING); + return false; + } + + do { + if ($request->get_response()->decoded_response_stream) { + return true; + } + } while ($this->event_loop_tick() && $request->state !== Request::STATE_FAILED); + + return false; + } + + /** + * Returns the response stream associated with the given Request object. + * Reading from that stream also runs this Client's event loop. + * + * @param Request $request + * + * @return resource|bool + */ + // public function get_response_stream( $request ) { + // if(!in_array($request, $this->requests, true)) { + // trigger_error('Request not found in the client', E_USER_WARNING); + // return false; + // } + + // if( + // $request->state !== Request::STATE_RECEIVING_BODY && + // $request->state !== Request::STATE_FINISHED + // ) { + // trigger_error('Request is not in a state where the response stream is available', E_USER_WARNING); + // return false; + // } + + // return $request->get_response()->event_loop_decoded_response_stream; + // } + + public function event_loop_tick() + { + if(count($this->get_concurrent_requests()) === 0) { + return false; } - sleep(1); + static::open_nonblocking_http_sockets( $this->get_concurrent_requests( Request::STATE_ENQUEUED ) ); @@ -275,16 +310,14 @@ protected function get_concurrent_requests($states=null) Request::STATE_RECEIVING_BODY, Request::STATE_RECEIVED, ]); + $available_slots = $this->concurrency - count($processed_requests); $enqueued_requests = $this->get_requests(Request::STATE_ENQUEUED); - $backfill_enqueued_nb = min( - count($enqueued_requests), - $this->concurrency - count($processed_requests) - ); - - for($i = 0; $i < $backfill_enqueued_nb; $i++) { + for($i = 0; $i < $available_slots; $i++) { + if(!isset($enqueued_requests[$i])) { + break; + } $processed_requests[] = $enqueued_requests[$i]; } - if($states !== null) { $processed_requests = static::filter_requests($processed_requests, $states); } @@ -385,7 +418,7 @@ static private function enable_crypto(array $requests) // for now and try again on the next event loop pass. continue; } - // Headers sent! Let's promote the request to the next state. + // SSL connection established, let's send the headers. $request->state = Request::STATE_WILL_SEND_HEADERS; } } @@ -476,17 +509,13 @@ static private function receive_response_headers( $requests ) { $response->statusMessage = $parsed['status']['message']; $response->protocol = $parsed['status']['protocol']; - $content_length = $response->get_header('content-length'); - $transfer_encoding = $response->get_header('transfer-encoding'); - // If we're expecting a body, let's start receiving it. - if( - $transfer_encoding === 'chunked' || - ($content_length !== null && (int) $content_length > 0) - ) { - $request->state = Request::STATE_RECEIVING_BODY; - } else { + // If we're being redirected, we don't need to wait for the body. + if($response->statusCode >= 300 && $response->statusCode < 400) { $request->state = Request::STATE_RECEIVED; + break; } + + $request->state = Request::STATE_RECEIVING_BODY; break; } } @@ -498,6 +527,10 @@ static private function receive_response_headers( $requests ) { * @param array $requests An array of requests. */ private function receive_response_body( $requests ) { + // @TODO: Assume body is fully received when either + // * Content-Length is reached + // * The last chunk in Transfer-Encoding: chunked is received + // * The connection is closed foreach (static::stream_select($requests, static::STREAM_SELECT_READ) as $request) { $response = $request->get_response(); if (!$response->decoded_response_stream) { @@ -700,17 +733,15 @@ static private function filter_requests( array $requests, $states ) { $states = [$states]; } $results = []; - foreach($requests as $k => $request) { + foreach($requests as $request) { if(in_array($request->state, $states)) { - $results[$k] = $request; + $results[] = $request; } } return $results; } - const STREAM_SELECT_READ = 1; - const STREAM_SELECT_WRITE = 2; static private function stream_select( $requests, $mode ) { if(empty($requests)) { return []; diff --git a/src/WordPress/AsyncHttp/Request.php b/src/WordPress/AsyncHttp/Request.php index 5d3a4c12..d27c4f20 100644 --- a/src/WordPress/AsyncHttp/Request.php +++ b/src/WordPress/AsyncHttp/Request.php @@ -15,6 +15,10 @@ class Request { const STATE_FAILED = 'STATE_FAILED'; const STATE_FINISHED = 'STATE_FINISHED'; + static private $last_id; + + public $id; + public $state = self::STATE_ENQUEUED; public $url; @@ -33,6 +37,7 @@ class Request { * @param string $url */ public function __construct( string $url, $method='GET', $headers=[], $body_stream=null, $http_version='1.1' ) { + $this->id = ++self::$last_id; $this->url = $url; $this->is_ssl = strpos( $url, 'https://' ) === 0; @@ -100,9 +105,6 @@ public function set_error($error) $this->error = $error; $this->state = self::STATE_FAILED; - $this->response->error = $error; - $this->response->state = self::STATE_FAILED; - if($this->http_socket) { fclose($this->http_socket); $this->http_socket = null; diff --git a/src/WordPress/AsyncHttp/Response.php b/src/WordPress/AsyncHttp/Response.php index 824cfc0b..1ce7914b 100644 --- a/src/WordPress/AsyncHttp/Response.php +++ b/src/WordPress/AsyncHttp/Response.php @@ -42,7 +42,7 @@ public function get_protocol() } public function get_header( $name ) { - if($this->headers === null) { + if(false === $this->get_headers()) { return false; } @@ -51,6 +51,10 @@ public function get_header( $name ) { public function get_headers() { + if(!$this->headers) { + return false; + } + return $this->headers; } diff --git a/src/WordPress/AsyncHttp/StreamWrapper.php b/src/WordPress/AsyncHttp/StreamWrapper.php index f240bd99..022164ad 100644 --- a/src/WordPress/AsyncHttp/StreamWrapper.php +++ b/src/WordPress/AsyncHttp/StreamWrapper.php @@ -42,6 +42,7 @@ public function stream_cast( $cast_as ) { public function stream_read( $count ) { $this->initialize(); + $this->client->event_loop_tick(); return $this->client->read_bytes( $this->wrapper_data->request, $count ); } @@ -52,57 +53,26 @@ public function stream_write( $data ) { } public function stream_tell() { - if ( ! $this->stream ) { - return false; - } + $this->initialize(); return parent::stream_tell(); } public function stream_close() { - if ( ! $this->stream ) { - return false; - } - - if ( ! $this->has_valid_stream() ) { - return false; - } + $this->initialize(); return parent::stream_close(); } public function stream_eof() { - if ( ! $this->stream ) { - return false; - } - - if ( ! $this->has_valid_stream() ) { - return true; - } + $this->initialize(); return parent::stream_eof(); } public function stream_seek( $offset, $whence ) { - if ( ! $this->stream ) { - return false; - } + $this->initialize(); return parent::stream_seek( $offset, $whence ); } - - /* - * This stream_close call could be initiated not by the developer, - * but by the PHP internal request shutdown handler (written in C). - * - * The underlying resource ($this->stream) may have already been closed - * and freed independently from the resource represented by $this stream - * wrapper. In this case, the type of $this->stream will be "Unknown", - * and the fclose() call will trigger a fatal error. - * - * Let's refuse to call fclose() in that scenario. - */ - protected function has_valid_stream() { - return get_resource_type( $this->stream ) !== 'Unknown'; - } }