From 7862234f5f6a5b341693f9978773b90d73d6fe25 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 11:09:48 +0800 Subject: [PATCH 01/12] Initial commit --- include/pulsar/Client.h | 15 +++++++++++++++ lib/Client.cc | 7 +++++++ lib/ClientImpl.cc | 8 ++++++++ lib/ClientImpl.h | 4 ++++ 4 files changed, 34 insertions(+) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 56130667..95233fe9 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -32,6 +32,7 @@ #include #include +#include #include namespace pulsar { @@ -414,6 +415,20 @@ class PULSAR_PUBLIC Client { void getSchemaInfoAsync(const std::string& topic, int64_t version, std::function callback); + /** + * Update the connection information of the client, including service URL, authentication and TLS trust + * certs file path. + * This method is used to switch the connection to a different Pulsar cluster. All connections will be + * closed and the internal connection info will be updated. + * + * @param serviceUrl the Pulsar endpoint to use (eg: pulsar://localhost:6650) + * @param authentication the authentication information to use for connecting to the Pulsar cluster + * @param tlsTrustCertsFilePath the TLS trust certs file path to use for connecting to the Pulsar cluster + */ + void updateConnectionInfo(const std::string& serviceUrl, + const std::optional& authentication, + const std::optional& tlsTrustCertsFilePath); + private: Client(const std::shared_ptr&); diff --git a/lib/Client.cc b/lib/Client.cc index 39a5948a..60fb62f5 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -197,4 +197,11 @@ void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, ->getSchema(TopicName::get(topic), (version >= 0) ? 
toBigEndianBytes(version) : "") .addListener(std::move(callback)); } + +void Client::updateConnectionInfo(const std::string& serviceUrl, + const std::optional& authentication, + const std::optional& tlsTrustCertsFilePath) { + impl_->updateConnectionInfo(serviceUrl, authentication, tlsTrustCertsFilePath); +} + } // namespace pulsar diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index eec3b34a..229ee2f3 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -854,4 +854,12 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati return clientConfiguration.impl_->operationTimeout; } +void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, + const std::optional& authentication, + const std::optional& tlsTrustCertsFilePath) { + // TODO: + // 1. Reset the `lookupServicePtr_` with the new serviceUrl and auth parameters, and close the old one. + // 2. Close all connections in `pool_` +} + } /* namespace pulsar */ diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 0b4d5969..e89aa98a 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -139,6 +139,10 @@ class ClientImpl : public std::enable_shared_from_this { ConnectionPool& getConnectionPool() noexcept { return pool_; } uint64_t getLookupCount() { return lookupCount_; } + void updateConnectionInfo(const std::string& serviceUrl, + const std::optional& authentication, + const std::optional& tlsTrustCertsFilePath); + static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); friend class PulsarFriend; From a1746a9d1bf3c37c398845270ec90f36fb06f39e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 11:33:24 +0800 Subject: [PATCH 02/12] implemented --- lib/ClientImpl.cc | 64 ++++++++++++++++++++++++++++++++----------- lib/ConnectionPool.cc | 15 ++++++++++ lib/ConnectionPool.h | 6 ++++ tests/ClientTest.cc | 47 +++++++++++++++++++++++++++++++ 4 files changed, 116 insertions(+), 16 deletions(-) diff --git a/lib/ClientImpl.cc 
b/lib/ClientImpl.cc index 229ee2f3..9be08415 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -144,11 +144,11 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { } LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) { + Lock lock(mutex_); if (redirectedClusterURI.empty()) { return lookupServicePtr_; } - Lock lock(mutex_); auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); if (it == redirectedClusterLookupServicePtrs_.end()) { auto lookup = createLookup(redirectedClusterURI); @@ -180,7 +180,8 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon if (autoDownloadSchema) { auto self = shared_from_this(); - lookupServicePtr_->getSchema(topicName).addListener( + auto lookup = getLookup(); + lookup->getSchema(topicName).addListener( [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { if (res != ResultOk) { callback(res, Producer()); @@ -188,12 +189,12 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } ProducerConfiguration conf; conf.setSchema(topicSchema); - self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + self->getLookup()->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); }); } else { - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getLookup()->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); } @@ -266,7 +267,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st } MessageId msgId(startMessageId); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getLookup()->getPartitionMetadataAsync(topicName).addListener( 
std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, msgId, conf, callback)); } @@ -379,7 +380,8 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const return; } - lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) + getLookup() + ->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, regexPattern, mode, subscriptionName, conf, callback)); @@ -403,7 +405,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace consumer = std::make_shared(shared_from_this(), regexPattern, mode, *matchTopics, subscriptionName, conf, - lookupServicePtr_, interceptors); + getLookup(), interceptors); consumer->getConsumerCreatedFuture().addListener( std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -450,7 +452,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto interceptors = std::make_shared(conf.getInterceptors()); ConsumerImplBasePtr consumer = std::make_shared( - shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_, interceptors); + shared_from_this(), topics, subscriptionName, topicNamePtr, conf, getLookup(), interceptors); consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -480,7 +482,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getLookup()->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, subscriptionName, conf, callback)); } @@ -505,7 +507,7 @@ void 
ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti } consumer = std::make_shared( shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf, - lookupServicePtr_, interceptors); + getLookup(), interceptors); } else { auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), subscriptionName, conf, @@ -658,7 +660,7 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP return; } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getLookup()->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, callback)); } @@ -674,7 +676,7 @@ void ClientImpl::closeAsync(const CloseCallback& callback) { state_ = Closing; memoryLimitController_.close(); - lookupServicePtr_->close(); + getLookup()->close(); for (const auto& it : redirectedClusterLookupServicePtrs_) { it.second->close(); } @@ -776,7 +778,7 @@ void ClientImpl::shutdown() { << " consumers have been shutdown."); } - lookupServicePtr_->close(); + getLookup()->close(); if (!pool_.close()) { // pool_ has already been closed. It means shutdown() has been called before. return; @@ -857,9 +859,39 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, const std::optional& authentication, const std::optional& tlsTrustCertsFilePath) { - // TODO: - // 1. Reset the `lookupServicePtr_` with the new serviceUrl and auth parameters, and close the old one. - // 2. 
Close all connections in `pool_` + LookupServicePtr oldLookupServicePtr; + std::unordered_map oldRedirectedLookupServicePtrs; + + { + Lock lock(mutex_); + if (state_ != Open) { + LOG_ERROR("Client is not open, cannot update connection info"); + return; + } + + if (authentication.has_value()) { + clientConfiguration_.setAuth(*authentication); + } + if (tlsTrustCertsFilePath.has_value()) { + clientConfiguration_.setTlsTrustCertsFilePath(*tlsTrustCertsFilePath); + } + clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl))); + + oldLookupServicePtr = std::move(lookupServicePtr_); + oldRedirectedLookupServicePtrs = std::move(redirectedClusterLookupServicePtrs_); + + lookupServicePtr_ = createLookup(serviceUrl); + redirectedClusterLookupServicePtrs_.clear(); + } + + if (oldLookupServicePtr) { + oldLookupServicePtr->close(); + } + for (const auto& it : oldRedirectedLookupServicePtrs) { + it.second->close(); + } + + pool_.resetConnections(clientConfiguration_.getAuthPtr(), clientConfiguration_); } } /* namespace pulsar */ diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index df050b0e..d94d8fe8 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -67,6 +67,21 @@ bool ConnectionPool::close() { return true; } +void ConnectionPool::resetConnections(const AuthenticationPtr& authentication, + const ClientConfiguration& conf) { + std::unique_lock lock(mutex_); + authentication_ = authentication; + clientConfiguration_ = conf; + + for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) { + auto& cnx = cnxIt->second; + if (cnx) { + cnx->close(ResultDisconnected, false); + } + } + pool_.clear(); +} + static const std::string getKey(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix) { std::stringstream ss; diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h index 0e3a6d0a..64907b4f 100644 --- a/lib/ConnectionPool.h +++ b/lib/ConnectionPool.h @@ -51,6 +51,12 @@ class PULSAR_PUBLIC 
ConnectionPool { */ bool close(); + /** + * Close all existing connections and update the authentication and configuration. + * Unlike close(), the pool remains open for new connections. + */ + void resetConnections(const AuthenticationPtr& authentication, const ClientConfiguration& conf); + void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix, ClientConnection* value); diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index dd892686..f9f4b93b 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -506,3 +506,50 @@ TEST(ClientTest, testNoRetry) { ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms"; } } + +TEST(ClientTest, testUpdateConnectionInfo) { + const std::string cluster1Url = "pulsar://localhost:6650"; + const std::string cluster2Url = "pulsar://localhost:6652"; + const std::string topic1 = "testUpdateConnectionInfo-cluster1-" + std::to_string(time(nullptr)); + const std::string topic2 = "testUpdateConnectionInfo-cluster2-" + std::to_string(time(nullptr)); + + Client client(cluster1Url); + + // Produce and consume on cluster 1 + Producer producer1; + ASSERT_EQ(ResultOk, client.createProducer(topic1, producer1)); + MessageId msgId; + ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("msg-on-cluster1").build(), msgId)); + producer1.close(); + + // Verify there are connections in the pool + auto connections = PulsarFriend::getConnections(client); + ASSERT_FALSE(connections.empty()); + + // Switch to cluster 2 + client.updateConnectionInfo(cluster2Url, std::nullopt, std::nullopt); + + // Previous connections should have been closed + for (const auto &cnx : connections) { + ASSERT_TRUE(cnx->isClosed()); + } + + // Produce and consume on cluster 2 using the same client + Producer producer2; + ASSERT_EQ(ResultOk, client.createProducer(topic2, producer2)); + ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("msg-on-cluster2").build(), msgId)); + + Consumer 
consumer2; + ASSERT_EQ(ResultOk, client.subscribe(topic2, "sub", consumer2)); + Message msg; + ASSERT_EQ(ResultOk, consumer2.receive(msg, 5000)); + ASSERT_EQ("msg-on-cluster2", msg.getDataAsString()); + + // Verify connection pool now has connections to cluster 2 + auto newConnections = PulsarFriend::getConnections(client); + ASSERT_FALSE(newConnections.empty()); + + consumer2.close(); + producer2.close(); + client.close(); +} From 15908f99dedd18e2f8393b15040578a790f00c2f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 15:08:38 +0800 Subject: [PATCH 03/12] fix auth not cleared if it's not set and improve tests --- lib/ClientImpl.cc | 4 ++ tests/AuthTokenTest.cc | 2 +- tests/ClientTest.cc | 97 +++++++++++++++++++++++++----------------- 3 files changed, 63 insertions(+), 40 deletions(-) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 9be08415..98fe3c30 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -871,9 +871,13 @@ void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, if (authentication.has_value()) { clientConfiguration_.setAuth(*authentication); + } else { + clientConfiguration_.setAuth(AuthFactory::Disabled()); } if (tlsTrustCertsFilePath.has_value()) { clientConfiguration_.setTlsTrustCertsFilePath(*tlsTrustCertsFilePath); + } else { + clientConfiguration_.setTlsTrustCertsFilePath(""); } clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl))); diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc index 84e8572d..4bfd8085 100644 --- a/tests/AuthTokenTest.cc +++ b/tests/AuthTokenTest.cc @@ -42,7 +42,7 @@ static const std::string serviceUrlHttp = "http://localhost:8080"; static const std::string tokenPath = TOKEN_PATH; -static std::string getToken() { +std::string getToken() { std::ifstream file(tokenPath); std::string str((std::istreambuf_iterator(file)), std::istreambuf_iterator()); return str; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index f9f4b93b..37376cf2 100644 
--- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -17,13 +17,16 @@ * under the License. */ #include +#include #include +#include #include #include #include #include -#include +#include +#include #include "MockClientImpl.h" #include "PulsarAdminHelper.h" @@ -32,7 +35,6 @@ #include "lib/ClientConnection.h" #include "lib/LogUtils.h" #include "lib/checksum/ChecksumProvider.h" -#include "lib/stats/ProducerStatsImpl.h" DECLARE_LOG_OBJECT() @@ -508,48 +510,65 @@ TEST(ClientTest, testNoRetry) { } TEST(ClientTest, testUpdateConnectionInfo) { - const std::string cluster1Url = "pulsar://localhost:6650"; - const std::string cluster2Url = "pulsar://localhost:6652"; - const std::string topic1 = "testUpdateConnectionInfo-cluster1-" + std::to_string(time(nullptr)); - const std::string topic2 = "testUpdateConnectionInfo-cluster2-" + std::to_string(time(nullptr)); - - Client client(cluster1Url); - - // Produce and consume on cluster 1 - Producer producer1; - ASSERT_EQ(ResultOk, client.createProducer(topic1, producer1)); + extern std::string getToken(); // from AuthToken.cc + + // Access "private/auth" namespace in cluster 1 + auto info1 = + std::make_tuple("pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt); + // Access "private/auth" namespace in cluster 2 + auto info2 = + std::make_tuple("pulsar+ssl://localhost:6653", + AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), + TEST_CONF_DIR "/hn-verification/cacert.pem"); + // Access "public/default" namespace in cluster 1, which doesn't require authentication + auto info3 = std::make_tuple("pulsar://localhost:6650", std::nullopt, std::nullopt); + + Client client{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; + const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer)); MessageId msgId; - 
ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("msg-on-cluster1").build(), msgId)); - producer1.close(); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-0").build(), msgId)); - // Verify there are connections in the pool - auto connections = PulsarFriend::getConnections(client); - ASSERT_FALSE(connections.empty()); + // Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh) + ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); + client.updateConnectionInfo(std::get<0>(info2), std::get<1>(info2), std::get<2>(info2)); + ASSERT_TRUE(PulsarFriend::getConnections(client).empty()); - // Switch to cluster 2 - client.updateConnectionInfo(cluster2Url, std::nullopt, std::nullopt); + // Now the same will access the same topic in cluster 2 + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-1").build(), msgId)); + ASSERT_EQ(ResultOk, producer.close()); - // Previous connections should have been closed - for (const auto &cnx : connections) { - ASSERT_TRUE(cnx->isClosed()); - } + // Switch back to cluster 1 without any authentication, the previous authentication info configured for + // cluster 2 will be cleared. 
+ client.updateConnectionInfo(std::get<0>(info3), std::get<1>(info3), std::get<2>(info3)); - // Produce and consume on cluster 2 using the same client - Producer producer2; - ASSERT_EQ(ResultOk, client.createProducer(topic2, producer2)); - ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("msg-on-cluster2").build(), msgId)); + const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build(), msgId)); - Consumer consumer2; - ASSERT_EQ(ResultOk, client.subscribe(topic2, "sub", consumer2)); - Message msg; - ASSERT_EQ(ResultOk, consumer2.receive(msg, 5000)); - ASSERT_EQ("msg-on-cluster2", msg.getDataAsString()); - - // Verify connection pool now has connections to cluster 2 - auto newConnections = PulsarFriend::getConnections(client); - ASSERT_FALSE(newConnections.empty()); - - consumer2.close(); - producer2.close(); client.close(); + + // Verify messages sent to cluster 1 and cluster 2 can be consumed successfully with correct + // authentication info. 
+ auto verify = [](Client &client, const std::string &topic, const std::string &value) { + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader)); + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); + ASSERT_EQ(value, msg.getDataAsString()); + }; + Client client1{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; + verify(client1, topicRequiredAuth, "msg-0"); + client1.close(); + + Client client2{ + std::get<0>(info2), + ClientConfiguration().setAuth(std::get<1>(info2)).setTlsTrustCertsFilePath(std::get<2>(info2))}; + verify(client2, topicRequiredAuth, "msg-1"); + client2.close(); + + Client client3{std::get<0>(info3)}; + verify(client3, topicNoAuth, "msg-2"); + client3.close(); } From 8b0899f189df1314b86049f4d339d3b98565d135 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 16:14:55 +0800 Subject: [PATCH 04/12] refactor and add a getServiceInfo method --- include/pulsar/Client.h | 27 ++++++++++++++++++--------- lib/Client.cc | 8 +++----- lib/ClientImpl.cc | 38 +++++++++++++++++++++++++++++--------- lib/ClientImpl.h | 6 +++--- tests/ClientTest.cc | 36 +++++++++++++++++++++--------------- 5 files changed, 74 insertions(+), 41 deletions(-) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 95233fe9..8fccbe19 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -43,6 +43,12 @@ typedef std::function TableViewCallback; typedef std::function&)> GetPartitionsCallback; typedef std::function CloseCallback; +struct PULSAR_PUBLIC ServiceInfo { + std::string serviceUrl; + std::optional authentication; + std::optional tlsTrustCertsFilePath; +}; + class ClientImpl; class PulsarFriend; class PulsarWrapper; @@ -416,18 +422,21 @@ class PULSAR_PUBLIC Client { std::function callback); /** - * Update the connection information of the client, including service URL, authentication and TLS trust - * certs file path. 
+ * Update the service information of the client. + * * This method is used to switch the connection to a different Pulsar cluster. All connections will be - * closed and the internal connection info will be updated. + * closed and the internal service info will be updated. * - * @param serviceUrl the Pulsar endpoint to use (eg: pulsar://localhost:6650) - * @param authentication the authentication information to use for connecting to the Pulsar cluster - * @param tlsTrustCertsFilePath the TLS trust certs file path to use for connecting to the Pulsar cluster + * @param serviceInfo the service URL, authentication and TLS trust certs file path to use + */ + void updateServiceInfo(const ServiceInfo& serviceInfo); + + /** + * Get the current service information of the client. + * + * @return the current service information */ - void updateConnectionInfo(const std::string& serviceUrl, - const std::optional& authentication, - const std::optional& tlsTrustCertsFilePath); + ServiceInfo getServiceInfo(); private: Client(const std::shared_ptr&); diff --git a/lib/Client.cc b/lib/Client.cc index 60fb62f5..6ceaa117 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -198,10 +198,8 @@ void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, .addListener(std::move(callback)); } -void Client::updateConnectionInfo(const std::string& serviceUrl, - const std::optional& authentication, - const std::optional& tlsTrustCertsFilePath) { - impl_->updateConnectionInfo(serviceUrl, authentication, tlsTrustCertsFilePath); -} +void Client::updateServiceInfo(const ServiceInfo& serviceInfo) { impl_->updateServiceInfo(serviceInfo); } + +ServiceInfo Client::getServiceInfo() { return impl_->getServiceInfo(); } } // namespace pulsar diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 98fe3c30..2aceafea 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -78,6 +78,15 @@ typedef std::unique_lock Lock; typedef std::vector StringList; +namespace { +std::optional 
toOptionalAuthentication(const AuthenticationPtr& authentication) { + if (!authentication || authentication->getAuthMethodName() == "none") { + return std::nullopt; + } + return authentication; +} +} // namespace + static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, ConnectionPool& pool, const AuthenticationPtr& auth) { @@ -101,6 +110,10 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& state_(Open), clientConfiguration_(ClientConfiguration(clientConfiguration) .setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))), + serviceInfo_{serviceUrl, toOptionalAuthentication(clientConfiguration.getAuthPtr()), + clientConfiguration.getTlsTrustCertsFilePath().empty() + ? std::nullopt + : std::make_optional(clientConfiguration.getTlsTrustCertsFilePath())}, memoryLimitController_(clientConfiguration.getMemoryLimit()), ioExecutorProvider_(std::make_shared(clientConfiguration_.getIOThreads())), listenerExecutorProvider_( @@ -856,9 +869,7 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati return clientConfiguration.impl_->operationTimeout; } -void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, - const std::optional& authentication, - const std::optional& tlsTrustCertsFilePath) { +void ClientImpl::updateServiceInfo(const ServiceInfo& serviceInfo) { LookupServicePtr oldLookupServicePtr; std::unordered_map oldRedirectedLookupServicePtrs; @@ -869,22 +880,26 @@ void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, return; } - if (authentication.has_value()) { - clientConfiguration_.setAuth(*authentication); + if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) { + clientConfiguration_.setAuth(*serviceInfo.authentication); } else { clientConfiguration_.setAuth(AuthFactory::Disabled()); } - if (tlsTrustCertsFilePath.has_value()) { - 
clientConfiguration_.setTlsTrustCertsFilePath(*tlsTrustCertsFilePath); + if (serviceInfo.tlsTrustCertsFilePath.has_value()) { + clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath); } else { clientConfiguration_.setTlsTrustCertsFilePath(""); } - clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl))); + clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl))); + serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()), + clientConfiguration_.getTlsTrustCertsFilePath().empty() + ? std::nullopt + : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())}; oldLookupServicePtr = std::move(lookupServicePtr_); oldRedirectedLookupServicePtrs = std::move(redirectedClusterLookupServicePtrs_); - lookupServicePtr_ = createLookup(serviceUrl); + lookupServicePtr_ = createLookup(serviceInfo.serviceUrl); redirectedClusterLookupServicePtrs_.clear(); } @@ -898,4 +913,9 @@ void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, pool_.resetConnections(clientConfiguration_.getAuthPtr(), clientConfiguration_); } +ServiceInfo ClientImpl::getServiceInfo() { + Lock lock(mutex_); + return serviceInfo_; +} + } /* namespace pulsar */ diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index e89aa98a..c96f9aab 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -139,9 +139,8 @@ class ClientImpl : public std::enable_shared_from_this { ConnectionPool& getConnectionPool() noexcept { return pool_; } uint64_t getLookupCount() { return lookupCount_; } - void updateConnectionInfo(const std::string& serviceUrl, - const std::optional& authentication, - const std::optional& tlsTrustCertsFilePath); + void updateServiceInfo(const ServiceInfo& serviceInfo); + ServiceInfo getServiceInfo(); static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); @@ -196,6 +195,7 @@ class ClientImpl : public 
std::enable_shared_from_this { State state_; ClientConfiguration clientConfiguration_; + ServiceInfo serviceInfo_; MemoryLimitController memoryLimitController_; ExecutorServiceProviderPtr ioExecutorProvider_; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 37376cf2..5bc4d89b 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -513,17 +513,16 @@ TEST(ClientTest, testUpdateConnectionInfo) { extern std::string getToken(); // from AuthToken.cc // Access "private/auth" namespace in cluster 1 - auto info1 = - std::make_tuple("pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt); + ServiceInfo info1{"pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt}; // Access "private/auth" namespace in cluster 2 - auto info2 = - std::make_tuple("pulsar+ssl://localhost:6653", - AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), - TEST_CONF_DIR "/hn-verification/cacert.pem"); + ServiceInfo info2{ + "pulsar+ssl://localhost:6653", + AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), + TEST_CONF_DIR "/hn-verification/cacert.pem"}; // Access "public/default" namespace in cluster 1, which doesn't require authentication - auto info3 = std::make_tuple("pulsar://localhost:6650", std::nullopt, std::nullopt); + ServiceInfo info3{"pulsar://localhost:6650", std::nullopt, std::nullopt}; - Client client{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; + Client client{info1.serviceUrl, ClientConfiguration().setAuth(*info1.authentication)}; const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr)); Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer)); @@ -532,8 +531,11 @@ TEST(ClientTest, testUpdateConnectionInfo) { // Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh) 
ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); - client.updateConnectionInfo(std::get<0>(info2), std::get<1>(info2), std::get<2>(info2)); + client.updateServiceInfo(info2); ASSERT_TRUE(PulsarFriend::getConnections(client).empty()); + ASSERT_EQ(info2.serviceUrl, client.getServiceInfo().serviceUrl); + ASSERT_EQ(info2.authentication, client.getServiceInfo().authentication); + ASSERT_EQ(info2.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); // Now the same will access the same topic in cluster 2 ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-1").build(), msgId)); @@ -541,7 +543,10 @@ TEST(ClientTest, testUpdateConnectionInfo) { // Switch back to cluster 1 without any authentication, the previous authentication info configured for // cluster 2 will be cleared. - client.updateConnectionInfo(std::get<0>(info3), std::get<1>(info3), std::get<2>(info3)); + client.updateServiceInfo(info3); + ASSERT_EQ(info3.serviceUrl, client.getServiceInfo().serviceUrl); + ASSERT_EQ(info3.authentication, client.getServiceInfo().authentication); + ASSERT_EQ(info3.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr)); ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer)); @@ -558,17 +563,18 @@ TEST(ClientTest, testUpdateConnectionInfo) { ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); ASSERT_EQ(value, msg.getDataAsString()); }; - Client client1{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; + Client client1{info1.serviceUrl, ClientConfiguration().setAuth(*info1.authentication)}; verify(client1, topicRequiredAuth, "msg-0"); client1.close(); - Client client2{ - std::get<0>(info2), - ClientConfiguration().setAuth(std::get<1>(info2)).setTlsTrustCertsFilePath(std::get<2>(info2))}; + Client client2{info2.serviceUrl, + ClientConfiguration() + .setAuth(*info2.authentication) + 
.setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath)}; verify(client2, topicRequiredAuth, "msg-1"); client2.close(); - Client client3{std::get<0>(info3)}; + Client client3{info3.serviceUrl}; verify(client3, topicNoAuth, "msg-2"); client3.close(); } From 7ca81369a445c3380145251980c815bdc84ef0a9 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 16:19:59 +0800 Subject: [PATCH 05/12] fix format --- tests/ClientTest.cc | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 5bc4d89b..07128d19 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -515,10 +515,9 @@ TEST(ClientTest, testUpdateConnectionInfo) { // Access "private/auth" namespace in cluster 1 ServiceInfo info1{"pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt}; // Access "private/auth" namespace in cluster 2 - ServiceInfo info2{ - "pulsar+ssl://localhost:6653", - AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), - TEST_CONF_DIR "/hn-verification/cacert.pem"}; + ServiceInfo info2{"pulsar+ssl://localhost:6653", + AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), + TEST_CONF_DIR "/hn-verification/cacert.pem"}; // Access "public/default" namespace in cluster 1, which doesn't require authentication ServiceInfo info3{"pulsar://localhost:6650", std::nullopt, std::nullopt}; @@ -567,10 +566,9 @@ TEST(ClientTest, testUpdateConnectionInfo) { verify(client1, topicRequiredAuth, "msg-0"); client1.close(); - Client client2{info2.serviceUrl, - ClientConfiguration() - .setAuth(*info2.authentication) - .setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath)}; + Client client2{info2.serviceUrl, ClientConfiguration() + .setAuth(*info2.authentication) + .setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath)}; verify(client2, topicRequiredAuth, "msg-1"); client2.close(); From e70a0a421f2423b1a79c9e481c0fc1b6329b446e Mon Sep 17 00:00:00 
2001 From: Yunze Xu Date: Thu, 5 Mar 2026 17:06:01 +0800 Subject: [PATCH 06/12] use read-write lock instead --- include/pulsar/Client.h | 2 +- lib/Client.cc | 2 +- lib/ClientImpl.cc | 114 ++++++++++++++++++++-------------------- lib/ClientImpl.h | 5 +- tests/ClientTest.cc | 2 +- 5 files changed, 63 insertions(+), 62 deletions(-) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 8fccbe19..f3ee2f66 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -436,7 +436,7 @@ class PULSAR_PUBLIC Client { * * @return the current service information */ - ServiceInfo getServiceInfo(); + ServiceInfo getServiceInfo() const; private: Client(const std::shared_ptr&); diff --git a/lib/Client.cc b/lib/Client.cc index 6ceaa117..45cf2170 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -200,6 +200,6 @@ void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, void Client::updateServiceInfo(const ServiceInfo& serviceInfo) { impl_->updateServiceInfo(serviceInfo); } -ServiceInfo Client::getServiceInfo() { return impl_->getServiceInfo(); } +ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); } } // namespace pulsar diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 2aceafea..f64908a5 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -24,7 +24,9 @@ #include #include #include +#include #include +#include #include #include "BinaryProtoLookupService.h" @@ -74,8 +76,6 @@ std::string generateRandomName() { return randomName; } -typedef std::unique_lock Lock; - typedef std::vector StringList; namespace { @@ -157,19 +157,26 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { } LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) { - Lock lock(mutex_); + std::shared_lock readLock(mutex_); if (redirectedClusterURI.empty()) { return lookupServicePtr_; } - auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); - if (it == 
redirectedClusterLookupServicePtrs_.end()) { - auto lookup = createLookup(redirectedClusterURI); - redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup); - return lookup; + if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); + it != redirectedClusterLookupServicePtrs_.end()) { + return it->second; } + readLock.unlock(); - return it->second; + std::unique_lock writeLock(mutex_); + // Double check in case another thread acquires the lock and inserts a pair first + if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); + it != redirectedClusterLookupServicePtrs_.end()) { + return it->second; + } + auto lookup = createLookup(redirectedClusterURI); + redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup); + return lookup; } void ClientImpl::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, @@ -179,7 +186,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Producer()); @@ -267,7 +274,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st const ReaderConfiguration& conf, const ReaderCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Reader()); @@ -289,7 +296,7 @@ void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewC const TableViewCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, TableView()); @@ -355,7 +362,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const const SubscribeCallback& callback) { TopicNamePtr topicNamePtr = TopicName::get(regexPattern); - 
Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -441,7 +448,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto it = std::unique(topics.begin(), topics.end()); auto newSize = std::distance(topics.begin(), it); topics.resize(newSize); - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -477,7 +484,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub const ConsumerConfiguration& conf, const SubscribeCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -662,7 +669,7 @@ void ClientImpl::handleGetPartitions(Result result, const LookupDataResultPtr& p void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, StringList()); @@ -679,7 +686,9 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP } void ClientImpl::closeAsync(const CloseCallback& callback) { + std::unique_lock lock(mutex_); if (state_ != Open) { + lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -689,10 +698,12 @@ void ClientImpl::closeAsync(const CloseCallback& callback) { state_ = Closing; memoryLimitController_.close(); - getLookup()->close(); + lookupServicePtr_->close(); for (const auto& it : redirectedClusterLookupServicePtrs_) { it.second->close(); } + redirectedClusterLookupServicePtrs_.clear(); + lock.unlock(); auto producers = producers_.move(); auto consumers = consumers_.move(); @@ -741,7 +752,7 @@ void ClientImpl::handleClose(Result result, const SharedInt& numberOfOpenHandler 
--(*numberOfOpenHandlers); } if (*numberOfOpenHandlers == 0) { - Lock lock(mutex_); + std::unique_lock lock(mutex_); if (state_ == Closed) { LOG_DEBUG("Client is already shutting down, possible race condition in handleClose"); return; @@ -821,12 +832,12 @@ void ClientImpl::shutdown() { } uint64_t ClientImpl::newProducerId() { - Lock lock(mutex_); + std::shared_lock lock(mutex_); return producerIdGenerator_++; } uint64_t ClientImpl::newConsumerId() { - Lock lock(mutex_); + std::shared_lock lock(mutex_); return consumerIdGenerator_++; } @@ -870,51 +881,40 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati } void ClientImpl::updateServiceInfo(const ServiceInfo& serviceInfo) { - LookupServicePtr oldLookupServicePtr; - std::unordered_map oldRedirectedLookupServicePtrs; - - { - Lock lock(mutex_); - if (state_ != Open) { - LOG_ERROR("Client is not open, cannot update connection info"); - return; - } - - if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) { - clientConfiguration_.setAuth(*serviceInfo.authentication); - } else { - clientConfiguration_.setAuth(AuthFactory::Disabled()); - } - if (serviceInfo.tlsTrustCertsFilePath.has_value()) { - clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath); - } else { - clientConfiguration_.setTlsTrustCertsFilePath(""); - } - clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl))); - serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()), - clientConfiguration_.getTlsTrustCertsFilePath().empty() - ? 
std::nullopt - : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())}; - - oldLookupServicePtr = std::move(lookupServicePtr_); - oldRedirectedLookupServicePtrs = std::move(redirectedClusterLookupServicePtrs_); - - lookupServicePtr_ = createLookup(serviceInfo.serviceUrl); - redirectedClusterLookupServicePtrs_.clear(); + std::unique_lock lock(mutex_); + if (state_ != Open) { + LOG_ERROR("Client is not open, cannot update connection info"); + return; } - if (oldLookupServicePtr) { - oldLookupServicePtr->close(); + if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) { + clientConfiguration_.setAuth(*serviceInfo.authentication); + } else { + clientConfiguration_.setAuth(AuthFactory::Disabled()); } - for (const auto& it : oldRedirectedLookupServicePtrs) { - it.second->close(); + if (serviceInfo.tlsTrustCertsFilePath.has_value()) { + clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath); + } else { + clientConfiguration_.setTlsTrustCertsFilePath(""); } + clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl))); + serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()), + clientConfiguration_.getTlsTrustCertsFilePath().empty() + ? 
std::nullopt + : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())}; pool_.resetConnections(clientConfiguration_.getAuthPtr(), clientConfiguration_); + + lookupServicePtr_->close(); + for (auto&& it : redirectedClusterLookupServicePtrs_) { + it.second->close(); + } + redirectedClusterLookupServicePtrs_.clear(); + lookupServicePtr_ = createLookup(serviceInfo.serviceUrl); } -ServiceInfo ClientImpl::getServiceInfo() { - Lock lock(mutex_); +ServiceInfo ClientImpl::getServiceInfo() const { + std::shared_lock lock(mutex_); return serviceInfo_; } diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index c96f9aab..26c167f1 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "ConnectionPool.h" #include "Future.h" @@ -140,7 +141,7 @@ class ClientImpl : public std::enable_shared_from_this { uint64_t getLookupCount() { return lookupCount_; } void updateServiceInfo(const ServiceInfo& serviceInfo); - ServiceInfo getServiceInfo(); + ServiceInfo getServiceInfo() const; static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); @@ -191,7 +192,7 @@ class ClientImpl : public std::enable_shared_from_this { Closed }; - std::mutex mutex_; + mutable std::shared_mutex mutex_; State state_; ClientConfiguration clientConfiguration_; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 07128d19..31d0d93d 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -509,7 +509,7 @@ TEST(ClientTest, testNoRetry) { } } -TEST(ClientTest, testUpdateConnectionInfo) { +TEST(ClientTest, testUpdateServiceInfo) { extern std::string getToken(); // from AuthToken.cc // Access "private/auth" namespace in cluster 1 From 342505e7dfaf462b0960a2308b93c1466285e814 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 17:19:04 +0800 Subject: [PATCH 07/12] pass by value + std::move for better performance --- include/pulsar/Client.h | 2 +- lib/Client.cc | 2 +- 
lib/ClientImpl.cc | 2 +- lib/ClientImpl.h | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index f3ee2f66..15dd0324 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -429,7 +429,7 @@ class PULSAR_PUBLIC Client { * * @param serviceInfo the service URL, authentication and TLS trust certs file path to use */ - void updateServiceInfo(const ServiceInfo& serviceInfo); + void updateServiceInfo(ServiceInfo serviceInfo); /** * Get the current service information of the client. diff --git a/lib/Client.cc b/lib/Client.cc index 45cf2170..311d2ddd 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -198,7 +198,7 @@ void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, .addListener(std::move(callback)); } -void Client::updateServiceInfo(const ServiceInfo& serviceInfo) { impl_->updateServiceInfo(serviceInfo); } +void Client::updateServiceInfo(ServiceInfo serviceInfo) { impl_->updateServiceInfo(std::move(serviceInfo)); } ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); } diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index f64908a5..7f2ee6c7 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -880,7 +880,7 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati return clientConfiguration.impl_->operationTimeout; } -void ClientImpl::updateServiceInfo(const ServiceInfo& serviceInfo) { +void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) { std::unique_lock lock(mutex_); if (state_ != Open) { LOG_ERROR("Client is not open, cannot update connection info"); diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 26c167f1..e58d6df6 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -140,7 +140,7 @@ class ClientImpl : public std::enable_shared_from_this { ConnectionPool& getConnectionPool() noexcept { return pool_; } uint64_t getLookupCount() { return lookupCount_; } - void updateServiceInfo(const 
ServiceInfo& serviceInfo); + void updateServiceInfo(ServiceInfo&& serviceInfo); ServiceInfo getServiceInfo() const; static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); From 477fb9495e3b496f56ea94ca2e4497bb2d01ec75 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 17:21:08 +0800 Subject: [PATCH 08/12] fix thread safety due to read lock for write operation --- lib/ClientImpl.cc | 10 ++-------- lib/ClientImpl.h | 4 ++-- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 7f2ee6c7..5b71a696 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -831,15 +831,9 @@ void ClientImpl::shutdown() { lookupCount_ = 0; } -uint64_t ClientImpl::newProducerId() { - std::shared_lock lock(mutex_); - return producerIdGenerator_++; -} +uint64_t ClientImpl::newProducerId() { return producerIdGenerator_++; } -uint64_t ClientImpl::newConsumerId() { - std::shared_lock lock(mutex_); - return consumerIdGenerator_++; -} +uint64_t ClientImpl::newConsumerId() { return consumerIdGenerator_++; } uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; } diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index e58d6df6..11881a2f 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -207,8 +207,8 @@ class ClientImpl : public std::enable_shared_from_this { std::unordered_map redirectedClusterLookupServicePtrs_; ConnectionPool pool_; - uint64_t producerIdGenerator_; - uint64_t consumerIdGenerator_; + std::atomic_uint64_t producerIdGenerator_; + std::atomic_uint64_t consumerIdGenerator_; std::shared_ptr> requestIdGenerator_{std::make_shared>(0)}; SynchronizedHashMap producers_; From 9114edf05e322f1fcc02dc6b6ff42ee1ce613782 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 17:43:58 +0800 Subject: [PATCH 09/12] fix stale lookup service that might be referenced --- lib/Client.cc | 3 +-- lib/ClientImpl.cc | 37 +++++++++++++-------------- lib/ClientImpl.h | 22
+++++++++++++--- lib/MultiTopicsConsumerImpl.cc | 22 ++++++++++------ lib/MultiTopicsConsumerImpl.h | 7 +---- lib/PartitionedProducerImpl.cc | 8 +++--- lib/PartitionedProducerImpl.h | 3 --- lib/PatternMultiTopicsConsumerImpl.cc | 13 +++++----- lib/PatternMultiTopicsConsumerImpl.h | 1 - lib/ReaderImpl.cc | 1 - tests/LookupServiceTest.cc | 17 +++--------- 11 files changed, 69 insertions(+), 65 deletions(-) diff --git a/lib/Client.cc b/lib/Client.cc index 311d2ddd..ba829b2b 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -193,8 +193,7 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, std::function callback) { - impl_->getLookup() - ->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") + impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") .addListener(std::move(callback)); } diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 5b71a696..02f017b0 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -200,8 +200,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon if (autoDownloadSchema) { auto self = shared_from_this(); - auto lookup = getLookup(); - lookup->getSchema(topicName).addListener( + getSchema(topicName).addListener( [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { if (res != ResultOk) { callback(res, Producer()); @@ -209,12 +208,12 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } ProducerConfiguration conf; conf.setSchema(topicSchema); - self->getLookup()->getPartitionMetadataAsync(topicName).addListener( + self->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); }); } else { - getLookup()->getPartitionMetadataAsync(topicName).addListener( + 
getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); } @@ -287,7 +286,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st } MessageId msgId(startMessageId); - getLookup()->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, msgId, conf, callback)); } @@ -400,8 +399,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const return; } - getLookup() - ->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) + getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, regexPattern, mode, subscriptionName, conf, callback)); @@ -423,9 +421,8 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace auto interceptors = std::make_shared(conf.getInterceptors()); - consumer = std::make_shared(shared_from_this(), regexPattern, mode, - *matchTopics, subscriptionName, conf, - getLookup(), interceptors); + consumer = std::make_shared( + shared_from_this(), regexPattern, mode, *matchTopics, subscriptionName, conf, interceptors); consumer->getConsumerCreatedFuture().addListener( std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -472,7 +469,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto interceptors = std::make_shared(conf.getInterceptors()); ConsumerImplBasePtr consumer = std::make_shared( - shared_from_this(), topics, subscriptionName, topicNamePtr, conf, getLookup(), interceptors); + shared_from_this(), topics, subscriptionName, topicNamePtr, conf, interceptors); 
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -502,7 +499,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub } } - getLookup()->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, subscriptionName, conf, callback)); } @@ -525,9 +522,9 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti callback(ResultInvalidConfiguration, Consumer()); return; } - consumer = std::make_shared( - shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf, - getLookup(), interceptors); + consumer = std::make_shared(shared_from_this(), topicName, + partitionMetadata->getPartitions(), + subscriptionName, conf, interceptors); } else { auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), subscriptionName, conf, @@ -680,9 +677,9 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP return; } } - getLookup()->getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, callback)); + getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions, + shared_from_this(), std::placeholders::_1, + std::placeholders::_2, topicName, callback)); } void ClientImpl::closeAsync(const CloseCallback& callback) { @@ -802,7 +799,9 @@ void ClientImpl::shutdown() { << " consumers have been shutdown."); } - getLookup()->close(); + std::shared_lock lock(mutex_); + lookupServicePtr_->close(); + lock.unlock(); if (!pool_.close()) { // pool_ has already been closed. It means shutdown() has been called before. 
return; diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 11881a2f..2a6a8298 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -29,6 +29,7 @@ #include "ConnectionPool.h" #include "Future.h" #include "LookupDataResult.h" +#include "LookupService.h" #include "MemoryLimitController.h" #include "ProtoApiEnums.h" #include "SynchronizedHashMap.h" @@ -53,8 +54,6 @@ typedef std::weak_ptr ConsumerImplBaseWeakPtr; class ClientConnection; using ClientConnectionPtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; using LookupServiceFactory = std::function; @@ -129,7 +128,6 @@ class ClientImpl : public std::enable_shared_from_this { ExecutorServiceProviderPtr getIOExecutorProvider(); ExecutorServiceProviderPtr getListenerExecutorProvider(); ExecutorServiceProviderPtr getPartitionListenerExecutorProvider(); - LookupServicePtr getLookup(const std::string& redirectedClusterURI = ""); void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); } @@ -143,6 +141,23 @@ class ClientImpl : public std::enable_shared_from_this { void updateServiceInfo(ServiceInfo&& serviceInfo); ServiceInfo getServiceInfo() const; + // Since the underlying `lookupServicePtr_` can be modified by `updateServiceInfo`, we should not expose + // it to other classes, otherwise the update might not be visible. 
+ auto getPartitionMetadataAsync(const TopicNamePtr& topicName) { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getPartitionMetadataAsync(topicName); + } + + auto getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode); + } + + auto getSchema(const TopicNamePtr& topicName, const std::string& version = "") { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getSchema(topicName, version); + } + static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); friend class PulsarFriend; @@ -182,6 +197,7 @@ class ClientImpl : public std::enable_shared_from_this { const std::string& logicalAddress); LookupServicePtr createLookup(const std::string& serviceUrl); + LookupServicePtr getLookup(const std::string& redirectedClusterURI); static std::string getClientVersion(const ClientConfiguration& clientConfiguration); diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 0799eb63..a5699781 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -44,20 +44,19 @@ using std::chrono::seconds; MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode, const optional& startMessageId) : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf, - lookupServicePtr, interceptors, subscriptionMode, startMessageId) { + interceptors, subscriptionMode, startMessageId) { topicsPartitions_[topicName->toString()] = numPartitions; } MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( const ClientImplPtr& client, const std::vector& 
topics, const std::string& subscriptionName, const TopicNamePtr& topicName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, - Commands::SubscriptionMode subscriptionMode, const optional& startMessageId) + const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode, + const optional& startMessageId) : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics", Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf, client->getListenerExecutorProvider()->get()), @@ -66,7 +65,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( conf_(conf), incomingMessages_(conf.getReceiverQueueSize()), messageListener_(conf.getMessageListener()), - lookupServicePtr_(lookupServicePtr), numberTopicPartitions_(std::make_shared>(0)), topics_(topics), subscriptionMode_(subscriptionMode), @@ -93,7 +91,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( if (partitionsUpdateInterval > 0) { partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); partitionsUpdateInterval_ = seconds(partitionsUpdateInterval); - lookupServicePtr_ = client->getLookup(); } state_ = Pending; @@ -185,7 +182,12 @@ Future MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s auto entry = topicsPartitions_.find(topic); if (entry == topicsPartitions_.end()) { lock.unlock(); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + auto client = client_.lock(); + if (!client) { + topicPromise->setFailed(ResultAlreadyClosed); + return topicPromise->getFuture(); + } + client->getPartitionMetadataAsync(topicName).addListener( [this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) { if (result != ResultOk) { LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- " @@ -1003,7 +1005,11 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() { auto topicName = TopicName::get(item.first); auto 
currentNumPartitions = item.second; auto weakSelf = weak_from_this(); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + auto client = client_.lock(); + if (!client) { + return; + } + client->getPartitionMetadataAsync(topicName).addListener( [this, weakSelf, topicName, currentNumPartitions](Result result, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index dc628652..38a44cdf 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -46,23 +46,19 @@ class MultiTopicsBrokerConsumerStatsImpl; using MultiTopicsBrokerConsumerStatsPtr = std::shared_ptr; class UnAckedMessageTrackerInterface; using UnAckedMessageTrackerPtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; class MultiTopicsConsumerImpl; class MultiTopicsConsumerImpl : public ConsumerImplBase { public: MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, const optional& startMessageId = optional{}); MultiTopicsConsumerImpl(const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName, const TopicNamePtr& topicName, - const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_, - const ConsumerInterceptorsPtr& interceptors, + const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, const optional& startMessageId = optional{}); @@ -119,7 +115,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { MessageListener messageListener_; DeadlineTimerPtr partitionsUpdateTimer_; TimeDuration partitionsUpdateInterval_; - 
LookupServicePtr lookupServicePtr_; std::shared_ptr> numberTopicPartitions_; std::atomic failedResult{ResultOk}; Promise multiTopicsConsumerCreatedPromise_; diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 4a923666..1aa5c87b 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -23,7 +23,6 @@ #include "ClientImpl.h" #include "ExecutorService.h" #include "LogUtils.h" -#include "LookupService.h" #include "ProducerImpl.h" #include "RoundRobinMessageRouter.h" #include "SinglePartitionMessageRouter.h" @@ -59,7 +58,6 @@ PartitionedProducerImpl::PartitionedProducerImpl(const ClientImplPtr& client, co listenerExecutor_ = client->getListenerExecutorProvider()->get(); partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); partitionsUpdateInterval_ = std::chrono::seconds(partitionsUpdateInterval); - lookupServicePtr_ = client->getLookup(); } } @@ -433,7 +431,11 @@ void PartitionedProducerImpl::runPartitionUpdateTask() { void PartitionedProducerImpl::getPartitionMetadata() { using namespace std::placeholders; auto weakSelf = weak_from_this(); - lookupServicePtr_->getPartitionMetadataAsync(topicName_) + auto client = client_.lock(); + if (!client) { + return; + } + client->getPartitionMetadataAsync(topicName_) .addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); if (self) { diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 40f2d34d..94ba7179 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -38,8 +38,6 @@ using ClientImplPtr = std::shared_ptr; using ClientImplWeakPtr = std::weak_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; class ProducerImpl; using ProducerImplPtr = std::shared_ptr; class TopicName; @@ -133,7 +131,6 @@ class PartitionedProducerImpl : public ProducerImplBase, 
ExecutorServicePtr listenerExecutor_; DeadlineTimerPtr partitionsUpdateTimer_; TimeDuration partitionsUpdateInterval_; - LookupServicePtr lookupServicePtr_; ProducerInterceptorsPtr interceptors_; diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index fd48feed..4b5aab73 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -21,7 +21,6 @@ #include "ClientImpl.h" #include "ExecutorService.h" #include "LogUtils.h" -#include "LookupService.h" DECLARE_LOG_OBJECT() @@ -32,10 +31,8 @@ using std::chrono::seconds; PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl( const ClientImplPtr& client, const std::string& pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, - const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_, - const ConsumerInterceptorsPtr& interceptors) - : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, - lookupServicePtr_, interceptors), + const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors) + : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, interceptors), patternString_(pattern), pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))), getTopicsMode_(getTopicsMode), @@ -84,7 +81,11 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& er // already get namespace from pattern. 
assert(namespaceName_); - lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_) + auto client = client_.lock(); + if (!client) { + return; + } + client->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_) .addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, std::placeholders::_1, std::placeholders::_2)); } diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h index 63527965..796abcc2 100644 --- a/lib/PatternMultiTopicsConsumerImpl.h +++ b/lib/PatternMultiTopicsConsumerImpl.h @@ -52,7 +52,6 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr_, const ConsumerInterceptorsPtr& interceptors); ~PatternMultiTopicsConsumerImpl() override; diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index 7fa7e8b9..754137c5 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -90,7 +90,6 @@ void ReaderImpl::start(const MessageId& startMessageId, if (partitions_ > 0) { auto consumerImpl = std::make_shared( client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf, - client_.lock()->getLookup(), std::make_shared(std::vector()), Commands::SubscriptionModeNonDurable, startMessageId); consumer_ = consumerImpl; diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 92aa8204..dc9cfc4e 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -259,7 +259,7 @@ TEST_P(LookupServiceTest, basicGetNamespaceTopics) { ASSERT_EQ(ResultOk, result); // 2. verify getTopicsOfNamespace by regex mode. 
- auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_)->getLookup(); + auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_); auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode, const std::set& expectedTopics) { Future getTopicsFuture = @@ -292,11 +292,8 @@ TEST_P(LookupServiceTest, testGetSchema) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); ASSERT_EQ(jsonSchema, schemaInfo.getSchema()); ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType()); @@ -310,11 +307,8 @@ TEST_P(LookupServiceTest, testGetSchemaNotFound) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo)); } @@ -335,11 +329,8 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema()); ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType()); From 
c6de067e3795b58eed558c03795e385fc8ee16a5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 21:02:09 +0800 Subject: [PATCH 10/12] fix the reader cannot consume after switching to a new cluster --- lib/ClientConnection.cc | 5 ++++- lib/ClientConnection.h | 3 ++- lib/ClientImpl.cc | 2 +- lib/ConnectionPool.cc | 6 +++--- lib/ConnectionPool.h | 2 +- lib/ConsumerImpl.cc | 13 ++++++++++++- lib/ConsumerImpl.h | 7 +++++++ tests/ClientTest.cc | 25 ++++++++++++++++++++----- 8 files changed, 50 insertions(+), 13 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 0a850ed0..41d2f608 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1292,7 +1292,7 @@ void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, startConsumerStatsTimer(consumerStatsRequests); } -void ClientConnection::close(Result result, bool detach) { +void ClientConnection::close(Result result, bool detach, bool switchCluster) { Lock lock(mutex_); if (isClosed()) { return; @@ -1368,6 +1368,9 @@ void ClientConnection::close(Result result, bool detach) { for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) { auto consumer = it->second.lock(); if (consumer) { + if (switchCluster) { + consumer->onClusterSwitching(); + } consumer->handleDisconnection(result, self); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index b2770006..39e4224d 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -157,10 +157,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_thisclose(); for (auto&& it : redirectedClusterLookupServicePtrs_) { diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index d94d8fe8..eaa14dc7 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -67,8 +67,8 @@ bool ConnectionPool::close() { return true; } -void ConnectionPool::resetConnections(const AuthenticationPtr& authentication, - const ClientConfiguration& conf) { +void 
ConnectionPool::resetForClusterSwitching(const AuthenticationPtr& authentication, + const ClientConfiguration& conf) { std::unique_lock lock(mutex_); authentication_ = authentication; clientConfiguration_ = conf; @@ -76,7 +76,7 @@ void ConnectionPool::resetConnections(const AuthenticationPtr& authentication, for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) { auto& cnx = cnxIt->second; if (cnx) { - cnx->close(ResultDisconnected, false); + cnx->close(ResultDisconnected, false, true); } } pool_.clear(); diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h index 64907b4f..541cf8ed 100644 --- a/lib/ConnectionPool.h +++ b/lib/ConnectionPool.h @@ -55,7 +55,7 @@ class PULSAR_PUBLIC ConnectionPool { * Close all existing connections and update the authentication and configuration. * Unlike close(), the pool remains open for new connections. */ - void resetConnections(const AuthenticationPtr& authentication, const ClientConfiguration& conf); + void resetForClusterSwitching(const AuthenticationPtr& authentication, const ClientConfiguration& conf); void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix, ClientConnection* value); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 757b6e84..785a8bd7 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -125,7 +125,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic negativeAcksTracker_(std::make_shared(client, *this, conf)), ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)), readCompacted_(conf.isReadCompacted()), - startMessageId_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageIdFromConfig_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageId_(startMessageIdFromConfig_), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), 
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), @@ -1134,6 +1135,16 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { } } +void ConsumerImpl::onClusterSwitching() { + { + LockGuard lock{mutex_}; + incomingMessages_.clear(); + startMessageId_ = startMessageIdFromConfig_; + lastDequedMessageId_ = MessageId::earliest(); + } + ackGroupingTrackerPtr_->flushAndClean(); +} + /** * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that * was diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 0da82a2d..6f287aa2 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -162,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase { void doImmediateAck(const MessageId& msgId, const ResultCallback& callback, CommandAck_AckType ackType); void doImmediateAck(const std::set& msgIds, const ResultCallback& callback); + void onClusterSwitching(); + protected: // overrided methods from HandlerBase Future connectionOpened(const ClientConnectionPtr& cnx) override; @@ -266,6 +268,11 @@ class ConsumerImpl : public ConsumerImplBase { MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; + + // When the consumer switches to a new cluster, we should reset `startMessageId_` to the original value, + // otherwise, the message id of the old cluster might be passed in the Subscribe request on the new + // cluster. 
+ const optional startMessageIdFromConfig_; optional startMessageId_; SeekStatus seekStatus_{SeekStatus::NOT_STARTED}; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 31d0d93d..93ec3a11 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -522,11 +522,26 @@ TEST(ClientTest, testUpdateServiceInfo) { ServiceInfo info3{"pulsar://localhost:6650", std::nullopt, std::nullopt}; Client client{info1.serviceUrl, ClientConfiguration().setAuth(*info1.authentication)}; + const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr)); Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer)); - MessageId msgId; - ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-0").build(), msgId)); + + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topicRequiredAuth, MessageId::earliest(), {}, reader)); + + auto sendAndReceive = [&](const std::string &value) { + MessageId msgId; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(value).build(), msgId)); + LOG_INFO("Sent " << value << " to " << msgId); + + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); + LOG_INFO("Read " << msg.getDataAsString() << " from " << msgId); + ASSERT_EQ(value, msg.getDataAsString()); + }; + + sendAndReceive("msg-0"); // Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh) ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); @@ -537,8 +552,7 @@ TEST(ClientTest, testUpdateServiceInfo) { ASSERT_EQ(info2.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); // Now the same will access the same topic in cluster 2 - ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-1").build(), msgId)); - ASSERT_EQ(ResultOk, producer.close()); + sendAndReceive("msg-1"); // Switch back to cluster 1 without any authentication, the previous authentication info configured for // cluster 2 will be cleared. 
@@ -548,8 +562,9 @@ TEST(ClientTest, testUpdateServiceInfo) { ASSERT_EQ(info3.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + producer.close(); ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer)); - ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build(), msgId)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build())); client.close(); From bf7405162f078c38ca034d8ded93106142e0c36d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 21:41:50 +0800 Subject: [PATCH 11/12] Add the framework for AutoClusterFailover --- include/pulsar/AutoClusterFailover.h | 71 ++++++++++++++++++++++ include/pulsar/Client.h | 8 +-- include/pulsar/ServiceInfo.h | 42 +++++++++++++ lib/AutoClusterFailover.cc | 91 ++++++++++++++++++++++++++++ lib/ClientImpl.h | 1 + 5 files changed, 207 insertions(+), 6 deletions(-) create mode 100644 include/pulsar/AutoClusterFailover.h create mode 100644 include/pulsar/ServiceInfo.h create mode 100644 lib/AutoClusterFailover.cc diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h new file mode 100644 index 00000000..a5594c7b --- /dev/null +++ b/include/pulsar/AutoClusterFailover.h @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_ +#define PULSAR_AUTO_CLUSTER_FAILOVER_H_ + +#include + +#include + +namespace pulsar { + +class Client; +class AutoClusterFailoverImpl; + +class PULSAR_PUBLIC AutoClusterFailover final { + public: + /** + * Leverage `Client`'s internals for service URL probe and cluster switching. + */ + void initialize(Client& client); + + struct Config { + ServiceInfo primary; + std::vector secondary; + std::chrono::milliseconds checkInterval{30000}; // 30 seconds + }; + + class Builder { + public: + Builder(ServiceInfo primary, std::vector secondary) { + config_.primary = std::move(primary); + config_.secondary = std::move(secondary); + } + + Builder& withCheckInterval(std::chrono::milliseconds interval) { + config_.checkInterval = interval; + return *this; + } + + AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); } + + private: + Config config_; + }; + + explicit AutoClusterFailover(Config&& config); + ~AutoClusterFailover(); + + private: + std::shared_ptr impl_; +}; + +} // namespace pulsar + +#endif diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 15dd0324..892918cd 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -43,12 +44,6 @@ typedef std::function TableViewCallback; typedef std::function&)> GetPartitionsCallback; typedef std::function CloseCallback; -struct PULSAR_PUBLIC ServiceInfo { - std::string serviceUrl; - std::optional authentication; - std::optional 
tlsTrustCertsFilePath; -}; - class ClientImpl; class PulsarFriend; class PulsarWrapper; @@ -443,6 +438,7 @@ class PULSAR_PUBLIC Client { friend class PulsarFriend; friend class PulsarWrapper; + friend class AutoClusterFailover; std::shared_ptr impl_; }; } // namespace pulsar diff --git a/include/pulsar/ServiceInfo.h b/include/pulsar/ServiceInfo.h new file mode 100644 index 00000000..d0abd6b9 --- /dev/null +++ b/include/pulsar/ServiceInfo.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#ifndef PULSAR_SERVICE_INFO_H_ +#define PULSAR_SERVICE_INFO_H_ + +#include + +#include +#include + +namespace pulsar { + +struct PULSAR_PUBLIC ServiceInfo { + std::string serviceUrl; + std::optional authentication; + std::optional tlsTrustCertsFilePath; + + bool operator==(const ServiceInfo& other) const { + return serviceUrl == other.serviceUrl && authentication == other.authentication && + tlsTrustCertsFilePath == other.tlsTrustCertsFilePath; + } +}; + +} // namespace pulsar + +#endif diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc new file mode 100644 index 00000000..eaf6ada7 --- /dev/null +++ b/lib/AutoClusterFailover.cc @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include +#include + +#include +#include + +#include "ClientImpl.h" +#include "ExecutorService.h" +#include "LogUtils.h" + +DECLARE_LOG_OBJECT() + +namespace pulsar { + +class AutoClusterFailoverImpl : public std::enable_shared_from_this { + public: + explicit AutoClusterFailoverImpl(AutoClusterFailover::Config&& config) : config_(std::move(config)) {} + + ~AutoClusterFailoverImpl() { running_->store(false, std::memory_order_release); } + + void initialize(const ClientImplPtr& client) { + client_ = client; + executor_ = client->getIOExecutorProvider()->get(); + scheduleProbeAndUpdateServiceUrl(); + } + + void scheduleProbeAndUpdateServiceUrl() { + auto timer = executor_->createDeadlineTimer(); + timer->expires_after(config_.checkInterval); + timer->async_wait([this, weakSelf{weak_from_this()}, timer](auto error) { + auto self = weakSelf.lock(); + if (!self) { + LOG_INFO("AutoClusterFailoverImpl has been destroyed, exiting timer callback"); + return; + } + auto closed = !running_->load(std::memory_order_acquire); + if (!error || closed) { + LOG_INFO("AutoClusterFailover exited, timer error: " << error.message() + << ", closed: " << closed); + return; + } + + probeAndUpdateServiceUrl(); + }); + } + + void probeAndUpdateServiceUrl() { + auto currentServiceInfo = client_->getServiceInfo(); + if (currentServiceInfo == config_.primary) { + // TODO: probe whether primary is down + } else { + // TODO: + // 1. probe whether current (one secondary) is down + // 2. 
if not, check whether primary is up and switch back if it is + } + scheduleProbeAndUpdateServiceUrl(); + } + + private: + const AutoClusterFailover::Config config_; + ClientImplPtr client_; + ExecutorServicePtr executor_; + std::shared_ptr running_{std::make_shared(true)}; +}; + +AutoClusterFailover::AutoClusterFailover(AutoClusterFailover::Config&& config) + : impl_(std::make_shared(std::move(config))) {} + +AutoClusterFailover::~AutoClusterFailover() {} + +void AutoClusterFailover::initialize(Client& client) { impl_->initialize(client.impl_); } + +} // namespace pulsar diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 2a6a8298..9af01b7e 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -236,6 +236,7 @@ class ClientImpl : public std::enable_shared_from_this { LookupServiceFactory lookupServiceFactory_; friend class Client; + friend class AutoClusterFailoverImpl; }; } /* namespace pulsar */ From 1a1efd8825811a23b2e4f4698b936b3c8b2df79d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 22:20:31 +0800 Subject: [PATCH 12/12] add implementation and tests --- include/pulsar/AutoClusterFailover.h | 36 +++- lib/AutoClusterFailover.cc | 163 +++++++++++++++++- tests/AutoClusterFailoverTest.cc | 241 +++++++++++++++++++++++++++ 3 files changed, 434 insertions(+), 6 deletions(-) create mode 100644 tests/AutoClusterFailoverTest.cc diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h index a5594c7b..bdc3ee70 100644 --- a/include/pulsar/AutoClusterFailover.h +++ b/include/pulsar/AutoClusterFailover.h @@ -38,9 +38,30 @@ class PULSAR_PUBLIC AutoClusterFailover final { struct Config { ServiceInfo primary; std::vector secondary; - std::chrono::milliseconds checkInterval{30000}; // 30 seconds + std::chrono::milliseconds checkInterval{30000}; // 30 seconds + std::chrono::milliseconds failoverDelay{30000}; // 30 seconds + std::chrono::milliseconds switchBackDelay{60000}; // 60 seconds }; + /** + * Builder helps create an 
AutoClusterFailover configuration. + * + * Example: + * ServiceInfo primary{...}; + * std::vector secondaries{...}; + * AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries) + * .withCheckInterval(std::chrono::seconds(30)) + * .withFailoverDelay(std::chrono::seconds(30)) + * .withSwitchBackDelay(std::chrono::seconds(60)) + * .build(); + * + * Notes: + * - primary: the preferred cluster to use when available. + * - secondary: ordered list of fallback clusters. + * - checkInterval: frequency of health probes. + * - failoverDelay: how long the current cluster must be unreachable before switching. + * - switchBackDelay: how long the primary must remain healthy before switching back. + */ class Builder { public: Builder(ServiceInfo primary, std::vector secondary) { @@ -48,11 +69,24 @@ class PULSAR_PUBLIC AutoClusterFailover final { config_.secondary = std::move(secondary); } + // Set how frequently probes run against the active cluster(s). Builder& withCheckInterval(std::chrono::milliseconds interval) { config_.checkInterval = interval; return *this; } + // Set how long the current cluster must be unreachable before attempting failover. + Builder& withFailoverDelay(std::chrono::milliseconds delay) { + config_.failoverDelay = delay; + return *this; + } + + // Set how long the primary must remain healthy before switching back from a secondary. 
+ Builder& withSwitchBackDelay(std::chrono::milliseconds delay) { + config_.switchBackDelay = delay; + return *this; + } + AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); } private: diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index eaf6ada7..ed6cf5dc 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -20,16 +20,103 @@ #include #include +#include #include +#ifdef USE_ASIO +#include +#include +#include +#include +#else +#include +#include +#include +#include +#endif + +#include "AsioDefines.h" #include "ClientImpl.h" #include "ExecutorService.h" #include "LogUtils.h" +#include "ServiceURI.h" DECLARE_LOG_OBJECT() namespace pulsar { +// Probe whether a Pulsar service URL is reachable by attempting a TCP connection. +// Parses the first host:port from the service URL and tries to connect within timeoutMs. +static bool probeAvailable(const std::string& serviceUrl, int timeoutMs) { + try { + ServiceURI uri(serviceUrl); + const auto& hosts = uri.getServiceHosts(); + if (hosts.empty()) { + return false; + } + + // Each entry in getServiceHosts() is a full URL like "pulsar://host:port". + // Strip the scheme prefix to get "host:port". 
+ const auto& hostAddr = hosts[0]; + const auto schemeEnd = hostAddr.find("://"); + if (schemeEnd == std::string::npos) { + return false; + } + const std::string hostPort = hostAddr.substr(schemeEnd + 3); + const auto colonPos = hostPort.rfind(':'); + if (colonPos == std::string::npos) { + return false; + } + const std::string host = hostPort.substr(0, colonPos); + const std::string port = hostPort.substr(colonPos + 1); + + ASIO::io_context ioCtx; + ASIO::ip::tcp::resolver resolver(ioCtx); + ASIO_ERROR ec; + const auto endpoints = resolver.resolve(host, port, ec); + if (ec) { + LOG_WARN("probeAvailable: failed to resolve " << host << ":" << port << " - " << ec.message()); + return false; + } + + ASIO::ip::tcp::socket socket(ioCtx); + bool connected = false; + ASIO::steady_timer timer(ioCtx); + timer.expires_after(std::chrono::milliseconds(timeoutMs)); + timer.async_wait([&](const ASIO_ERROR& timerEc) { + if (!timerEc) { + ASIO_ERROR closeEc; + socket.close(closeEc); + } + }); + ASIO::async_connect(socket, endpoints, + [&](const ASIO_ERROR& connectEc, const ASIO::ip::tcp::endpoint&) { + if (!connectEc) { + connected = true; + } + timer.cancel(); + }); + ioCtx.run(); + + if (connected) { + ASIO_ERROR closeEc; + socket.close(closeEc); + } + return connected; + } catch (const std::exception& e) { + LOG_WARN("probeAvailable: exception probing " << serviceUrl << ": " << e.what()); + return false; + } +} + +static int64_t nowMs() { + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); +} + +static constexpr int kProbeTimeoutMs = 30000; + class AutoClusterFailoverImpl : public std::enable_shared_from_this { public: explicit AutoClusterFailoverImpl(AutoClusterFailover::Config&& config) : config_(std::move(config)) {} @@ -52,7 +139,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisload(std::memory_order_acquire); - if (!error || closed) { + if (error || closed) { LOG_INFO("AutoClusterFailover exited, 
timer error: " << error.message() << ", closed: " << closed); return; @@ -64,13 +151,75 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisgetServiceInfo(); + const auto now = nowMs(); + if (currentServiceInfo == config_.primary) { - // TODO: probe whether primary is down + // Currently on primary: probe it and fail over to first available secondary if it's down. + if (probeAvailable(currentServiceInfo.serviceUrl, kProbeTimeoutMs)) { + failedTimestamp_ = -1; + } else { + if (failedTimestamp_ < 0) { + failedTimestamp_ = now; + } else if (now - failedTimestamp_ >= config_.failoverDelay.count()) { + for (const auto& secondary : config_.secondary) { + if (probeAvailable(secondary.serviceUrl, kProbeTimeoutMs)) { + LOG_INFO("Primary " << currentServiceInfo.serviceUrl << " has been down for " + << (now - failedTimestamp_) << "ms, switching to secondary " + << secondary.serviceUrl); + auto info = secondary; + client_->updateServiceInfo(std::move(info)); + failedTimestamp_ = -1; + break; + } else { + LOG_WARN("Primary " << currentServiceInfo.serviceUrl << " has been down for " + << (now - failedTimestamp_) << "ms. Secondary " + << secondary.serviceUrl + << " is also unavailable, trying next."); + } + } + } + } } else { - // TODO: - // 1. probe whether current (one secondary) is down - // 2. if not, check whether primary is up and switch back if it is + // Currently on a secondary: probe it. + if (probeAvailable(currentServiceInfo.serviceUrl, kProbeTimeoutMs)) { + failedTimestamp_ = -1; + // Secondary is up; check whether primary has recovered long enough to switch back. 
+ if (probeAvailable(config_.primary.serviceUrl, kProbeTimeoutMs)) { + if (recoverTimestamp_ < 0) { + recoverTimestamp_ = now; + } else if (now - recoverTimestamp_ >= config_.switchBackDelay.count()) { + LOG_INFO("Primary " << config_.primary.serviceUrl << " has been recovered for " + << (now - recoverTimestamp_) << "ms, switching back from " + << currentServiceInfo.serviceUrl); + auto info = config_.primary; + client_->updateServiceInfo(std::move(info)); + recoverTimestamp_ = -1; + } + } else { + recoverTimestamp_ = -1; + } + } else { + // Current secondary is down; reset recovery clock and attempt to switch back to primary. + recoverTimestamp_ = -1; + if (failedTimestamp_ < 0) { + failedTimestamp_ = now; + } else if (now - failedTimestamp_ >= config_.failoverDelay.count()) { + if (probeAvailable(config_.primary.serviceUrl, kProbeTimeoutMs)) { + LOG_INFO("Secondary " << currentServiceInfo.serviceUrl << " has been down for " + << (now - failedTimestamp_) << "ms, switching back to primary " + << config_.primary.serviceUrl); + auto info = config_.primary; + client_->updateServiceInfo(std::move(info)); + failedTimestamp_ = -1; + } else { + LOG_ERROR("Secondary " << currentServiceInfo.serviceUrl << " has been down for " + << (now - failedTimestamp_) << "ms and primary " + << config_.primary.serviceUrl << " is also unavailable."); + } + } + } } + scheduleProbeAndUpdateServiceUrl(); } @@ -79,6 +228,10 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this running_{std::make_shared(true)}; + // Timestamp (ms) when the current service first became unreachable; -1 when it is reachable. + int64_t failedTimestamp_{-1}; + // Timestamp (ms) when the primary first became reachable again while on a secondary; -1 otherwise. 
+ int64_t recoverTimestamp_{-1}; }; AutoClusterFailover::AutoClusterFailover(AutoClusterFailover::Config&& config) diff --git a/tests/AutoClusterFailoverTest.cc b/tests/AutoClusterFailoverTest.cc new file mode 100644 index 00000000..3446fb2f --- /dev/null +++ b/tests/AutoClusterFailoverTest.cc @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include + +#ifdef USE_ASIO +#include +#else +#include +#endif + +#include "WaitUtils.h" +#include "lib/AsioDefines.h" + +using namespace pulsar; +using namespace std::chrono_literals; + +// A TCP acceptor that simply listens on a random port. +// The kernel handles the TCP handshake into the backlog without calling accept(), +// so probe() (which only does a TCP connect) will succeed while the listener is open. +class SimpleListener { + public: + SimpleListener() : acceptor_(io_context_, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), 0)) { + port_ = static_cast(acceptor_.local_endpoint().port()); + } + + uint16_t getPort() const { return port_; } + + std::string getServiceUrl() const { return "pulsar://localhost:" + std::to_string(port_); } + + // Close the listener, simulating the service going down. 
+    void close() {
+        // `ec` receives any error reported by the acceptor and is not inspected afterwards.
+        ASIO_ERROR ec;
+        acceptor_.close(ec);
+    }
+
+    // Reopen the listener on the same port, simulating the service coming back.
+    void reopen() {
+        ASIO::ip::tcp::endpoint ep(ASIO::ip::tcp::v4(), port_);
+        acceptor_.open(ep.protocol());
+        // Set before bind(); presumably so that re-binding the port that was just closed is not
+        // rejected - TODO confirm on all supported platforms.
+        acceptor_.set_option(ASIO::ip::tcp::acceptor::reuse_address(true));
+        acceptor_.bind(ep);
+        acceptor_.listen();
+    }
+
+   private:
+    ASIO::io_context io_context_;
+    ASIO::ip::tcp::acceptor acceptor_;
+    uint16_t port_{0};
+};
+
+// Fast timings used across all unit tests.
+static constexpr auto kCheckInterval = 200ms;
+static constexpr auto kNoDelay = 0ms;
+// Allow at least 2 check intervals to pass before failover/switch-back can trigger,
+// plus a comfortable margin.
+static constexpr auto kWaitTimeout = 3s;
+
+// Endpoints the tests treat as "down": the tests assume that nothing listens on these ports.
+// NOTE(review): the ports are fixed, so a process that happens to listen on one of them would
+// break the tests that rely on it.
+static const std::string kUnreachablePrimaryUrl = "pulsar://localhost:19998";
+static const std::string kUnreachableSecondaryUrl = "pulsar://localhost:19999";
+// Endpoint of the real Pulsar service used by the integration tests.
+static const std::string kRealClusterUrl = "pulsar://localhost:6650";
+
+// Wait until the client's current service URL matches the expected value.
+// Returns the result of waitUntil() for that condition, bounded by kWaitTimeout.
+static bool waitForServiceUrl(Client& client, const std::string& expected) {
+    return waitUntil(kWaitTimeout, [&] { return client.getServiceInfo().serviceUrl == expected; });
+}
+
+// ====================== Unit tests (mock TCP listeners) ======================
+
+// Both endpoints are listening: the client must keep using the primary.
+TEST(AutoClusterFailoverTest, testStayOnPrimaryWhenPrimaryIsUp) {
+    SimpleListener primary;
+    SimpleListener secondary;
+
+    Client client{primary.getServiceUrl()};
+
+    auto failover = AutoClusterFailover::Builder({primary.getServiceUrl()}, {{secondary.getServiceUrl()}})
+                        .withCheckInterval(kCheckInterval)
+                        .withFailoverDelay(kNoDelay)
+                        .withSwitchBackDelay(kNoDelay)
+                        .build();
+    failover.initialize(client);
+
+    // Let a few probe cycles run; should remain on primary.
+    std::this_thread::sleep_for(kCheckInterval * 4);
+    EXPECT_EQ(primary.getServiceUrl(), client.getServiceInfo().serviceUrl);
+}
+
+// The primary endpoint is unreachable from the start: the client must move to the secondary.
+TEST(AutoClusterFailoverTest, testFailoverWhenPrimaryIsDown) {
+    SimpleListener secondary;
+
+    // Nothing is expected to listen on the primary endpoint, so every probe against it fails.
+    Client client{kUnreachablePrimaryUrl};
+
+    auto failover = AutoClusterFailover::Builder({kUnreachablePrimaryUrl}, {{secondary.getServiceUrl()}})
+                        .withCheckInterval(kCheckInterval)
+                        .withFailoverDelay(kNoDelay)
+                        .withSwitchBackDelay(kNoDelay)
+                        .build();
+    failover.initialize(client);
+
+    EXPECT_TRUE(waitForServiceUrl(client, secondary.getServiceUrl()));
+}
+
+// The primary goes down and later comes back: the client must fail over and then switch back.
+TEST(AutoClusterFailoverTest, testSwitchBackWhenPrimaryRecovers) {
+    SimpleListener primary;
+    SimpleListener secondary;
+
+    Client client{primary.getServiceUrl()};
+
+    auto failover = AutoClusterFailover::Builder({primary.getServiceUrl()}, {{secondary.getServiceUrl()}})
+                        .withCheckInterval(kCheckInterval)
+                        .withFailoverDelay(kNoDelay)
+                        .withSwitchBackDelay(kNoDelay)
+                        .build();
+    failover.initialize(client);
+
+    // Verify we start on primary.
+    ASSERT_EQ(primary.getServiceUrl(), client.getServiceInfo().serviceUrl);
+
+    // Take primary down.
+    primary.close();
+    EXPECT_TRUE(waitForServiceUrl(client, secondary.getServiceUrl()));
+
+    // Bring primary back.
+    primary.reopen();
+    EXPECT_TRUE(waitForServiceUrl(client, primary.getServiceUrl()));
+}
+
+// The primary and the first secondary are unreachable; only the second secondary is listening.
+TEST(AutoClusterFailoverTest, testSkipUnreachableSecondary) {
+    SimpleListener secondary2;
+
+    Client client{kUnreachablePrimaryUrl};
+
+    auto failover = AutoClusterFailover::Builder({kUnreachablePrimaryUrl},
+                                                 {{kUnreachableSecondaryUrl}, {secondary2.getServiceUrl()}})
+                        .withCheckInterval(kCheckInterval)
+                        .withFailoverDelay(kNoDelay)
+                        .withSwitchBackDelay(kNoDelay)
+                        .build();
+    failover.initialize(client);
+
+    // Should skip secondary1 and land on secondary2.
+    EXPECT_TRUE(waitForServiceUrl(client, secondary2.getServiceUrl()));
+}
+
+// ====================== Integration tests (real Pulsar at localhost:6650) ======================
+
+// The primary is unreachable and the real cluster is the only secondary.
+TEST(AutoClusterFailoverTest, testFailoverToRealCluster) {
+    Client client{kUnreachablePrimaryUrl};
+
+    auto failover = AutoClusterFailover::Builder({kUnreachablePrimaryUrl}, {{kRealClusterUrl}})
+                        .withCheckInterval(kCheckInterval)
+                        .withFailoverDelay(kNoDelay)
+                        .withSwitchBackDelay(kNoDelay)
+                        .build();
+    failover.initialize(client);
+
+    // Wait until failover to the real cluster.
+    ASSERT_TRUE(waitForServiceUrl(client, kRealClusterUrl));
+
+    // Verify we can create a producer on the real cluster after failover.
+    Producer producer;
+    const auto topic = "AutoClusterFailoverTest-" + std::to_string(time(nullptr));
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.close();
+    client.close();
+}
+
+// Exercises a failover/switch-back cycle that involves the real cluster.
+// The test does not take the real cluster down, so the cycle is driven by a local listener
+// acting as the primary while the real cluster acts as the secondary.
+TEST(AutoClusterFailoverTest, testSwitchBackToRealCluster) {
+    // Phase 1: the real cluster is the primary; the client must start on it.
+    // The phase has its own scope so that its failover object, client and listener are destroyed
+    // before phase 2 starts instead of living next to `client2` until the end of the test.
+    {
+        SimpleListener fakeSecondary;
+        Client client{kRealClusterUrl};
+
+        auto failover = AutoClusterFailover::Builder({kRealClusterUrl}, {{fakeSecondary.getServiceUrl()}})
+                            .withCheckInterval(kCheckInterval)
+                            .withFailoverDelay(kNoDelay)
+                            .withSwitchBackDelay(kNoDelay)
+                            .build();
+        failover.initialize(client);
+
+        ASSERT_EQ(kRealClusterUrl, client.getServiceInfo().serviceUrl);
+        client.close();
+    }
+
+    // Phase 2: fake listener as primary, real cluster as secondary.
+    SimpleListener fakePrimary;
+    Client client2{fakePrimary.getServiceUrl()};
+
+    auto failover2 = AutoClusterFailover::Builder({fakePrimary.getServiceUrl()}, {{kRealClusterUrl}})
+                         .withCheckInterval(kCheckInterval)
+                         .withFailoverDelay(kNoDelay)
+                         .withSwitchBackDelay(kNoDelay)
+                         .build();
+    failover2.initialize(client2);
+
+    ASSERT_EQ(fakePrimary.getServiceUrl(), client2.getServiceInfo().serviceUrl);
+
+    // Close the fake primary: the client is expected to fail over to the real cluster.
+    fakePrimary.close();
+    ASSERT_TRUE(waitForServiceUrl(client2, kRealClusterUrl));
+
+    // Verify producers work on the real cluster after failover.
+    Producer producer;
+    const auto topic = "AutoClusterFailoverSwitchBack-" + std::to_string(time(nullptr));
+    ASSERT_EQ(ResultOk, client2.createProducer(topic, producer));
+    producer.close();
+
+    // Bring the fake primary back: the client is expected to switch back to it.
+    fakePrimary.reopen();
+    ASSERT_TRUE(waitForServiceUrl(client2, fakePrimary.getServiceUrl()));
+
+    client2.close();
+}