Getting SocketException while producing an event to Kafka Topic #89

Open
karunk-hq opened this issue Jul 12, 2023 · 0 comments

  • What problem did you encounter?
    I'm trying to integrate a Kafka producer with a Laravel worker. After starting the worker, I noticed that publishing to Kafka fails with the following exception:
AMQP: Failed to Process data with exception: longlang\phpkafka\Exception\SocketException: Could not write 659 bytes to stream in /home/shard/hiver/web/web/vendor/longlang/phpkafka/src/Socket/StreamSocket.php:123

On further analysis, I found that before an event is written to the topic, the code first checks whether the socket is writable:

// wait for stream to become available for writing
$writable = $this->select([$this->socket], $timeout, false);

if (false === $writable) {
    $this->close();
    throw new SocketException('Could not write ' . $bytesToWrite . ' bytes to stream');
}
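
For readers unfamiliar with this kind of check, here is a minimal stand-alone sketch of waiting for a stream to become writable using PHP's built-in `stream_select()`. It is not phpkafka's implementation: `waitWritable()` is a hypothetical helper name, and the local socket pair is only there so the example runs without a broker (it assumes a Unix-like system).

```php
<?php
// Stand-alone sketch using PHP's built-in stream functions; not phpkafka's code.
// waitWritable() is a hypothetical helper name chosen for this example.

/**
 * Wait until $stream can accept a write, or until $timeoutSeconds elapses.
 * Returns true when writable, false on timeout or on a select error.
 */
function waitWritable($stream, float $timeoutSeconds): bool
{
    $read = null;
    $write = [$stream];
    $except = null;
    $seconds = (int) floor($timeoutSeconds);
    $microseconds = (int) (($timeoutSeconds - $seconds) * 1000000);

    // stream_select() returns the number of ready streams,
    // 0 on timeout, and false on error.
    $ready = @stream_select($read, $write, $except, $seconds, $microseconds);

    return $ready !== false && $ready > 0;
}

// Demo on a local socket pair (assumes a Unix-like system).
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$writable = waitWritable($pair[0], 1.0);
echo $writable ? "writable\n" : "not writable\n";
fclose($pair[0]);
fclose($pair[1]);
```

A check like this can fail either because the timeout elapsed (for example, a full send buffer or a stalled peer) or because the underlying connection is no longer valid, which may matter for a long-running worker that holds one connection open.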

Any help here would be appreciated.

  • Is the Kafka environment self-built or cloud service?

  • Please execute the following command to get environment information.

php -v & php --ri swoole & composer info | grep longlang/phpkafka

# Paste here

  • Provide the smallest reproducible code:
    My Kafka client:
class KafkaClientManager
{
    /**
     * Build a producer using the first configured broker, falling back to
     * the second one if the first cannot be reached.
     */
    public static function getInstance()
    {
        $brokers = explode(',', config('kafka_broker_url'));

        $config = new ProducerConfig();
        $config->setBootstrapServer($brokers[0]);
        $config->setUpdateBrokers(true);
        $config->setAcks(-1);

        $producer = null;

        try {
            $producer = new Producer($config);
        } catch (ConnectionException $ce) {
            // when the current broker cannot be connected to, try a different one
            $config->setBootstrapServer($brokers[1]);
            $producer = new Producer($config);
        }

        return $producer;
    }

    /**
     * Send a message with the producer resolved from the container; on
     * failure, build a new producer and retry once.
     */
    public static function sendMessage($topicName, $messageBody, $partitionKey = null)
    {
        $producer = app('kafka');
        try {
            $producer->send($topicName, $messageBody, $partitionKey);
        } catch (KafkaErrorException | FatalThrowableError $e) {
            // when sending fails, create a new producer and retry once
            $producer = self::getInstance();
            $producer->send($topicName, $messageBody, $partitionKey);
        }
    }
}
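
The fallback in `getInstance()` only tries the first two entries of the broker list. As a rough sketch of the same idea generalised to every configured broker, the helper below loops over the list and returns the first client that can be built. `connectFirstAvailable()` and the stand-in factory are hypothetical names for illustration and are not part of phpkafka; whether the `SocketException` from the log would be caught by the existing `catch` in `sendMessage()` depends on the library's exception hierarchy, which is not shown here.

```php
<?php
// Hypothetical helper; names are illustrative and not part of phpkafka.

/**
 * Try each broker in order and return the first client the factory can build.
 *
 * @param string[] $brokers broker addresses, e.g. from explode(',', ...)
 * @param callable $factory function (string $broker): object, throws on failure
 */
function connectFirstAvailable(array $brokers, callable $factory)
{
    $lastError = null;
    foreach ($brokers as $broker) {
        $broker = trim($broker);
        if ($broker === '') {
            continue; // skip empty entries such as a trailing comma
        }
        try {
            return $factory($broker);
        } catch (\Throwable $e) {
            $lastError = $e; // remember why this broker failed, then try the next
        }
    }
    throw new \RuntimeException('No broker could be reached', 0, $lastError);
}

// Demo with a stand-in factory: the first address "fails", the second succeeds.
$client = connectFirstAvailable(
    ['broker-a:9092', ' broker-b:9092'],
    function (string $broker) {
        if ($broker === 'broker-a:9092') {
            throw new \RuntimeException('connection refused');
        }
        return (object) ['broker' => $broker];
    }
);
echo $client->broker, "\n";
```

In the class above, the factory would wrap the `ProducerConfig` setup and the `new Producer($config)` call, so the retry logic stays separate from the producer configuration.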