This is an Apache Pulsar client library implemented in PHP, based on PulsarApi.proto.
PHP 7.x (PHP 8 is not supported)
Protobuf Extension
Apache Pulsar uses the proto2 syntax, which the Google protobuf extension for PHP does not support.
Install the allegro/php-protobuf extension instead of protocolbuffers/protobuf.
Unfortunately, allegro/php-protobuf does not support PHP 8, so this library cannot support PHP 8 either.
ZLib Extension (only needed if you want to use zlib compression)
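The requirements above can be checked from PHP before installing the package. The following is a minimal sketch, not part of the library: the function name `pulsarRequirementProblems` is hypothetical, and it assumes the allegro/php-protobuf extension registers itself under the name `protobuf`. It only confirms that an extension with that name is loaded, not which implementation it is.

```php
<?php

/**
 * Return a list of human-readable problems. An empty list means the
 * environment matches the requirements listed above.
 * Hypothetical helper, not part of pulsar-client-php.
 */
function pulsarRequirementProblems(int $phpMajor, array $extensions, bool $wantZlib = false): array
{
    $problems = [];

    // The library targets PHP 7.x only.
    if ($phpMajor !== 7) {
        $problems[] = "PHP 7.x is required, found major version {$phpMajor}";
    }

    // Assumption: the allegro/php-protobuf extension is loaded as "protobuf".
    if (!in_array('protobuf', $extensions, true)) {
        $problems[] = 'the protobuf extension (allegro/php-protobuf) is not loaded';
    }

    // zlib is only needed when zlib compression is used.
    if ($wantZlib && !in_array('zlib', $extensions, true)) {
        $problems[] = 'the zlib extension is not loaded';
    }

    return $problems;
}

// Check the interpreter that runs this script.
$loaded = array_map('strtolower', get_loaded_extensions());
foreach (pulsarRequirementProblems(PHP_MAJOR_VERSION, $loaded, true) as $problem) {
    echo 'Requirement not met: ' . $problem . PHP_EOL;
}
```

Passing the version and extension list as arguments keeps the check easy to try against other configurations, for example `pulsarRequirementProblems(8, [])`.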
composer require ikilobyte/pulsar-client-php
cd /usr/local/src
git clone https://github.com/allegro/php-protobuf
cd php-protobuf
./configure --with-php-config=$(which php-config)
make && make install
# add to php.ini
use Pulsar\Authentication\Jwt;
use Pulsar\Compression\Compression;
use Pulsar\Producer;
use Pulsar\ProducerOptions;
use Pulsar\MessageOptions;
require_once __DIR__ . '/vendor/autoload.php';
$options = new ProducerOptions();
// Set this if the broker requires authentication
// Only JWT authentication is currently supported
$options->setAuthentication(new Jwt('token'));
$producer = new Producer('pulsar://localhost:6650', $options);
for ($i = 0; $i < 10; $i++) {
    $messageID = $producer->send(sprintf('hello %d', $i));
    echo 'messageID ' . $messageID . "\n";
}
// Sending messages asynchronously
//for ($i = 0; $i < 10; $i++) {
//    $producer->sendAsync(sprintf('hello-async %d', $i), function (string $messageID) {
//        echo 'messageID ' . $messageID . "\n";
//    });
//}
// // Add this line when sending asynchronously
// Sending delayed messages
for ($i = 0; $i < 10; $i++) {
    $producer->send(sprintf('hello-delay %d', $i), [
        MessageOptions::DELAY_SECONDS => $i * 5, // delay in seconds
    ]);
}
// close
use Pulsar\Authentication\Jwt;
use Pulsar\Consumer;
use Pulsar\ConsumerOptions;
use Pulsar\SubscriptionType;
require_once __DIR__ . '/vendor/autoload.php';
$options = new ConsumerOptions();
// Set this if the broker requires authentication
// Only JWT authentication is currently supported
$options->setAuthentication(new Jwt('token'));
// Configure how many seconds to wait before nacked messages are redelivered (see setNackRedeliveryDelay()); the default is 1 minute
$consumer = new Consumer('pulsar://localhost:6650', $options);
while (true) {
$message = $consumer->receive();
echo sprintf('Got message 【%s】messageID[%s] topic[%s] publishTime[%s]',
) . "\n";
// ...
// Remember to acknowledge the message once it has been processed
// If processing fails, you can negatively acknowledge (nack) it instead
// The message will then be redelivered after the configured delay
// $consumer->nack($message);
}
- ProducerOptions
  - setTopic()
  - setAuthentication()
  - setConnectTimeout()
  - setProducerName()
  - setCompression()
- ConsumerOptions
  - setTopic()
  - setAuthentication()
  - setConnectTimeout()
  - setConsumerName()
  - setSubscription()
  - setSubscriptionType()
  - setNackRedeliveryDelay()
  - setReceiveQueueSize()
- MessageOptions