Skip to content

Commit

Permalink
[PIP-60] [Proxy-Server] Support SNI routing for Pulsar CPP client
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Dec 16, 2023
1 parent 5c77648 commit 218c751
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 5 deletions.
2 changes: 2 additions & 0 deletions examples/SampleProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* under the License.
*/
#include <pulsar/Client.h>
#include <pulsar/ClientConfiguration.h>
#include <pulsar/Authentication.h>

#include <iostream>

Expand Down
29 changes: 29 additions & 0 deletions include/pulsar/ClientConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class PULSAR_PUBLIC ClientConfiguration {
~ClientConfiguration();
ClientConfiguration(const ClientConfiguration&);
ClientConfiguration& operator=(const ClientConfiguration&);
enum ProxyProtocol
{
SNI = 0
};

/**
* Configure a limit on the amount of memory that will be allocated by this client instance.
Expand Down Expand Up @@ -305,6 +309,31 @@ class PULSAR_PUBLIC ClientConfiguration {
*/
ClientConfiguration& setConnectionTimeout(int timeoutMs);

/**
* Set proxy-service url when client would like to connect to broker via proxy. Client must configure both
* proxyServiceUrl and appropriate proxyProtocol.
*
* Example: pulsar+ssl://ats-proxy.example.com:4443
*
* @param proxyServiceUrl proxy url to connect with broker
* @return
*/
ClientConfiguration& setProxyServiceUrl(const std::string& proxyServiceUrl);

const std::string& getProxyServiceUrl() const;

/**
* Set appropriate proxy-protocol along with proxy-service url. Currently Pulsar supports SNI proxy routing.
*
* SNI routing: https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
*
* @param proxyProtocol possible options (SNI)
* @return
*/
ClientConfiguration& setProxyProtocol(const ProxyProtocol proxyProtocol);

const ProxyProtocol getProxyProtocol() const;

/**
* The getter associated with setConnectionTimeout().
*/
Expand Down
18 changes: 18 additions & 0 deletions lib/ClientConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,24 @@ ClientConfiguration& ClientConfiguration::setConcurrentLookupRequest(int concurr
return *this;
}

ClientConfiguration& ClientConfiguration::setProxyServiceUrl(const std::string& proxyServiceUrl) {
impl_->proxyServiceUrl = proxyServiceUrl;
return *this;
}

const std::string& ClientConfiguration::getProxyServiceUrl() const {
return impl_->proxyServiceUrl;
}

ClientConfiguration& ClientConfiguration::setProxyProtocol(const ClientConfiguration::ProxyProtocol proxyProtocol) {
impl_->proxyProtocol = proxyProtocol;
return *this;
}

const ClientConfiguration::ProxyProtocol ClientConfiguration::getProxyProtocol() const {
return impl_->proxyProtocol;
}

int ClientConfiguration::getConcurrentLookupRequest() const { return impl_->concurrentLookupRequest; }

ClientConfiguration& ClientConfiguration::setMaxLookupRedirects(int maxLookupRedirects) {
Expand Down
2 changes: 2 additions & 0 deletions lib/ClientConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct ClientConfigurationImpl {
std::string listenerName;
int connectionTimeoutMs{10000}; // 10 seconds
std::string description;
std::string proxyServiceUrl;
ClientConfiguration::ProxyProtocol proxyProtocol;

std::unique_ptr<LoggerFactory> takeLogger() { return std::move(loggerFactory); }
};
Expand Down
21 changes: 17 additions & 4 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,15 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
boost::asio::ssl::context ctx(executor_->getIOService(), boost::asio::ssl::context::tlsv1_client);
#endif
Url serviceUrl;
Url proxyUrl;
Url::parse(physicalAddress, serviceUrl);
proxyServiceUrl_ = clientConfiguration.getProxyServiceUrl();
proxyProtocol_ = clientConfiguration.getProxyProtocol();
if (proxyProtocol_ == ClientConfiguration::SNI && !proxyServiceUrl_.empty()) {
Url::parse(proxyServiceUrl_, proxyUrl);
isSniProxy_ = true;
LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_);
}
if (clientConfiguration.isTlsAllowInsecureConnection()) {
ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
isTlsAllowInsecureConnection_ = true;
Expand Down Expand Up @@ -251,7 +259,9 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:

if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(serviceUrl.host()));
std::string urlHost =
isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(urlHost));
}

LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
Expand Down Expand Up @@ -397,7 +407,8 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
if (logicalAddress_ == physicalAddress_) {
LOG_INFO(cnxString_ << "Connected to broker");
} else {
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " <<
logicalAddress_ << ", proxy: " << proxyServiceUrl_);
}

Lock lock(mutex_);
Expand Down Expand Up @@ -566,7 +577,8 @@ void ClientConnection::tcpConnectAsync() {

boost::system::error_code err;
Url service_url;
if (!Url::parse(physicalAddress_, service_url)) {
std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
if (!Url::parse(hostUrl, service_url)) {
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
close();
return;
Expand Down Expand Up @@ -594,7 +606,8 @@ void ClientConnection::tcpConnectAsync() {
void ClientConnection::handleResolve(const boost::system::error_code& err,
tcp::resolver::iterator endpointIterator) {
if (err) {
LOG_ERROR(cnxString_ << "Resolve error: " << err << " : " << err.message());
std::string hostUrl = isSniProxy_? cnxString_ : proxyServiceUrl_;
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
close();
return;
}
Expand Down
5 changes: 5 additions & 0 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*/
const std::string physicalAddress_;

std::string proxyServiceUrl_;

ClientConfiguration::ProxyProtocol proxyProtocol_;

// Represent both endpoint of the tcp connection. eg: [client:1234 -> server:6650]
std::string cnxString_;

Expand Down Expand Up @@ -386,6 +390,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

// Signals whether we're waiting for a response from broker
bool havePendingPingRequest_ = false;
bool isSniProxy_ = false;
DeadlineTimerPtr keepAliveTimer_;
DeadlineTimerPtr consumerStatsRequestTimer_;

Expand Down
1 change: 0 additions & 1 deletion tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1334,5 +1334,4 @@ TEST(ConsumerTest, testRetrySubscribe) {
// TODO: Currently it's hard to test the timeout error without configuring the operation timeout in
// milliseconds
}

} // namespace pulsar

0 comments on commit 218c751

Please sign in to comment.