diff --git a/.github/workflows/phpstan.yml b/.github/workflows/phpstan.yml index e011290..076aed8 100644 --- a/.github/workflows/phpstan.yml +++ b/.github/workflows/phpstan.yml @@ -30,7 +30,7 @@ jobs: - name: Setup Dependencies env: - COMPOSER_ROOT_VERSION: 1.x-dev + COMPOSER_ROOT_VERSION: 2.x-dev run: composer install -o diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ebdf88e..e332736 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -80,7 +80,7 @@ jobs: - name: Setup Dependencies env: - COMPOSER_ROOT_VERSION: 1.x-dev + COMPOSER_ROOT_VERSION: 2.x-dev run: composer install -o - name: Coding Standards Check diff --git a/README-CN.md b/README-CN.md index 1f0ab9f..8b33af7 100644 --- a/README-CN.md +++ b/README-CN.md @@ -6,6 +6,8 @@ 支持 MQTT 协议 `3.1`、`3.1.1` 和 `5.0` 版本,支持`QoS 0`、`QoS 1`、`QoS 2`。 +支持 MQTT over WebSocket。 + > 首个支持 MQTT `5.0` 协议的 PHP library。 [![License](https://poser.pugx.org/simps/mqtt/license)](LICENSE) diff --git a/README.md b/README.md index cbac750..3421212 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ Support for MQTT protocol versions `3.1`, `3.1.1` and `5.0`. Support for `QoS 0`, `QoS 1`, `QoS 2`. +Support for MQTT over WebSocket. + > The first PHP library to support the MQTT `5.0` protocol. [![License](https://poser.pugx.org/simps/mqtt/license)](LICENSE) diff --git a/composer.json b/composer.json index 600f7e3..a6e52aa 100644 --- a/composer.json +++ b/composer.json @@ -55,7 +55,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.x-dev" + "dev-master": "2.x-dev" } } } diff --git a/docs/_coverpage.md b/docs/_coverpage.md index e64f1c8..11bbbf4 100644 --- a/docs/_coverpage.md +++ b/docs/_coverpage.md @@ -6,6 +6,7 @@ - Support for QoS 0, 1, 2 - Support for MQTT 3.1, 3.1.1, 5.0 +- Support for MQTT over WebSocket - The first PHP library to support the MQTT 5.0 protocol [Gitee](https://gitee.com/phpmqtt/mqtt) diff --git a/docs/en/README.md b/docs/en/README.md index 066834a..9c4825f 100644 --- a/docs/en/README.md +++ b/docs/en/README.md @@ -6,6 +6,8 @@ Support for MQTT protocol versions `3.1`, `3.1.1` and `5.0`. Support for `QoS 0`, `QoS 1`, `QoS 2`. +Support for MQTT over WebSocket. + ## Requirements * PHP >= `7.1` diff --git a/docs/en/_sidebar.md b/docs/en/_sidebar.md index 2accf74..168c37b 100644 --- a/docs/en/_sidebar.md +++ b/docs/en/_sidebar.md @@ -1,11 +1,15 @@ -* MQTT Coroutine Client - * [Client API](en/client) +- MQTT Coroutine Client + - [Client API](en/client) + - [WebSocket Client API](en/websocket) -* MQTT Protocol Analysis - * [Protocol API](en/protocol) +- MQTT Protocol Analysis + - [Protocol API](en/protocol) -* MQTT Message - * [Message API](en/message) +- MQTT Message + - [Message API](en/message) -* Upgrade Guide - * [1.2 Upgrade Guide](en/upgrade/1.2.md) \ No newline at end of file +- Upgrade Guide + - [1.2 Upgrade Guide](en/upgrade/1.2) + +- Tools + - [Debug Tools](en/debug) diff --git a/docs/en/client.md b/docs/en/client.md index 8a1308f..b776cf8 100644 --- a/docs/en/client.md +++ b/docs/en/client.md @@ -8,15 +8,15 @@ Create a MQTT client instance Simps\MQTT\Client::__construct(string $host, int $port, ClientConfig $config, int $clientType = Client::COROUTINE_CLIENT_TYPE) ``` -* `string $host` +- `string $host` Broker's host -* `int $port` +- `int $port` Broker's port -* `ClientConfig $config` +- `ClientConfig $config` ClientConfig object. @@ -39,9 +39,9 @@ $configObj = new Simps\MQTT\Config\ClientConfig($config); $client = new Simps\MQTT\Client('127.0.0.1', 1883, $configObj); ``` -!> The Client will use the corresponding protocol resolution according to the `protocol_level` set. +!> The Client will use the corresponding protocol resolution according to the `protocolLevel` set. -* `int $clientType` +- `int $clientType` Set the client type, use a Coroutine Client or a Sync Client, the default is Coroutine Client. @@ -55,13 +55,13 @@ Connect Broker Simps\MQTT\Client->connect(bool $clean = true, array $will = []) ``` -* `bool $clean` +- `bool $clean` Clean session. default is `true`. For a detailed description, please see the corresponding protocol document: `Clean Session`. -* `array $will` +- `array $will` When a client is disconnected, Broker will automatically send a will message to other clients @@ -91,13 +91,13 @@ Subscribe to one topic or multiple topics Simps\MQTT\Client->subscribe(array $topics, array $properties = []) ``` -* `array $topics` +- `array $topics` ```php // MQTT 3.x $topics = [ // topic => Qos - 'topic1' => 0, + 'topic1' => 0, 'topic2' => 1, ]; @@ -109,17 +109,17 @@ $topics = [ 'no_local' => true, 'retain_as_published' => true, 'retain_handling' => 2, - ], + ], 'topic2' => [ 'qos' => 2, 'no_local' => false, 'retain_as_published' => true, 'retain_handling' => 1, - ], + ], ]; ``` -* `array $properties` +- `array $properties` Optional in MQTT5 @@ -131,13 +131,13 @@ Unsubscribe from a topic or multiple topics Simps\MQTT\Client->unSubscribe(array $topics, array $properties = []) ``` -* `array $topics` +- `array $topics` ```php $topics = ['topic1', 'topic2']; ``` -* `array $properties` +- `array $properties` Optional in MQTT5 @@ -157,14 +157,6 @@ New AUTH type added in MQTT5. Authentication exchange. Simps\MQTT\Client->auth(int $code = ReasonCode::SUCCESS, array $properties = []) ``` -## recv() - -Receive messages - -```php -Simps\MQTT\Client->recv(): bool|arary|string -``` - ## send() Send messages @@ -173,14 +165,22 @@ Send messages Simps\MQTT\Client->send(array $data, $response = true) ``` -* `array $data` +- `array $data` `$data` is the data to be sent and must contain information such as `type` -* `bool $response` +- `bool $response` Are acknowledgements required. If `true`, `recv()` is called once +## recv() + +Receive messages + +```php +Simps\MQTT\Client->recv(): bool|arary|string +``` + ## ping() Send a heartbeat diff --git a/docs/en/debug.md b/docs/en/debug.md new file mode 100644 index 0000000..797edb1 --- /dev/null +++ b/docs/en/debug.md @@ -0,0 +1,52 @@ +# Debug Tools + +The tool provides 5 methods for debugging binary data, essentially functioning as a binary data viewer. + +It primarily converts binary data into ASCII or hexadecimal formats for viewing, useful for debugging TCP, WebSocket, UDP, and other protocols. + +```php +public function hexDump(): string // Display in hexadecimal +public function hexDumpAscii(): string // Display in both hexadecimal and corresponding ASCII characters +public function printableText(): string // Printable characters +public function hexStream(): string // Hexadecimal stream +public function ascii(): string // Display in ASCII characters +``` + +You can call these methods statically or instantiate `Simps\MQTT\Tools\Debug` or `Simps\MQTT\Tools\Common`/`Simps\MQTT\Tools\UnPackTool`: + +- Instantiation + +```php +use Simps\MQTT\Tools\Debug; + +$debug = new Debug('0:simps-mqtt/user001/update{ + "msg": "hello, mqtt" +}'); + +//$debug = (new Debug())->setEncode('0:simps-mqtt/user001/update{ +// "msg": "hello, mqtt" +//}'); + +echo $debug->hexDump(), PHP_EOL; +echo $debug->hexDumpAscii(), PHP_EOL; +echo $debug->printableText(), PHP_EOL; +echo $debug->hexStream(), PHP_EOL; +echo $debug->ascii(); +``` + +- Static call + +```php +use Simps\MQTT\Tools\UnPackTool; + +echo UnPackTool::hexDumpAscii('0:simps-mqtt/user001/update{ + "msg": "hello, mqtt" +}'); +``` + +```text +00000000 30 3a 73 69 6d 70 73 2d 6d 71 74 74 2f 75 73 65 0:simps-mqtt/use +00000010 72 30 30 31 2f 75 70 64 61 74 65 7b 0a 20 20 22 r001/update{. " +00000020 6d 73 67 22 3a 20 22 68 65 6c 6c 6f 2c 20 6d 71 msg": "hello, mq +00000030 74 74 22 0a 7d tt".} +``` diff --git a/docs/en/websocket.md b/docs/en/websocket.md new file mode 100644 index 0000000..40846de --- /dev/null +++ b/docs/en/websocket.md @@ -0,0 +1,214 @@ +# Client API + +## __construct() + +Create an instance of the MQTT over WebSocket client + +```php +Simps\MQTT\WebSocketClient::__construct(string $host, int $port, ClientConfig $config, string $path = '/mqtt', bool $ssl = false) +``` + +- `string $host` + +Broker's host + +- `int $port` + +Broker's port + +- `ClientConfig $config` + +ClientConfig object. + +- `string $path` + +WebSocket path, default is `/mqtt` + +- `bool $ssl` + +Whether to use SSL, default is `false` + +Example. + +```php +$config = [ + 'userName' => '', + 'password' => '', + 'clientId' => '', + 'keepAlive' => 10, + 'protocolName' => 'MQTT', // or MQIsdp + 'protocolLevel' => 4, // or 3, 5 + 'properties' => [], // optional in MQTT5 + 'delay' => 3000, // 3s + 'maxAttempts' => 5, + 'swooleConfig' => [] +]; +$configObj = new Simps\MQTT\Config\ClientConfig($config); +$client = new Simps\MQTT\WebSocketClient('broker.emqx.io', 8083, $configObj, '/mqtt'); +``` + +## connect() + +Connect Broker + +```php +Simps\MQTT\WebSocketClient->connect(bool $clean = true, array $will = []) +``` + +- `bool $clean` + +Clean session. default is `true`. + +For a detailed description, please see the corresponding protocol document: `Clean Session`. + +- `array $will` + +When a client is disconnected, Broker will automatically send a will message to other clients + +```php +$will = [ + 'topic' => '', + 'qos' => 1, + 'retain' => 0, + 'message' => '', // message content + 'properties' => [], // optional in MQTT5 +]; +``` + +## publish() + +push a message to a topic + +```php +Simps\MQTT\WebSocketClient->publish($topic, $message, $qos = 0, $dup = 0, $retain = 0, array $properties = []) +``` + +## subscribe() + +Subscribe to one topic or multiple topics + +```php +Simps\MQTT\WebSocketClient->subscribe(array $topics, array $properties = []) +``` + +- `array $topics` + +```php +// MQTT 3.x +$topics = [ + // topic => Qos + 'topic1' => 0, + 'topic2' => 1, +]; + +// MQTT 5.0 +$topics = [ + // topic => options + 'topic1' => [ + 'qos' => 1, + 'no_local' => true, + 'retain_as_published' => true, + 'retain_handling' => 2, + ], + 'topic2' => [ + 'qos' => 2, + 'no_local' => false, + 'retain_as_published' => true, + 'retain_handling' => 1, + ], +]; +``` + +- `array $properties` + +Optional in MQTT5 + +## unSubscribe() + +Unsubscribe from a topic or multiple topics + +```php +Simps\MQTT\WebSocketClient->unSubscribe(array $topics, array $properties = []) +``` + +- `array $topics` + +```php +$topics = ['topic1', 'topic2']; +``` + +- `array $properties` + +Optional in MQTT5 + +## close() + +Disconnect from Broker connect. The `DISCONNECT(14)` message is send to Broker + +```php +Simps\MQTT\WebSocketClient->close(int $code = ReasonCode::NORMAL_DISCONNECTION, array $properties = []) +``` + +## auth() + +New AUTH type added in MQTT5. Authentication exchange. + +```php +Simps\MQTT\WebSocketClient->auth(int $code = ReasonCode::SUCCESS, array $properties = []) +``` + +## send() + +Send messages + +```php +Simps\MQTT\WebSocketClient->send(array $data, $response = true) +``` + +- `array $data` + +`$data` is the data to be sent and must contain information such as `type` + +- `bool $response` + +Are acknowledgements required. If `true`, `recv()` is called once + +## recv() + +Receive messages + +```php +Simps\MQTT\WebSocketClient->recv(): bool|arary|string +``` + +## ping() + +Send a heartbeat + +```php +Simps\MQTT\WebSocketClient->ping() +``` + +## buildMessageId() + +Generate MessageId + +```php +Simps\MQTT\WebSocketClient->buildMessageId() +``` + +## genClientId() + +Generate ClientId + +```php +Simps\MQTT\WebSocketClient::genClientID(string $prefix = 'Simps_') +``` + +## getClient() + +Get an instance of `Swoole\Coroutine\Http\Client` + +```php +Simps\MQTT\WebSocketClient->getClient() +``` diff --git a/docs/zh-cn/README.md b/docs/zh-cn/README.md index bc907ce..1a7051a 100644 --- a/docs/zh-cn/README.md +++ b/docs/zh-cn/README.md @@ -4,6 +4,8 @@ 支持 MQTT 协议 `3.1`、`3.1.1` 和 `5.0` 版本,支持`QoS 0`、`QoS 1`、`QoS 2`。 +支持 MQTT over WebSocket。 + ## 依赖要求 * PHP >= `7.1` diff --git a/docs/zh-cn/_sidebar.md b/docs/zh-cn/_sidebar.md index ae7e637..87b2a42 100644 --- a/docs/zh-cn/_sidebar.md +++ b/docs/zh-cn/_sidebar.md @@ -1,16 +1,16 @@ -* MQTT 协程客户端 - * [Client API](zh-cn/client) +- MQTT 协程客户端 + - [Client API](zh-cn/client) + - [WebSocket Client API](zh-cn/websocket) -* MQTT 协议解析 - * [Protocol API](zh-cn/protocol) +- MQTT 协议解析 + - [Protocol API](zh-cn/protocol) -* MQTT Message - * [Message API](zh-cn/message) +- MQTT Message + - [Message API](zh-cn/message) -* 版本升级指南 - * [1.2 升级指南](zh-cn/upgrade/1.2.md) +- 版本升级指南 + - [1.2 升级指南](zh-cn/upgrade/1.2) -* Tools - - - [Debug Tools](zh-cn/debug.md) - - [自适应协议等级](zh-cn/level.md) +- Tools + - [Debug Tools](zh-cn/debug) + - [自适应协议等级](zh-cn/level) diff --git a/docs/zh-cn/client.md b/docs/zh-cn/client.md index f3d3754..76c015a 100644 --- a/docs/zh-cn/client.md +++ b/docs/zh-cn/client.md @@ -2,21 +2,21 @@ ## __construct() -创建一个MQTT客户端实例 +创建一个 MQTT 客户端实例 ```php Simps\MQTT\Client::__construct(string $host, int $port, ClientConfig $config, int $clientType = Client::COROUTINE_CLIENT_TYPE) ``` -* 参数`string $host` +- 参数`string $host` Broker 的 IP 地址 -* 参数`int $port` +- 参数`int $port` Broker 的端口 -* 参数`ClientConfig $config` +- 参数`ClientConfig $config` 客户端配置对象。 @@ -39,9 +39,9 @@ $configObj = new Simps\MQTT\Config\ClientConfig($config); $client = new Simps\MQTT\Client('127.0.0.1', 1883, $configObj); ``` -!> Client 会根据设置的`protocol_level`来使用对应的协议解析 +!> Client 会根据设置的`protocolLevel`来使用对应的协议解析 -* 参数`int $clientType` +- 参数`int $clientType` 设置客户端类型,使用协程 Client 还是同步阻塞 Client。默认为协程 Client。 @@ -49,21 +49,21 @@ $client = new Simps\MQTT\Client('127.0.0.1', 1883, $configObj); ## connect() -连接Broker +连接 Broker ```php Simps\MQTT\Client->connect(bool $clean = true, array $will = []) ``` -* 参数`bool $clean` +- 参数`bool $clean` 清理会话,默认为`true` 具体描述请查看对应协议文档:`清理会话 Clean Session` -* 参数`array $will` +- 参数`array $will` -遗嘱消息,当客户端断线后Broker会自动发送遗嘱消息给其它客户端 +遗嘱消息,当客户端断线后 Broker 会自动发送遗嘱消息给其它客户端 需要设置的内容如下 @@ -85,12 +85,12 @@ $will = [ Simps\MQTT\Client->publish($topic, $message, $qos = 0, $dup = 0, $retain = 0, array $properties = []) ``` -* 参数`$topic` 主题 -* 参数`$message` 内容 -* 参数`$qos` QoS等级,默认0 -* 参数`$dup` 重发标志,默认0 -* 参数`$retain` retain标记,默认0 -* 参数`$properties` 属性,MQTT5 中需要,可选 +- 参数`$topic` 主题 +- 参数`$message` 内容 +- 参数`$qos` QoS 等级,默认 0 +- 参数`$dup` 重发标志,默认 0 +- 参数`$retain` retain 标记,默认 0 +- 参数`$properties` 属性,MQTT5 中需要,可选 ## subscribe() @@ -100,7 +100,7 @@ Simps\MQTT\Client->publish($topic, $message, $qos = 0, $dup = 0, $retain = 0, ar Simps\MQTT\Client->subscribe(array $topics, array $properties = []) ``` -* 参数`array $topics` +- 参数`array $topics` `$topics`的`key`是主题,值为`QoS`的数组,例如 @@ -108,7 +108,7 @@ Simps\MQTT\Client->subscribe(array $topics, array $properties = []) // MQTT 3.x $topics = [ // 主题 => Qos - 'topic1' => 0, + 'topic1' => 0, 'topic2' => 1, ]; @@ -120,17 +120,17 @@ $topics = [ 'no_local' => true, 'retain_as_published' => true, 'retain_handling' => 2, - ], + ], 'topic2' => [ 'qos' => 2, 'no_local' => false, 'retain_as_published' => true, 'retain_handling' => 1, - ], + ], ]; ``` -* 参数`array $properties` +- 参数`array $properties` 属性,MQTT5 中需要,可选 @@ -142,31 +142,31 @@ $topics = [ Simps\MQTT\Client->unSubscribe(array $topics, array $properties = []) ``` -* 参数`array $topics` +- 参数`array $topics` ```php $topics = ['topic1', 'topic2']; ``` -* 参数`array $properties` +- 参数`array $properties` 属性,MQTT5 中需要,可选 ## close() -正常断开与Broker的连接,`DISCONNECT(14)`报文会被发送到Broker +正常断开与 Broker 的连接,`DISCONNECT(14)`报文会被发送到 Broker ```php Simps\MQTT\Client->close(int $code = ReasonCode::NORMAL_DISCONNECTION, array $properties = []) ``` -* 参数`int $code` +- 参数`int $code` -响应码,MQTT5 中需要,MQTT3直接调用即可 +响应码,MQTT5 中需要,MQTT3 直接调用即可 -* 参数`array $properties` +- 参数`array $properties` -属性,MQTT5中需要 +属性,MQTT5 中需要 ## auth() @@ -176,14 +176,6 @@ MQTT5 中新增的认证交换机制。 Simps\MQTT\Client->auth(int $code = ReasonCode::SUCCESS, array $properties = []) ``` -## recv() - -接收消息 - -```php -Simps\MQTT\Client->recv(): bool|arary|string -``` - ## send() 发送消息 @@ -192,14 +184,22 @@ Simps\MQTT\Client->recv(): bool|arary|string Simps\MQTT\Client->send(array $data, $response = true) ``` -* 参数`array $data` +- 参数`array $data` `$data`是需要发送的数据,必须包含`type`等信息 -* 参数`bool $response` +- 参数`bool $response` 是否需要回执。如果为`true`,会调用一次`recv()` +## recv() + +接收消息 + +```php +Simps\MQTT\Client->recv(): bool|arary|string +``` + ## ping() 发送心跳包 @@ -210,7 +210,7 @@ Simps\MQTT\Client->ping() ## buildMessageId() -生成MessageId +生成 MessageId ```php Simps\MQTT\Client->buildMessageId() @@ -218,7 +218,7 @@ Simps\MQTT\Client->buildMessageId() ## genClientId() -生成ClientId +生成 ClientId ```php Simps\MQTT\Client::genClientID(string $prefix = 'Simps_') diff --git a/docs/zh-cn/websocket.md b/docs/zh-cn/websocket.md new file mode 100644 index 0000000..36c0bd1 --- /dev/null +++ b/docs/zh-cn/websocket.md @@ -0,0 +1,233 @@ +# WebSocket Client API + +## __construct() + +创建一个 MQTT over WebSocket 客户端实例 + +```php +Simps\MQTT\WebSocketClient::__construct(string $host, int $port, ClientConfig $config, string $path = '/mqtt', bool $ssl = false) +``` + +- 参数`string $host` + +Broker 的 IP 地址 + +- 参数`int $port` + +Broker 的端口 + +- 参数`ClientConfig $config` + +客户端配置对象 + +- 参数`string $path` + +WebSocket 的路径,默认为`/mqtt` + +- 参数`bool $ssl` + +是否使用 SSL,默认为`false` + +示例: + +```php +$config = [ + 'userName' => '', // 用户名 + 'password' => '', // 密码 + 'clientId' => '', // 客户端id + 'keepAlive' => 10, // 默认0秒,设置成0代表禁用 + 'protocolName' => 'MQTT', // 协议名,默认为MQTT(3.1.1版本),也可为MQIsdp(3.1版本) + 'protocolLevel' => 4, // 协议等级,MQTT3.1.1版本为4,5.0版本为5,MQIsdp为3 + 'properties' => [], // MQTT5 中所需要的属性 + 'delay' => 3000, // 重连时的延迟时间 (毫秒) + 'maxAttempts' => 5, // 最大重连次数。默认-1,表示不限制 + 'swooleConfig' => [] +]; +$configObj = new Simps\MQTT\Config\ClientConfig($config); +$client = new Simps\MQTT\WebSocketClient('broker.emqx.io', 8083, $configObj, '/mqtt'); +``` + +## connect() + +连接 Broker + +```php +Simps\MQTT\WebSocketClient->connect(bool $clean = true, array $will = []) +``` + +- 参数`bool $clean` + +清理会话,默认为`true` + +具体描述请查看对应协议文档:`清理会话 Clean Session` + +- 参数`array $will` + +遗嘱消息,当客户端断线后 Broker 会自动发送遗嘱消息给其它客户端 + +需要设置的内容如下 + +```php +$will = [ + 'topic' => '', // 主题 + 'qos' => 1, // QoS等级 + 'retain' => 0, // retain标记 + 'message' => '', // 遗嘱消息内容 + 'properties' => [], // MQTT5 中需要,可选 +]; +``` + +## publish() + +向某个主题发布一条消息 + +```php +Simps\MQTT\WebSocketClient->publish($topic, $message, $qos = 0, $dup = 0, $retain = 0, array $properties = []) +``` + +- 参数`$topic` 主题 +- 参数`$message` 内容 +- 参数`$qos` QoS 等级,默认 0 +- 参数`$dup` 重发标志,默认 0 +- 参数`$retain` retain 标记,默认 0 +- 参数`$properties` 属性,MQTT5 中需要,可选 + +## subscribe() + +订阅一个主题或者多个主题 + +```php +Simps\MQTT\WebSocketClient->subscribe(array $topics, array $properties = []) +``` + +- 参数`array $topics` + +`$topics`的`key`是主题,值为`QoS`的数组,例如 + +```php +// MQTT 3.x +$topics = [ + // 主题 => Qos + 'topic1' => 0, + 'topic2' => 1, +]; + +// MQTT 5.0 +$topics = [ + // 主题 => 选项 + 'topic1' => [ + 'qos' => 1, + 'no_local' => true, + 'retain_as_published' => true, + 'retain_handling' => 2, + ], + 'topic2' => [ + 'qos' => 2, + 'no_local' => false, + 'retain_as_published' => true, + 'retain_handling' => 1, + ], +]; +``` + +- 参数`array $properties` + +属性,MQTT5 中需要,可选 + +## unSubscribe() + +取消订阅一个主题或者多个主题 + +```php +Simps\MQTT\WebSocketClient->unSubscribe(array $topics, array $properties = []) +``` + +- 参数`array $topics` + +```php +$topics = ['topic1', 'topic2']; +``` + +- 参数`array $properties` + +属性,MQTT5 中需要,可选 + +## close() + +正常断开与 Broker 的连接,`DISCONNECT(14)`报文会被发送到 Broker + +```php +Simps\MQTT\WebSocketClient->close(int $code = ReasonCode::NORMAL_DISCONNECTION, array $properties = []) +``` + +- 参数`int $code` + +响应码,MQTT5 中需要,MQTT3 直接调用即可 + +- 参数`array $properties` + +属性,MQTT5 中需要 + +## auth() + +MQTT5 中新增的认证交换机制。 + +```php +Simps\MQTT\WebSocketClient->auth(int $code = ReasonCode::SUCCESS, array $properties = []) +``` + +## send() + +发送消息 + +```php +Simps\MQTT\WebSocketClient->send(array $data, $response = true) +``` + +- 参数`array $data` + +`$data`是需要发送的数据,必须包含`type`等信息 + +- 参数`bool $response` + +是否需要回执。如果为`true`,会调用一次`recv()` + +## recv() + +接收消息 + +```php +Simps\MQTT\WebSocketClient->recv(): bool|arary|string +``` + +## ping() + +发送心跳包 + +```php +Simps\MQTT\WebSocketClient->ping() +``` + +## buildMessageId() + +生成 MessageId + +```php +Simps\MQTT\WebSocketClient->buildMessageId() +``` + +## genClientId() + +生成 ClientId + +```php +Simps\MQTT\WebSocketClient::genClientID(string $prefix = 'Simps_') +``` + +## getClient() + +获取 `Swoole\Coroutine\Http\Client` 的实例 + +```php +Simps\MQTT\WebSocketClient->getClient() +``` diff --git a/examples/bootstrap.php b/examples/bootstrap.php index 8434eb3..200ca44 100644 --- a/examples/bootstrap.php +++ b/examples/bootstrap.php @@ -39,6 +39,7 @@ const SIMPS_MQTT_LOCAL_HOST = '127.0.0.1'; const SIMPS_MQTT_REMOTE_HOST = 'broker.emqx.io'; const SIMPS_MQTT_PORT = 1883; +const SIMPS_MQTT_OVER_WEBSOCKET_PORT = 8083; function getTestConnectConfig() { diff --git a/examples/websocket.php b/examples/websocket.php new file mode 100644 index 0000000..6ca2040 --- /dev/null +++ b/examples/websocket.php @@ -0,0 +1,62 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code. + */ + +include_once __DIR__ . '/bootstrap.php'; + +use Simps\MQTT\WebSocketClient; +use Simps\MQTT\Protocol\Types; +use Swoole\Coroutine; + +Coroutine\run(function () { + $client = new WebSocketClient(SIMPS_MQTT_REMOTE_HOST, SIMPS_MQTT_OVER_WEBSOCKET_PORT, getTestConnectConfig()); + $will = [ + 'topic' => 'simps-mqtt/users/byebye', + 'qos' => 0, + 'retain' => 0, + 'message' => 'byebye', + ]; + $client->connect(true, $will); + $topics['simps-mqtt/users/#'] = 0; + $client->subscribe($topics); + $timeSincePing = time(); + while (true) { + try { + $buffer = $client->recv(); + if ($buffer && $buffer !== true) { + var_dump($buffer); + // QoS1 PUBACK + if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) { + $client->send( + [ + 'type' => Types::PUBACK, + 'message_id' => $buffer['message_id'], + ], + false + ); + } + if ($buffer['type'] === Types::DISCONNECT) { + echo "Broker is disconnected\n"; + $client->close(); + break; + } + } + if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) { + $buffer = $client->ping(); + if ($buffer) { + echo 'send ping success' . PHP_EOL; + $timeSincePing = time(); + } + } + } catch (\Throwable $e) { + throw $e; + } + } +}); diff --git a/phpstan.neon b/phpstan.neon index 8f5baa3..2a891f6 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -17,6 +17,8 @@ parameters: paths: - tests/* - src/Client.php + - src/BaseClient.php + - src/WebSocketClient.php - message: '#Property \S+ has no type specified.#' paths: diff --git a/src/BaseClient.php b/src/BaseClient.php new file mode 100644 index 0000000..1f7f744 --- /dev/null +++ b/src/BaseClient.php @@ -0,0 +1,351 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code. + */ +namespace Simps\MQTT; + +use Simps\MQTT\Config\ClientConfig; +use Simps\MQTT\Exception\ConnectException; +use Simps\MQTT\Exception\ProtocolException; +use Simps\MQTT\Hex\ReasonCode; +use Simps\MQTT\Tools\Common; +use Swoole\Coroutine; +use Swoole\Coroutine\Http\Client as WebSocketClient; + +abstract class BaseClient +{ + public const COROUTINE_CLIENT_TYPE = 1; + + public const SYNC_CLIENT_TYPE = 2; + + public const WEBSOCKET_CLIENT_TYPE = 3; + + /** @var Coroutine\Client|\Swoole\Client|WebSocketClient */ + private $client; + + /** @var int */ + private $messageId = 0; + + /** @var array */ + private $connectData = []; + + /** @var string */ + private $host; + + /** @var int */ + private $port; + + /** @var ClientConfig */ + private $config; + + /** @var int */ + private $clientType; + + /** @var string */ + private $path = '/mqtt'; + + /** @var bool */ + private $ssl = false; + + /** + * @return $this + */ + public function setHost(string $host): self + { + $this->host = $host; + + return $this; + } + + public function getHost(): string + { + return $this->host; + } + + public function setPort(int $port): self + { + $this->port = $port; + + return $this; + } + + public function getPort(): int + { + return $this->port; + } + + /** + * @return $this + */ + public function setClientType(int $clientType): self + { + $this->clientType = $clientType; + + return $this; + } + + public function getClientType(): int + { + return $this->clientType; + } + + /** + * @return $this + */ + public function setConfig(ClientConfig $config): self + { + $this->config = $config; + + return $this; + } + + public function getConfig(): ClientConfig + { + return $this->config; + } + + /** + * @param Coroutine\Client|\Swoole\Client|WebSocketClient $client + * @return $this + */ + public function setClient($client): self + { + $this->client = $client; + + return $this; + } + + /** + * @return Coroutine\Client|\Swoole\Client|WebSocketClient + */ + public function getClient() + { + return $this->client; + } + + public function setPath(string $path): self + { + $this->path = $path; + + return $this; + } + + public function getPath(): string + { + return $this->path; + } + + public function setSsl(bool $ssl): self + { + $this->ssl = $ssl; + + return $this; + } + + public function getSsl(): bool + { + return $this->ssl; + } + + public function setConnectData(array $connectData): self + { + $this->connectData = $connectData; + + return $this; + } + + /** + * @return null|array|string + */ + public function getConnectData(?string $key = null) + { + if ($key) { + if (isset($this->connectData[$key])) { + return $this->connectData[$key]; + } + + return null; + } + + return $this->connectData; + } + + protected function isCoroutineClientType(): bool + { + return $this->clientType === self::COROUTINE_CLIENT_TYPE; + } + + protected function isWebSocketClientType(): bool + { + return $this->clientType === self::WEBSOCKET_CLIENT_TYPE; + } + + public function sleep(int $ms): void + { + $this->isCoroutineClientType() ? Coroutine::sleep($ms / 1000) : usleep($ms * 1000); + } + + public function buildMessageId(): int + { + return ++$this->messageId > 65535 ? $this->messageId = 1 : $this->messageId; + } + + public static function genClientID(string $prefix = 'Simps_'): string + { + return uniqid($prefix); + } + + protected function handleVerbose(string $data): void + { + switch ($this->getConfig()->getVerbose()) { + case MQTT_VERBOSE_HEXDUMP: + echo Common::hexDump($data), PHP_EOL; + break; + case MQTT_VERBOSE_HEXDUMP_ASCII: + echo Common::hexDumpAscii($data), PHP_EOL; + break; + case MQTT_VERBOSE_ASCII: + echo Common::ascii($data), PHP_EOL; + break; + case MQTT_VERBOSE_TEXT: + echo Common::printableText($data), PHP_EOL; + break; + case MQTT_VERBOSE_HEX_STREAM: + echo Common::hexStream($data), PHP_EOL; + break; + case MQTT_VERBOSE_NONE: + default: + break; + } + } + + protected function handleException(): void + { + if ($this->isCoroutineClientType() || $this->isWebSocketClientType()) { + $errMsg = $this->client->errMsg; + } else { + $errMsg = socket_strerror($this->client->errCode); + } + $this->client->close(); + throw new ConnectException($errMsg, $this->client->errCode); + } + + public function connect(bool $clean = true, array $will = []) + { + $data = [ + 'type' => Protocol\Types::CONNECT, + 'protocol_name' => $this->getConfig()->getProtocolName(), + 'protocol_level' => $this->getConfig()->getProtocolLevel(), + 'clean_session' => $clean, + 'client_id' => $this->getConfig()->getClientId(), + 'keep_alive' => $this->getConfig()->getKeepAlive(), + 'properties' => $this->getConfig()->getProperties(), + 'user_name' => $this->getConfig()->getUserName(), + 'password' => $this->getConfig()->getPassword(), + ]; + if (!empty($will)) { + if (empty($will['topic'])) { + throw new ProtocolException('Topic cannot be empty'); + } + $data['will'] = $will; + } + + $this->setConnectData($data); + + return $this->send($data); + } + + public function subscribe(array $topics, array $properties = []) + { + return $this->send([ + 'type' => Protocol\Types::SUBSCRIBE, + 'message_id' => $this->buildMessageId(), + 'properties' => $properties, + 'topics' => $topics, + ]); + } + + public function unSubscribe(array $topics, array $properties = []) + { + return $this->send([ + 'type' => Protocol\Types::UNSUBSCRIBE, + 'message_id' => $this->buildMessageId(), + 'properties' => $properties, + 'topics' => $topics, + ]); + } + + public function publish( + string $topic, + string $message, + int $qos = 0, + int $dup = 0, + int $retain = 0, + array $properties = [] + ) { + if (empty($topic)) { + if ($this->getConfig()->isMQTT5()) { + if (empty($properties['topic_alias'])) { + throw new ProtocolException('Topic cannot be empty or need to set topic_alias'); + } + } else { + throw new ProtocolException('Topic cannot be empty'); + } + } + + $response = $qos > 0; + + // A PUBLISH packet MUST NOT contain a Packet Identifier if its QoS value is set to 0 + $message_id = 0; + if ($qos) { + $message_id = $this->buildMessageId(); + } + + return $this->send( + [ + 'type' => Protocol\Types::PUBLISH, + 'qos' => $qos, + 'dup' => $dup, + 'retain' => $retain, + 'topic' => $topic, + 'message_id' => $message_id, + 'properties' => $properties, + 'message' => $message, + ], + $response + ); + } + + public function ping() + { + return $this->send(['type' => Protocol\Types::PINGREQ]); + } + + public function close(int $code = ReasonCode::NORMAL_DISCONNECTION, array $properties = []): bool + { + $this->send(['type' => Protocol\Types::DISCONNECT, 'code' => $code, 'properties' => $properties], false); + + return $this->client->close(); + } + + public function auth(int $code = ReasonCode::SUCCESS, array $properties = []) + { + return $this->send(['type' => Protocol\Types::AUTH, 'code' => $code, 'properties' => $properties]); + } + + abstract protected function reConnect(): void; + + abstract public function send(array $data, bool $response = true); + + abstract public function recv(); + + abstract protected function getResponse(); +} diff --git a/src/Client.php b/src/Client.php index a6f112a..cdbea82 100644 --- a/src/Client.php +++ b/src/Client.php @@ -13,169 +13,34 @@ namespace Simps\MQTT; use Simps\MQTT\Config\ClientConfig; -use Simps\MQTT\Exception\ConnectException; -use Simps\MQTT\Exception\ProtocolException; -use Simps\MQTT\Hex\ReasonCode; -use Simps\MQTT\Tools\Common; use Swoole\Coroutine; -class Client +class Client extends BaseClient { - /** @var Coroutine\Client|\Swoole\Client */ - private $client; - - /** @var int */ - private $messageId = 0; - - /** @var array */ - private $connectData = []; - - /** @var string */ - private $host; - - /** @var int */ - private $port; - - /** @var ClientConfig */ - private $config; - - /** @var int */ - private $clientType; - - public const COROUTINE_CLIENT_TYPE = 1; - - public const SYNC_CLIENT_TYPE = 2; - public function __construct( string $host, int $port, ClientConfig $config, int $clientType = self::COROUTINE_CLIENT_TYPE ) { - $this->host = $host; - $this->port = $port; - $this->config = $config; - $this->clientType = $clientType; + $this->setHost($host) + ->setPort($port) + ->setConfig($config) + ->setClientType($clientType); if ($this->isCoroutineClientType()) { - $this->client = new Coroutine\Client($config->getSockType()); + $client = new Coroutine\Client($config->getSockType()); } else { - $this->client = new \Swoole\Client($config->getSockType()); - } - $this->client->set($config->getSwooleConfig()); - if (!$this->client->connect($host, $port)) { - $this->reConnect(); - } - } - - public function connect(bool $clean = true, array $will = []) - { - $data = [ - 'type' => Protocol\Types::CONNECT, - 'protocol_name' => $this->getConfig()->getProtocolName(), - 'protocol_level' => $this->getConfig()->getProtocolLevel(), - 'clean_session' => $clean, - 'client_id' => $this->getConfig()->getClientId(), - 'keep_alive' => $this->getConfig()->getKeepAlive(), - 'properties' => $this->getConfig()->getProperties(), - 'user_name' => $this->getConfig()->getUserName(), - 'password' => $this->getConfig()->getPassword(), - ]; - if (!empty($will)) { - if (!isset($will['topic']) || empty($will['topic'])) { - throw new ProtocolException('Topic cannot be empty'); - } - $data['will'] = $will; - } - - $this->connectData = $data; - - return $this->send($data); - } - - public function subscribe(array $topics, array $properties = []) - { - $data = [ - 'type' => Protocol\Types::SUBSCRIBE, - 'message_id' => $this->buildMessageId(), - 'properties' => $properties, - 'topics' => $topics, - ]; - - return $this->send($data); - } - - public function unSubscribe(array $topics, array $properties = []) - { - $data = [ - 'type' => Protocol\Types::UNSUBSCRIBE, - 'message_id' => $this->buildMessageId(), - 'properties' => $properties, - 'topics' => $topics, - ]; - - return $this->send($data); - } - - public function publish( - string $topic, - string $message, - int $qos = 0, - int $dup = 0, - int $retain = 0, - array $properties = [] - ) { - if (empty($topic)) { - if ($this->getConfig()->isMQTT5()) { - if (!isset($properties['topic_alias']) || empty($properties['topic_alias'])) { - throw new ProtocolException('Topic cannot be empty or need to set topic_alias'); - } - } else { - throw new ProtocolException('Topic cannot be empty'); - } + $client = new \Swoole\Client($config->getSockType()); } - - $response = $qos > 0; - - // A PUBLISH packet MUST NOT contain a Packet Identifier if its QoS value is set to 0 - $message_id = 0; - if ($qos) { - $message_id = $this->buildMessageId(); + $client->set($config->getSwooleConfig()); + $this->setClient($client); + if (!$this->getClient()->connect($host, $port)) { + $this->handleException(); } - - return $this->send( - [ - 'type' => Protocol\Types::PUBLISH, - 'qos' => $qos, - 'dup' => $dup, - 'retain' => $retain, - 'topic' => $topic, - 'message_id' => $message_id, - 'properties' => $properties, - 'message' => $message, - ], - $response - ); - } - - public function ping() - { - return $this->send(['type' => Protocol\Types::PINGREQ]); - } - - public function close(int $code = ReasonCode::NORMAL_DISCONNECTION, array $properties = []): bool - { - $this->send(['type' => Protocol\Types::DISCONNECT, 'code' => $code, 'properties' => $properties], false); - - return $this->client->close(); } - public function auth(int $code = ReasonCode::SUCCESS, array $properties = []) - { - return $this->send(['type' => Protocol\Types::AUTH, 'code' => $code, 'properties' => $properties]); - } - - private function reConnect() + protected function reConnect(): void { $result = false; $maxAttempts = $this->getConfig()->getMaxAttempts(); @@ -185,34 +50,19 @@ private function reConnect() $this->handleException(); } $this->sleep($delay); - $this->client->close(); - $result = $this->client->connect($this->getHost(), $this->getPort()); + $this->getClient()->close(); + $result = $this->getClient()->connect($this->getHost(), $this->getPort()); if ($maxAttempts > 0) { $maxAttempts--; } } } - private function handleException() - { - if ($this->isCoroutineClientType()) { - $errMsg = $this->client->errMsg; - } else { - $errMsg = socket_strerror($this->client->errCode); - } - $this->client->close(); - throw new ConnectException($errMsg, $this->client->errCode); - } - public function send(array $data, bool $response = true) { - if ($this->getConfig()->isMQTT5()) { - $package = Protocol\V5::pack($data); - } else { - $package = Protocol\V3::pack($data); - } + $package = $this->getConfig()->isMQTT5() ? Protocol\V5::pack($data) : Protocol\V3::pack($data); - $this->client->send($package); + $this->getClient()->send($package); if ($response) { return $this->recv(); @@ -224,19 +74,15 @@ public function send(array $data, bool $response = true) public function recv() { $response = $this->getResponse(); - if ($response === '' || !$this->client->isConnected()) { + if ($response === '' || !$this->getClient()->isConnected()) { $this->reConnect(); $this->connect($this->getConnectData('clean_session') ?? true, $this->getConnectData('will') ?? []); - } elseif ($response === false && $this->client->errCode !== SOCKET_ETIMEDOUT) { + } elseif ($response === false && $this->getClient()->errCode !== SOCKET_ETIMEDOUT) { $this->handleException(); } elseif (is_string($response) && strlen($response) !== 0) { $this->handleVerbose($response); - if ($this->getConfig()->isMQTT5()) { - return Protocol\V5::unpack($response); - } - - return Protocol\V3::unpack($response); + return $this->getConfig()->isMQTT5() ? Protocol\V5::unpack($response) : Protocol\V3::unpack($response); } return true; @@ -245,103 +91,14 @@ public function recv() protected function getResponse() { if ($this->isCoroutineClientType()) { - $response = $this->client->recv(); + $response = $this->getClient()->recv(); } else { $write = $error = []; - $read = [$this->client]; + $read = [$this->getClient()]; $n = swoole_client_select($read, $write, $error); - if ($n > 0) { - $response = $this->client->recv(); - } else { - $response = true; - } + $response = $n > 0 ? $this->getClient()->recv() : true; } return $response; } - - protected function isCoroutineClientType(): bool - { - return $this->clientType === self::COROUTINE_CLIENT_TYPE; - } - - public function buildMessageId(): int - { - if ($this->messageId === 65535) { - $this->messageId = 0; - } - - return ++$this->messageId; - } - - public static function genClientID(string $prefix = 'Simps_'): string - { - return uniqid($prefix); - } - - public function sleep(int $ms): void - { - if ($this->isCoroutineClientType()) { - Coroutine::sleep($ms / 1000); - } else { - usleep($ms * 1000); - } - } - - public function getHost(): string - { - return $this->host; - } - - public function getPort(): int - { - return $this->port; - } - - public function getConfig(): ClientConfig - { - return $this->config; - } - - public function getConnectData(?string $key = null) - { - if ($key) { - if (isset($this->connectData[$key])) { - return $this->connectData[$key]; - } - - return null; - } - - return $this->connectData; - } - - public function getClient() - { - return $this->client; - } - - protected function handleVerbose(string $data) - { - switch ($this->getConfig()->getVerbose()) { - case MQTT_VERBOSE_HEXDUMP: - echo Common::hexDump($data), PHP_EOL; - break; - case MQTT_VERBOSE_HEXDUMP_ASCII: - echo Common::hexDumpAscii($data), PHP_EOL; - break; - case MQTT_VERBOSE_ASCII: - echo Common::ascii($data), PHP_EOL; - break; - case MQTT_VERBOSE_TEXT: - echo Common::printableText($data), PHP_EOL; - break; - case MQTT_VERBOSE_HEX_STREAM: - echo Common::hexStream($data), PHP_EOL; - break; - case MQTT_VERBOSE_NONE: - default: - break; - } - } } diff --git a/src/Config/ClientConfig.php b/src/Config/ClientConfig.php index 69fbef3..4675896 100644 --- a/src/Config/ClientConfig.php +++ b/src/Config/ClientConfig.php @@ -24,6 +24,11 @@ class ClientConfig extends AbstractConfig 'open_mqtt_protocol' => true, ]; + /** @var array */ + protected $headers = [ + 'Sec-Websocket-Protocol' => 'mqtt', + ]; + /** @var string */ protected $userName = ''; @@ -46,7 +51,7 @@ class ClientConfig extends AbstractConfig protected $delay = 3000; /** @var int */ - protected $maxAttempts = -1; + protected $maxAttempts = 0; /** @var int */ protected $sockType = SWOOLE_SOCK_TCP; @@ -78,6 +83,18 @@ public function setSwooleConfig(array $config): self return $this; } + public function getHeaders(): array + { + return $this->headers; + } + + public function setHeaders(array $headers): self + { + $this->headers = array_merge($this->headers, $headers); + + return $this; + } + public function getUserName(): string { return $this->userName; diff --git a/src/WebSocketClient.php b/src/WebSocketClient.php new file mode 100644 index 0000000..bfc2211 --- /dev/null +++ b/src/WebSocketClient.php @@ -0,0 +1,108 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code. + */ +namespace Simps\MQTT; + +use Simps\MQTT\Config\ClientConfig; +use Swoole\Coroutine\Http\Client; +use Swoole\Http\Status; +use Swoole\WebSocket\Frame; + +class WebSocketClient extends BaseClient +{ + public function __construct( + string $host, + int $port, + ClientConfig $config, + string $path = '/mqtt', + bool $ssl = false + ) { + $this->setHost($host) + ->setPort($port) + ->setConfig($config) + ->setPath($path) + ->setSsl($ssl); + + $client = new Client($host, $port, $ssl); + $client->set($config->getSwooleConfig()); + $client->setHeaders($config->getHeaders()); + $upgrade = $client->upgrade($path); + $this->setClient($client); + if (!$upgrade || $client->getStatusCode() !== Status::SWITCHING_PROTOCOLS) { + $this->handleException(); + } + } + + protected function reConnect(): void + { + $result = false; + $maxAttempts = $this->getConfig()->getMaxAttempts(); + $delay = $this->getConfig()->getDelay(); + while (!$result) { + if ($maxAttempts === 0) { + $this->handleException(); + } + $this->sleep($delay); + $this->getClient()->close(); + $upgrade = $this->getClient()->upgrade($this->getPath()); + if ($upgrade && $this->getClient()->getStatusCode() === Status::SWITCHING_PROTOCOLS) { + $result = true; + } + if ($maxAttempts > 0) { + $maxAttempts--; + } + } + } + + public function send(array $data, bool $response = true) + { + $package = $this->getConfig()->isMQTT5() ? Protocol\V5::pack($data) : Protocol\V3::pack($data); + + $this->getClient()->push($package, WEBSOCKET_OPCODE_BINARY); + + if ($response) { + return $this->recv(); + } + + return true; + } + + public function recv() + { + $response = $this->getResponse(); + if ($response === false && $this->getClient()->errCode === 0) { + $this->reConnect(); + $this->connect($this->getConnectData('clean_session') ?? true, $this->getConnectData('will') ?? []); + } elseif ($response === false && $this->getClient()->errCode !== SOCKET_ETIMEDOUT) { + $this->handleException(); + } elseif (is_string($response) && strlen($response) !== 0) { + $this->handleVerbose($response); + + return $this->getConfig()->isMQTT5() ? Protocol\V5::unpack($response) : Protocol\V3::unpack($response); + } + + return true; + } + + protected function getResponse() + { + $response = $this->getClient()->recv(); + if ($response === false) { + return false; + } + if ($response instanceof Frame) { + return $response->data; + } + + return true; + } +} diff --git a/tests/V3/WebSocketTest.php b/tests/V3/WebSocketTest.php new file mode 100644 index 0000000..a707b2f --- /dev/null +++ b/tests/V3/WebSocketTest.php @@ -0,0 +1,121 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code. + */ +namespace SimpsTest\MQTT\V3; + +use PHPUnit\Framework\TestCase; +use Simps\MQTT\Exception\ProtocolException; +use Simps\MQTT\Hex\ReasonCode; +use Simps\MQTT\Protocol\Types; +use Simps\MQTT\WebSocketClient; + +/** + * @internal + * @coversNothing + */ +class WebSocketTest extends TestCase +{ + private static $topic = ''; + + private static $client; + + public static function setUpBeforeClass(): void + { + self::$topic = 'testtopic/simps-' . rand(100, 999); + self::$client = new WebSocketClient(SIMPS_MQTT_REMOTE_HOST, SIMPS_MQTT_OVER_WEBSOCKET_PORT, getTestConnectConfig()); + } + + public static function tearDownAfterClass(): void + { + self::$topic = ''; + self::$client = null; + } + + public function testConnect() + { + $res = self::$client->connect(); + $this->assertIsArray($res); + $this->assertSame(Types::CONNACK, $res['type']); + } + + /** + * @depends testConnect + */ + public function testSubscribe() + { + $topics[self::$topic] = 1; + $res = self::$client->subscribe($topics); + $this->assertIsArray($res); + $this->assertSame(Types::SUBACK, $res['type']); + $this->assertSame(ReasonCode::GRANTED_QOS_1, $res['codes'][0]); + } + + /** + * @depends testSubscribe + */ + public function testPublish() + { + $buffer = self::$client->publish(self::$topic, 'hello,simps', 1); + $this->assertIsArray($buffer); + $this->assertSame(Types::PUBACK, $buffer['type']); + } + + /** + * @depends testPublish + */ + public function testRecv() + { + $buffer = self::$client->recv(); + $this->assertIsArray($buffer); + $this->assertSame(Types::PUBLISH, $buffer['type']); + $this->assertSame(self::$topic, $buffer['topic']); + $this->assertSame('hello,simps', $buffer['message']); + } + + /** + * @depends testRecv + */ + public function testPing() + { + $buffer = self::$client->ping(); + $this->assertIsArray($buffer); + $this->assertSame(Types::PINGRESP, $buffer['type']); + } + + /** + * @depends testPing + */ + public function testUnsubscribe() + { + $status = self::$client->unSubscribe([self::$topic]); + $this->assertIsArray($status); + $this->assertSame(Types::UNSUBACK, $status['type']); + } + + /** + * @depends testUnsubscribe + */ + public function testClose() + { + $status = self::$client->close(); + $this->assertTrue($status); + } + + public function testPublishNonTopic() + { + $client = new WebSocketClient(SIMPS_MQTT_REMOTE_HOST, SIMPS_MQTT_OVER_WEBSOCKET_PORT, getTestConnectConfig()); + $client->connect(); + $this->expectException(ProtocolException::class); + $this->expectExceptionMessage('Topic cannot be empty'); + $client->publish('', 'hello,simps'); + } +} diff --git a/tests/V5/WebSocketTest.php b/tests/V5/WebSocketTest.php new file mode 100644 index 0000000..4c48575 --- /dev/null +++ b/tests/V5/WebSocketTest.php @@ -0,0 +1,145 @@ + + * + * For the full copyright and license information, + * please view the LICENSE file that was distributed with this source code. + */ +namespace SimpsTest\MQTT\V5; + +use PHPUnit\Framework\TestCase; +use Simps\MQTT\Exception\ProtocolException; +use Simps\MQTT\Hex\ReasonCode; +use Simps\MQTT\Protocol\Types; +use Simps\MQTT\WebSocketClient; +use Swoole\Coroutine; + +/** + * @internal + * @coversNothing + */ +class WebSocketTest extends TestCase +{ + private static $topic = ''; + + private static $client; + + public static function setUpBeforeClass(): void + { + self::$topic = 'testtopic/simps-' . rand(100, 999); + self::$client = new WebSocketClient(SIMPS_MQTT_REMOTE_HOST, SIMPS_MQTT_OVER_WEBSOCKET_PORT, getTestMQTT5ConnectConfig()); + } + + public static function tearDownAfterClass(): void + { + self::$topic = ''; + self::$client = null; + } + + public function testConnect() + { + $res = self::$client->connect(); + $this->assertIsArray($res); + $this->assertSame(Types::CONNACK, $res['type']); + } + + /** + * @depends testConnect + */ + public function testSubscribe() + { + $topics = [ + self::$topic . '/get' => [ + 'qos' => 1, + 'no_local' => true, + 'retain_as_published' => true, + 'retain_handling' => 2, + ], + self::$topic . '/update' => [ + 'qos' => 2, + 'no_local' => false, + 'retain_as_published' => true, + 'retain_handling' => 2, + ], + ]; + $res = self::$client->subscribe($topics); + $this->assertIsArray($res); + $this->assertSame(Types::SUBACK, $res['type']); + $this->assertIsArray($res['codes']); + $this->assertSame(ReasonCode::GRANTED_QOS_1, $res['codes'][0]); + $this->assertSame(ReasonCode::GRANTED_QOS_2, $res['codes'][1]); + } + + /** + * @depends testSubscribe + */ + public function testPublish() + { + Coroutine::create(function () { + $client = new WebSocketClient(SIMPS_MQTT_REMOTE_HOST, SIMPS_MQTT_OVER_WEBSOCKET_PORT, getTestMQTT5ConnectConfig()); + $res = $client->connect(); + $this->assertIsArray($res); + $buffer = $client->publish(self::$topic . '/get', 'hello,simps', 1); + $this->assertIsArray($buffer); + $this->assertSame(Types::PUBACK, $buffer['type']); + $this->assertSame('Success', ReasonCode::getReasonPhrase($buffer['code'])); + }); + } + + /** + * @depends testSubscribe + */ + public function testRecv() + { + $buffer = self::$client->recv(); + $this->assertIsArray($buffer); + $this->assertSame(Types::PUBLISH, $buffer['type']); + $this->assertSame(self::$topic . '/get', $buffer['topic']); + $this->assertSame('hello,simps', $buffer['message']); + } + + /** + * @depends testRecv + */ + public function testPing() + { + $buffer = self::$client->ping(); + $this->assertIsArray($buffer); + $this->assertSame(Types::PINGRESP, $buffer['type']); + } + + /** + * @depends testPing + */ + public function testUnsubscribe() + { + $status = self::$client->unSubscribe([self::$topic . '/get', self::$topic . '/update']); + $this->assertIsArray($status); + $this->assertSame(Types::UNSUBACK, $status['type']); + $this->assertSame('Success', ReasonCode::getReasonPhrase($status['codes'][0])); + $this->assertSame('Success', ReasonCode::getReasonPhrase($status['codes'][1])); + } + + /** + * @depends testUnsubscribe + */ + public function testClose() + { + $status = self::$client->close(); + $this->assertTrue($status); + } + + public function testPublishNonTopic() + { + $client = new WebSocketClient(SIMPS_MQTT_REMOTE_HOST, SIMPS_MQTT_OVER_WEBSOCKET_PORT, getTestMQTT5ConnectConfig()); + $client->connect(); + $this->expectException(ProtocolException::class); + $this->expectExceptionMessage('Topic cannot be empty or need to set topic_alias'); + $client->publish('', 'hello,simps'); + } +}