Skip to content

Commit

Permalink
Add IPC docs
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 27, 2023
1 parent f013263 commit 7b53bad
Showing 1 changed file with 132 additions and 3 deletions.
135 changes: 132 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ To be as flexible as possible, this library comes with a collection of non-block
- PHP 8.1+

#### Optional requirements to use threads instead of processes
- PHP 8.2+ ZTS build
- PHP 8.2+ ZTS
- [`ext-parallel`](https://github.com/krakjoe/parallel)

## Installation
Expand Down Expand Up @@ -187,7 +187,7 @@ The return value of the child callable is available using the `Context::join()`
#### Child Process or Thread

```php
# child.php
// child.php

use Amp\Sync\Channel;

Expand All @@ -205,7 +205,7 @@ return function (Channel $channel): mixed {
#### Parent Process

```php
# parent.php
// parent.php

use Amp\Parallel\Context\ProcessContext;

Expand All @@ -228,6 +228,15 @@ Child processes or threads are also great for CPU-intensive operations such as i

An execution context can be created using the function `Amp\Parallel\Context\startContext()`, which uses the global `ContextFactory`. The global factory is an instance of `DefaultContextFactory` by default, but this instance can be overridden using the function `Amp\Parallel\Context\contextFactory()`.

```php
// Using the global context factory from Amp\Parallel\Context\contextFactory()
$context = Amp\Parallel\Context\startContext(__DIR__ . '/child.php');

// Creating a specific context factory and using it to create a context.
$contextFactory = new Amp\Parallel\Context\ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');
```

Context factories are used by worker pools to create the context which executes tasks. Providing a custom `ContextFactory` to a worker pool allows custom bootstrapping or other behavior within pool workers.

An execution context can be created by a `ContextFactory`. The worker pool uses context factories to create workers.
Expand All @@ -236,6 +245,126 @@ A global worker pool is available and can be set using the function `Amp\Paralle
Passing an instance of `WorkerPool` will set the global pool to the given instance.
Invoking the function without an instance will return the current global instance.

### IPC

A context is created with a single `Channel` which may be used to bidirectionally send data between the parent and child. Channels are a high-level data exchange, allowing serializable data to be sent over a channel. The `Channel` implementation handles serializing and unserializing data, message framing, and chunking over the underlying socket between the parent and child.

> **Note**
> Channels should be used to send only _data_ between the parent and child. Attempting to send resources such as database connections or file handles on a channel will not work. Such resources should be opened in each child process.
> One notable exception to this rule: server and client network sockets may be sent between parent and child using tools provided by [`amphp/cluster`](https://github.com/amphp/cluster).
The example code below defines a class, `AppMessage`, containing a message type enum and the associated message data which is dependent upon the enum case. All messages sent over the channel between the parent and child use an instance of `AppMessage` to define message intent. Alternatively, the child could use a different class for replies, but that was not done here for the sake of brevity. Any messaging strategy may be employed which is best suited your application, the only requirement is that any structure sent over a channel must be serializable.

The example below sends a message to the child to process an image after receiving a path from STDIN, then waits for the reply from the child. When an empty path is provided, the parent sends `null` to the child to break the child out of the message loop and waits for the child to exit before exiting itself.

```php
// AppMessage.php

class AppMessage {
public function __construct(
public readonly AppMessageType $type,
public readonly mixed $data,
) {
}
}
```

```php
// AppMessageType.php

enum AppMessageType {
case ProcessedImage;
case ProcessImageFromPath;
// Other enum cases for further message types...
}
```

```php
// parent.php

use Amp\Parallel\Context\ProcessContextFactory;

$contextFactory = new ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');

$stdin = Amp\ByteStream\getStdin();

while ($path = $stdin->read()) {
$message = new AppMessage(AppMessageType::ProcessImageFromPath, $path);
$context->send($message);

$reply = $context->receive(); // Wait for reply from child context with processed image data.
}

$context->send(null); // End loop in child process.
$context->join();
```

```php
// child.php

use Amp\Sync\Channel;

return function (Channel $channel): void {
/** @var AppMessage|null $message */
while ($message = $channel->receive()) {
$reply = match ($message->type) {
AppMessageType::ProcessImageFromPath => new AppMessage(
AppMessageType::ProcessedImage,
ImageProcessor::process($message->data),
),
// Handle other message types...
}

$channel->send($reply);
}
};
```

#### Creating an IPC socket

Sometimes it is necessary to create another socket for specialized IPC between a parent and child context. One such example is sending sockets between a parent and child process using `ClientSocketReceivePipe` and `ClientSocketSendPipe`, which are found in [`amphp/cluster`](https://github.com/amphp/cluster). An instance of `IpcHub` in the parent and the `Amp\Parallel\Ipc\connect()` function in the child.

The example below creates a separate IPC socket between a parent and child, then uses [`amphp/cluster`](https://github.com/amphp/cluster) to create instances of `ClientSocketReceivePipe` and `ClientSocketSendPipe` in the parent and child, respectively.

```php
// parent.php
use Amp\Cluster\ClientSocketSendPipe;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;

$ipcHub = new LocalIpcHub();

// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$contextFactory = new ProcessContextFactory(ipcHub: $ipcHub);

$context = $contextFactory->start(__DIR__ . '/child.php');

$connectionKey = $ipcHub->generateKey();
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);

// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);

$socketPipe = new ClientSocketSendPipe($socket);
```

```php
// child.php
use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Sync\Channel;

return function (Channel $channel): void {
['uri' => $uri, 'key' => $connectionKey] = $channel->receive();

// $socket will be a bidirectional socket to the parent.
$socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);

$socketPipe = new ClientSocketReceivePipe($socket);
};
```

### Debugging

Step debugging may be used in child processes with PhpStorm and Xdebug by listening for debug connections in the IDE.
Expand Down

0 comments on commit 7b53bad

Please sign in to comment.