From 47f84fc34e657c53144e4a08ae5d9beeb976704b Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 25 Nov 2024 21:14:03 +0800 Subject: [PATCH 1/2] Expose keep alive interval for c and c++ client --- include/pulsar/ClientConfiguration.h | 13 +++++++++++++ include/pulsar/c/client_configuration.h | 6 ++++++ lib/ClientConfiguration.cc | 10 ++++++++++ lib/ClientConfigurationImpl.h | 1 + lib/ClientConnection.cc | 7 +++---- lib/ClientConnection.h | 1 + lib/c/c_ClientConfiguration.cc | 10 ++++++++++ tests/c/c_ClientConfigurationTest.cc | 3 +++ 8 files changed, 47 insertions(+), 4 deletions(-) diff --git a/include/pulsar/ClientConfiguration.h b/include/pulsar/ClientConfiguration.h index cc3c3ed3..06b3d796 100644 --- a/include/pulsar/ClientConfiguration.h +++ b/include/pulsar/ClientConfiguration.h @@ -356,6 +356,19 @@ class PULSAR_PUBLIC ClientConfiguration { */ int getConnectionTimeout() const; + /** + * Set keep alive interval for each client-broker-connection. (default: 30 seconds). + * + * @param keepAliveIntervalInSeconds + * @return + */ + ClientConfiguration& setKeepAliveIntervalInSeconds(unsigned int keepAliveIntervalInSeconds); + + /** + * The getter associated with setConnectionTimeout(). + */ + unsigned int getKeepAliveIntervalInSeconds() const; + friend class ClientImpl; friend class PulsarWrapper; diff --git a/include/pulsar/c/client_configuration.h b/include/pulsar/c/client_configuration.h index 75c24b32..6c4f3dfb 100644 --- a/include/pulsar/c/client_configuration.h +++ b/include/pulsar/c/client_configuration.h @@ -204,6 +204,12 @@ PULSAR_PUBLIC const unsigned int pulsar_client_configuration_get_partitions_upda PULSAR_PUBLIC const unsigned int pulsar_client_configuration_get_stats_interval_in_seconds( pulsar_client_configuration_t *conf); +PULSAR_PUBLIC void pulsar_client_configuration_set_keep_alive_interval_in_seconds( + pulsar_client_configuration_t *conf, unsigned int keepAliveIntervalInSeconds); + +PULSAR_PUBLIC unsigned int pulsar_client_configuration_get_keep_alive_interval_in_seconds( + pulsar_client_configuration_t *conf); + #ifdef __cplusplus } #endif diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc index 6a91ec1d..05de1940 100644 --- a/lib/ClientConfiguration.cc +++ b/lib/ClientConfiguration.cc @@ -214,6 +214,16 @@ ClientConfiguration& ClientConfiguration::setConnectionTimeout(int timeoutMs) { int ClientConfiguration::getConnectionTimeout() const { return impl_->connectionTimeoutMs; } +ClientConfiguration& ClientConfiguration::setKeepAliveIntervalInSeconds( + unsigned int keepAliveIntervalInSeconds) { + impl_->keepAliveIntervalInSeconds = keepAliveIntervalInSeconds; + return *this; +} + +unsigned int ClientConfiguration::getKeepAliveIntervalInSeconds() const { + return impl_->keepAliveIntervalInSeconds; +} + ClientConfiguration& ClientConfiguration::setDescription(const std::string& description) { if (description.length() > 64) { throw std::invalid_argument("The description length exceeds 64"); diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h index 0721c0a1..e7a83a19 100644 --- a/lib/ClientConfigurationImpl.h +++ b/lib/ClientConfigurationImpl.h @@ -47,6 +47,7 @@ struct ClientConfigurationImpl { unsigned int partitionsUpdateInterval{60}; // 1 minute std::string listenerName; int connectionTimeoutMs{10000}; // 10 seconds + unsigned int keepAliveIntervalInSeconds{30}; std::string description; std::string proxyServiceUrl; ClientConfiguration::ProxyProtocol proxyProtocol; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 1705cb1f..f66b2da2 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -51,8 +51,6 @@ using proto::BaseCommand; static const uint32_t DefaultBufferSize = 64 * 1024; -static const int KeepAliveIntervalInSeconds = 30; - static MessageId toMessageId(const proto::MessageIdData& messageIdData) { return MessageIdBuilder::from(messageIdData).build(); } @@ -186,6 +184,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: connectTimeoutTask_( std::make_shared(*executor_, clientConfiguration.getConnectionTimeout())), outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), + keepAliveIntervalInSeconds_(clientConfiguration.getKeepAliveIntervalInSeconds()), consumerStatsRequestTimer_(executor_->createDeadlineTimer()), maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), clientVersion_(clientVersion), @@ -310,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC // Only send keep-alive probes if the broker supports it keepAliveTimer_ = executor_->createDeadlineTimer(); if (keepAliveTimer_) { - keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds)); + keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); auto weakSelf = weak_from_this(); keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { auto self = weakSelf.lock(); @@ -1245,7 +1244,7 @@ void ClientConnection::handleKeepAliveTimeout() { // be zero And we do not attempt to dereference the pointer. Lock lock(mutex_); if (keepAliveTimer_) { - keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds)); + keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); auto weakSelf = weak_from_this(); keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { auto self = weakSelf.lock(); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 638edb48..7646f85e 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -388,6 +388,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_thisconf.getPartitionsUpdateInterval(); } + +void pulsar_client_configuration_set_keep_alive_interval_in_seconds(pulsar_client_configuration_t *conf, + unsigned int keepAliveIntervalInSeconds) { + conf->conf.setKeepAliveIntervalInSeconds(keepAliveIntervalInSeconds); +} + +unsigned int pulsar_client_configuration_get_keep_alive_interval_in_seconds( + pulsar_client_configuration_t *conf) { + return conf->conf.getKeepAliveIntervalInSeconds(); +} diff --git a/tests/c/c_ClientConfigurationTest.cc b/tests/c/c_ClientConfigurationTest.cc index a7dd0226..6f181fd0 100644 --- a/tests/c/c_ClientConfigurationTest.cc +++ b/tests/c/c_ClientConfigurationTest.cc @@ -34,4 +34,7 @@ TEST(C_ClientConfigurationTest, testCApiConfig) { pulsar_client_configuration_set_partitions_update_interval(conf, 10); ASSERT_EQ(pulsar_client_configuration_get_partitions_update_interval(conf), 10); + + pulsar_client_configuration_set_keep_alive_interval_in_seconds(conf, 60); + ASSERT_EQ(pulsar_client_configuration_get_keep_alive_interval_in_seconds(conf), 60); } From 3298a5a563d94a9e5d96b4af18553f765b636823 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 25 Nov 2024 21:15:12 +0800 Subject: [PATCH 2/2] Optimize --- include/pulsar/ClientConfiguration.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/pulsar/ClientConfiguration.h b/include/pulsar/ClientConfiguration.h index 06b3d796..e8c46572 100644 --- a/include/pulsar/ClientConfiguration.h +++ b/include/pulsar/ClientConfiguration.h @@ -365,7 +365,7 @@ class PULSAR_PUBLIC ClientConfiguration { ClientConfiguration& setKeepAliveIntervalInSeconds(unsigned int keepAliveIntervalInSeconds); /** - * The getter associated with setConnectionTimeout(). + * The getter associated with setKeepAliveIntervalInSeconds(). */ unsigned int getKeepAliveIntervalInSeconds() const;