From 7862234f5f6a5b341693f9978773b90d73d6fe25 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 11:09:48 +0800 Subject: [PATCH 01/12] Initial commit --- include/pulsar/Client.h | 15 +++++++++++++++ lib/Client.cc | 7 +++++++ lib/ClientImpl.cc | 8 ++++++++ lib/ClientImpl.h | 4 ++++ 4 files changed, 34 insertions(+) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 56130667..95233fe9 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -32,6 +32,7 @@ #include #include +#include #include namespace pulsar { @@ -414,6 +415,20 @@ class PULSAR_PUBLIC Client { void getSchemaInfoAsync(const std::string& topic, int64_t version, std::function callback); + /** + * Update the connection information of the client, including service URL, authentication and TLS trust + * certs file path. + * This method is used to switch the connection to a different Pulsar cluster. All connections will be + * closed and the internal connection info will be updated. + * + * @param serviceUrl the Pulsar endpoint to use (eg: pulsar://localhost:6650) + * @param authentication the authentication information to use for connecting to the Pulsar cluster + * @param tlsTrustCertsFilePath the TLS trust certs file path to use for connecting to the Pulsar cluster + */ + void updateConnectionInfo(const std::string& serviceUrl, + const std::optional& authentication, + const std::optional& tlsTrustCertsFilePath); + private: Client(const std::shared_ptr&); diff --git a/lib/Client.cc b/lib/Client.cc index 39a5948a..60fb62f5 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -197,4 +197,11 @@ void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, ->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") .addListener(std::move(callback)); } + +void Client::updateConnectionInfo(const std::string& serviceUrl, + const std::optional& authentication, + const std::optional& tlsTrustCertsFilePath) { + impl_->updateConnectionInfo(serviceUrl, authentication, tlsTrustCertsFilePath); +} + } // namespace pulsar diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index eec3b34a..229ee2f3 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -854,4 +854,12 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati return clientConfiguration.impl_->operationTimeout; } +void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, + const std::optional& authentication, + const std::optional& tlsTrustCertsFilePath) { + // TODO: + // 1. Reset the `lookupServicePtr_` with the new serviceUrl and auth parameters, and close the old one. + // 2. Close all connections in `pool_` +} + } /* namespace pulsar */ diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 0b4d5969..e89aa98a 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -139,6 +139,10 @@ class ClientImpl : public std::enable_shared_from_this { ConnectionPool& getConnectionPool() noexcept { return pool_; } uint64_t getLookupCount() { return lookupCount_; } + void updateConnectionInfo(const std::string& serviceUrl, + const std::optional& authentication, + const std::optional& tlsTrustCertsFilePath); + static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); friend class PulsarFriend; From a1746a9d1bf3c37c398845270ec90f36fb06f39e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 11:33:24 +0800 Subject: [PATCH 02/12] implemented --- lib/ClientImpl.cc | 64 ++++++++++++++++++++++++++++++++----------- lib/ConnectionPool.cc | 15 ++++++++++ lib/ConnectionPool.h | 6 ++++ tests/ClientTest.cc | 47 +++++++++++++++++++++++++++++++ 4 files changed, 116 insertions(+), 16 deletions(-) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 229ee2f3..9be08415 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -144,11 +144,11 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { } LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) { + Lock lock(mutex_); if (redirectedClusterURI.empty()) { return lookupServicePtr_; } - Lock lock(mutex_); auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); if (it == redirectedClusterLookupServicePtrs_.end()) { auto lookup = createLookup(redirectedClusterURI); @@ -180,7 +180,8 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon if (autoDownloadSchema) { auto self = shared_from_this(); - lookupServicePtr_->getSchema(topicName).addListener( + auto lookup = getLookup(); + lookup->getSchema(topicName).addListener( [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { if (res != ResultOk) { callback(res, Producer()); @@ -188,12 +189,12 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } ProducerConfiguration conf; conf.setSchema(topicSchema); - self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + self->getLookup()->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); }); } else { - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getLookup()->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); } @@ -266,7 +267,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st } MessageId msgId(startMessageId); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getLookup()->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, msgId, conf, callback)); } @@ -379,7 +380,8 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const return; } - lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) + getLookup() + ->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, regexPattern, mode, subscriptionName, conf, callback)); @@ -403,7 +405,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace consumer = std::make_shared(shared_from_this(), regexPattern, mode, *matchTopics, subscriptionName, conf, - lookupServicePtr_, interceptors); + getLookup(), interceptors); consumer->getConsumerCreatedFuture().addListener( std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -450,7 +452,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto interceptors = std::make_shared(conf.getInterceptors()); ConsumerImplBasePtr consumer = std::make_shared( - shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_, interceptors); + shared_from_this(), topics, subscriptionName, topicNamePtr, conf, getLookup(), interceptors); consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -480,7 +482,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getLookup()->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, subscriptionName, conf, callback)); } @@ -505,7 +507,7 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti } consumer = std::make_shared( shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf, - lookupServicePtr_, interceptors); + getLookup(), interceptors); } else { auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), subscriptionName, conf, @@ -658,7 +660,7 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP return; } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getLookup()->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, callback)); } @@ -674,7 +676,7 @@ void ClientImpl::closeAsync(const CloseCallback& callback) { state_ = Closing; memoryLimitController_.close(); - lookupServicePtr_->close(); + getLookup()->close(); for (const auto& it : redirectedClusterLookupServicePtrs_) { it.second->close(); } @@ -776,7 +778,7 @@ void ClientImpl::shutdown() { << " consumers have been shutdown."); } - lookupServicePtr_->close(); + getLookup()->close(); if (!pool_.close()) { // pool_ has already been closed. It means shutdown() has been called before. return; @@ -857,9 +859,39 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, const std::optional& authentication, const std::optional& tlsTrustCertsFilePath) { - // TODO: - // 1. Reset the `lookupServicePtr_` with the new serviceUrl and auth parameters, and close the old one. - // 2. Close all connections in `pool_` + LookupServicePtr oldLookupServicePtr; + std::unordered_map oldRedirectedLookupServicePtrs; + + { + Lock lock(mutex_); + if (state_ != Open) { + LOG_ERROR("Client is not open, cannot update connection info"); + return; + } + + if (authentication.has_value()) { + clientConfiguration_.setAuth(*authentication); + } + if (tlsTrustCertsFilePath.has_value()) { + clientConfiguration_.setTlsTrustCertsFilePath(*tlsTrustCertsFilePath); + } + clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl))); + + oldLookupServicePtr = std::move(lookupServicePtr_); + oldRedirectedLookupServicePtrs = std::move(redirectedClusterLookupServicePtrs_); + + lookupServicePtr_ = createLookup(serviceUrl); + redirectedClusterLookupServicePtrs_.clear(); + } + + if (oldLookupServicePtr) { + oldLookupServicePtr->close(); + } + for (const auto& it : oldRedirectedLookupServicePtrs) { + it.second->close(); + } + + pool_.resetConnections(clientConfiguration_.getAuthPtr(), clientConfiguration_); } } /* namespace pulsar */ diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index df050b0e..d94d8fe8 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -67,6 +67,21 @@ bool ConnectionPool::close() { return true; } +void ConnectionPool::resetConnections(const AuthenticationPtr& authentication, + const ClientConfiguration& conf) { + std::unique_lock lock(mutex_); + authentication_ = authentication; + clientConfiguration_ = conf; + + for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) { + auto& cnx = cnxIt->second; + if (cnx) { + cnx->close(ResultDisconnected, false); + } + } + pool_.clear(); +} + static const std::string getKey(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix) { std::stringstream ss; diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h index 0e3a6d0a..64907b4f 100644 --- a/lib/ConnectionPool.h +++ b/lib/ConnectionPool.h @@ -51,6 +51,12 @@ class PULSAR_PUBLIC ConnectionPool { */ bool close(); + /** + * Close all existing connections and update the authentication and configuration. + * Unlike close(), the pool remains open for new connections. + */ + void resetConnections(const AuthenticationPtr& authentication, const ClientConfiguration& conf); + void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix, ClientConnection* value); diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index dd892686..f9f4b93b 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -506,3 +506,50 @@ TEST(ClientTest, testNoRetry) { ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms"; } } + +TEST(ClientTest, testUpdateConnectionInfo) { + const std::string cluster1Url = "pulsar://localhost:6650"; + const std::string cluster2Url = "pulsar://localhost:6652"; + const std::string topic1 = "testUpdateConnectionInfo-cluster1-" + std::to_string(time(nullptr)); + const std::string topic2 = "testUpdateConnectionInfo-cluster2-" + std::to_string(time(nullptr)); + + Client client(cluster1Url); + + // Produce and consume on cluster 1 + Producer producer1; + ASSERT_EQ(ResultOk, client.createProducer(topic1, producer1)); + MessageId msgId; + ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("msg-on-cluster1").build(), msgId)); + producer1.close(); + + // Verify there are connections in the pool + auto connections = PulsarFriend::getConnections(client); + ASSERT_FALSE(connections.empty()); + + // Switch to cluster 2 + client.updateConnectionInfo(cluster2Url, std::nullopt, std::nullopt); + + // Previous connections should have been closed + for (const auto &cnx : connections) { + ASSERT_TRUE(cnx->isClosed()); + } + + // Produce and consume on cluster 2 using the same client + Producer producer2; + ASSERT_EQ(ResultOk, client.createProducer(topic2, producer2)); + ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("msg-on-cluster2").build(), msgId)); + + Consumer consumer2; + ASSERT_EQ(ResultOk, client.subscribe(topic2, "sub", consumer2)); + Message msg; + ASSERT_EQ(ResultOk, consumer2.receive(msg, 5000)); + ASSERT_EQ("msg-on-cluster2", msg.getDataAsString()); + + // Verify connection pool now has connections to cluster 2 + auto newConnections = PulsarFriend::getConnections(client); + ASSERT_FALSE(newConnections.empty()); + + consumer2.close(); + producer2.close(); + client.close(); +} From 15908f99dedd18e2f8393b15040578a790f00c2f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 15:08:38 +0800 Subject: [PATCH 03/12] fix auth not cleared if it's not set and improve tests --- lib/ClientImpl.cc | 4 ++ tests/AuthTokenTest.cc | 2 +- tests/ClientTest.cc | 97 +++++++++++++++++++++++++----------------- 3 files changed, 63 insertions(+), 40 deletions(-) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 9be08415..98fe3c30 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -871,9 +871,13 @@ void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, if (authentication.has_value()) { clientConfiguration_.setAuth(*authentication); + } else { + clientConfiguration_.setAuth(AuthFactory::Disabled()); } if (tlsTrustCertsFilePath.has_value()) { clientConfiguration_.setTlsTrustCertsFilePath(*tlsTrustCertsFilePath); + } else { + clientConfiguration_.setTlsTrustCertsFilePath(""); } clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl))); diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc index 84e8572d..4bfd8085 100644 --- a/tests/AuthTokenTest.cc +++ b/tests/AuthTokenTest.cc @@ -42,7 +42,7 @@ static const std::string serviceUrlHttp = "http://localhost:8080"; static const std::string tokenPath = TOKEN_PATH; -static std::string getToken() { +std::string getToken() { std::ifstream file(tokenPath); std::string str((std::istreambuf_iterator(file)), std::istreambuf_iterator()); return str; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index f9f4b93b..37376cf2 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -17,13 +17,16 @@ * under the License. */ #include +#include #include +#include #include #include #include #include -#include +#include +#include #include "MockClientImpl.h" #include "PulsarAdminHelper.h" @@ -32,7 +35,6 @@ #include "lib/ClientConnection.h" #include "lib/LogUtils.h" #include "lib/checksum/ChecksumProvider.h" -#include "lib/stats/ProducerStatsImpl.h" DECLARE_LOG_OBJECT() @@ -508,48 +510,65 @@ TEST(ClientTest, testNoRetry) { } TEST(ClientTest, testUpdateConnectionInfo) { - const std::string cluster1Url = "pulsar://localhost:6650"; - const std::string cluster2Url = "pulsar://localhost:6652"; - const std::string topic1 = "testUpdateConnectionInfo-cluster1-" + std::to_string(time(nullptr)); - const std::string topic2 = "testUpdateConnectionInfo-cluster2-" + std::to_string(time(nullptr)); - - Client client(cluster1Url); - - // Produce and consume on cluster 1 - Producer producer1; - ASSERT_EQ(ResultOk, client.createProducer(topic1, producer1)); + extern std::string getToken(); // from AuthToken.cc + + // Access "private/auth" namespace in cluster 1 + auto info1 = + std::make_tuple("pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt); + // Access "private/auth" namespace in cluster 2 + auto info2 = + std::make_tuple("pulsar+ssl://localhost:6653", + AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), + TEST_CONF_DIR "/hn-verification/cacert.pem"); + // Access "public/default" namespace in cluster 1, which doesn't require authentication + auto info3 = std::make_tuple("pulsar://localhost:6650", std::nullopt, std::nullopt); + + Client client{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; + const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer)); MessageId msgId; - ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("msg-on-cluster1").build(), msgId)); - producer1.close(); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-0").build(), msgId)); - // Verify there are connections in the pool - auto connections = PulsarFriend::getConnections(client); - ASSERT_FALSE(connections.empty()); + // Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh) + ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); + client.updateConnectionInfo(std::get<0>(info2), std::get<1>(info2), std::get<2>(info2)); + ASSERT_TRUE(PulsarFriend::getConnections(client).empty()); - // Switch to cluster 2 - client.updateConnectionInfo(cluster2Url, std::nullopt, std::nullopt); + // Now the same will access the same topic in cluster 2 + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-1").build(), msgId)); + ASSERT_EQ(ResultOk, producer.close()); - // Previous connections should have been closed - for (const auto &cnx : connections) { - ASSERT_TRUE(cnx->isClosed()); - } + // Switch back to cluster 1 without any authentication, the previous authentication info configured for + // cluster 2 will be cleared. + client.updateConnectionInfo(std::get<0>(info3), std::get<1>(info3), std::get<2>(info3)); - // Produce and consume on cluster 2 using the same client - Producer producer2; - ASSERT_EQ(ResultOk, client.createProducer(topic2, producer2)); - ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("msg-on-cluster2").build(), msgId)); + const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build(), msgId)); - Consumer consumer2; - ASSERT_EQ(ResultOk, client.subscribe(topic2, "sub", consumer2)); - Message msg; - ASSERT_EQ(ResultOk, consumer2.receive(msg, 5000)); - ASSERT_EQ("msg-on-cluster2", msg.getDataAsString()); - - // Verify connection pool now has connections to cluster 2 - auto newConnections = PulsarFriend::getConnections(client); - ASSERT_FALSE(newConnections.empty()); - - consumer2.close(); - producer2.close(); client.close(); + + // Verify messages sent to cluster 1 and cluster 2 can be consumed successfully with correct + // authentication info. + auto verify = [](Client &client, const std::string &topic, const std::string &value) { + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader)); + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); + ASSERT_EQ(value, msg.getDataAsString()); + }; + Client client1{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; + verify(client1, topicRequiredAuth, "msg-0"); + client1.close(); + + Client client2{ + std::get<0>(info2), + ClientConfiguration().setAuth(std::get<1>(info2)).setTlsTrustCertsFilePath(std::get<2>(info2))}; + verify(client2, topicRequiredAuth, "msg-1"); + client2.close(); + + Client client3{std::get<0>(info3)}; + verify(client3, topicNoAuth, "msg-2"); + client3.close(); } From 8b0899f189df1314b86049f4d339d3b98565d135 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 16:14:55 +0800 Subject: [PATCH 04/12] refactor and add a getServiceInfo method --- include/pulsar/Client.h | 27 ++++++++++++++++++--------- lib/Client.cc | 8 +++----- lib/ClientImpl.cc | 38 +++++++++++++++++++++++++++++--------- lib/ClientImpl.h | 6 +++--- tests/ClientTest.cc | 36 +++++++++++++++++++++--------------- 5 files changed, 74 insertions(+), 41 deletions(-) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 95233fe9..8fccbe19 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -43,6 +43,12 @@ typedef std::function TableViewCallback; typedef std::function&)> GetPartitionsCallback; typedef std::function CloseCallback; +struct PULSAR_PUBLIC ServiceInfo { + std::string serviceUrl; + std::optional authentication; + std::optional tlsTrustCertsFilePath; +}; + class ClientImpl; class PulsarFriend; class PulsarWrapper; @@ -416,18 +422,21 @@ class PULSAR_PUBLIC Client { std::function callback); /** - * Update the connection information of the client, including service URL, authentication and TLS trust - * certs file path. + * Update the service information of the client. + * * This method is used to switch the connection to a different Pulsar cluster. All connections will be - * closed and the internal connection info will be updated. + * closed and the internal service info will be updated. * - * @param serviceUrl the Pulsar endpoint to use (eg: pulsar://localhost:6650) - * @param authentication the authentication information to use for connecting to the Pulsar cluster - * @param tlsTrustCertsFilePath the TLS trust certs file path to use for connecting to the Pulsar cluster + * @param serviceInfo the service URL, authentication and TLS trust certs file path to use + */ + void updateServiceInfo(const ServiceInfo& serviceInfo); + + /** + * Get the current service information of the client. + * + * @return the current service information */ - void updateConnectionInfo(const std::string& serviceUrl, - const std::optional& authentication, - const std::optional& tlsTrustCertsFilePath); + ServiceInfo getServiceInfo(); private: Client(const std::shared_ptr&); diff --git a/lib/Client.cc b/lib/Client.cc index 60fb62f5..6ceaa117 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -198,10 +198,8 @@ void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, .addListener(std::move(callback)); } -void Client::updateConnectionInfo(const std::string& serviceUrl, - const std::optional& authentication, - const std::optional& tlsTrustCertsFilePath) { - impl_->updateConnectionInfo(serviceUrl, authentication, tlsTrustCertsFilePath); -} +void Client::updateServiceInfo(const ServiceInfo& serviceInfo) { impl_->updateServiceInfo(serviceInfo); } + +ServiceInfo Client::getServiceInfo() { return impl_->getServiceInfo(); } } // namespace pulsar diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 98fe3c30..2aceafea 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -78,6 +78,15 @@ typedef std::unique_lock Lock; typedef std::vector StringList; +namespace { +std::optional toOptionalAuthentication(const AuthenticationPtr& authentication) { + if (!authentication || authentication->getAuthMethodName() == "none") { + return std::nullopt; + } + return authentication; +} +} // namespace + static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, ConnectionPool& pool, const AuthenticationPtr& auth) { @@ -101,6 +110,10 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& state_(Open), clientConfiguration_(ClientConfiguration(clientConfiguration) .setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))), + serviceInfo_{serviceUrl, toOptionalAuthentication(clientConfiguration.getAuthPtr()), + clientConfiguration.getTlsTrustCertsFilePath().empty() + ? std::nullopt + : std::make_optional(clientConfiguration.getTlsTrustCertsFilePath())}, memoryLimitController_(clientConfiguration.getMemoryLimit()), ioExecutorProvider_(std::make_shared(clientConfiguration_.getIOThreads())), listenerExecutorProvider_( @@ -856,9 +869,7 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati return clientConfiguration.impl_->operationTimeout; } -void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, - const std::optional& authentication, - const std::optional& tlsTrustCertsFilePath) { +void ClientImpl::updateServiceInfo(const ServiceInfo& serviceInfo) { LookupServicePtr oldLookupServicePtr; std::unordered_map oldRedirectedLookupServicePtrs; @@ -869,22 +880,26 @@ void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, return; } - if (authentication.has_value()) { - clientConfiguration_.setAuth(*authentication); + if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) { + clientConfiguration_.setAuth(*serviceInfo.authentication); } else { clientConfiguration_.setAuth(AuthFactory::Disabled()); } - if (tlsTrustCertsFilePath.has_value()) { - clientConfiguration_.setTlsTrustCertsFilePath(*tlsTrustCertsFilePath); + if (serviceInfo.tlsTrustCertsFilePath.has_value()) { + clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath); } else { clientConfiguration_.setTlsTrustCertsFilePath(""); } - clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl))); + clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl))); + serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()), + clientConfiguration_.getTlsTrustCertsFilePath().empty() + ? std::nullopt + : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())}; oldLookupServicePtr = std::move(lookupServicePtr_); oldRedirectedLookupServicePtrs = std::move(redirectedClusterLookupServicePtrs_); - lookupServicePtr_ = createLookup(serviceUrl); + lookupServicePtr_ = createLookup(serviceInfo.serviceUrl); redirectedClusterLookupServicePtrs_.clear(); } @@ -898,4 +913,9 @@ void ClientImpl::updateConnectionInfo(const std::string& serviceUrl, pool_.resetConnections(clientConfiguration_.getAuthPtr(), clientConfiguration_); } +ServiceInfo ClientImpl::getServiceInfo() { + Lock lock(mutex_); + return serviceInfo_; +} + } /* namespace pulsar */ diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index e89aa98a..c96f9aab 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -139,9 +139,8 @@ class ClientImpl : public std::enable_shared_from_this { ConnectionPool& getConnectionPool() noexcept { return pool_; } uint64_t getLookupCount() { return lookupCount_; } - void updateConnectionInfo(const std::string& serviceUrl, - const std::optional& authentication, - const std::optional& tlsTrustCertsFilePath); + void updateServiceInfo(const ServiceInfo& serviceInfo); + ServiceInfo getServiceInfo(); static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); @@ -196,6 +195,7 @@ class ClientImpl : public std::enable_shared_from_this { State state_; ClientConfiguration clientConfiguration_; + ServiceInfo serviceInfo_; MemoryLimitController memoryLimitController_; ExecutorServiceProviderPtr ioExecutorProvider_; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 37376cf2..5bc4d89b 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -513,17 +513,16 @@ TEST(ClientTest, testUpdateConnectionInfo) { extern std::string getToken(); // from AuthToken.cc // Access "private/auth" namespace in cluster 1 - auto info1 = - std::make_tuple("pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt); + ServiceInfo info1{"pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt}; // Access "private/auth" namespace in cluster 2 - auto info2 = - std::make_tuple("pulsar+ssl://localhost:6653", - AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), - TEST_CONF_DIR "/hn-verification/cacert.pem"); + ServiceInfo info2{ + "pulsar+ssl://localhost:6653", + AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), + TEST_CONF_DIR "/hn-verification/cacert.pem"}; // Access "public/default" namespace in cluster 1, which doesn't require authentication - auto info3 = std::make_tuple("pulsar://localhost:6650", std::nullopt, std::nullopt); + ServiceInfo info3{"pulsar://localhost:6650", std::nullopt, std::nullopt}; - Client client{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; + Client client{info1.serviceUrl, ClientConfiguration().setAuth(*info1.authentication)}; const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr)); Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer)); @@ -532,8 +531,11 @@ TEST(ClientTest, testUpdateConnectionInfo) { // Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh) ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); - client.updateConnectionInfo(std::get<0>(info2), std::get<1>(info2), std::get<2>(info2)); + client.updateServiceInfo(info2); ASSERT_TRUE(PulsarFriend::getConnections(client).empty()); + ASSERT_EQ(info2.serviceUrl, client.getServiceInfo().serviceUrl); + ASSERT_EQ(info2.authentication, client.getServiceInfo().authentication); + ASSERT_EQ(info2.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); // Now the same will access the same topic in cluster 2 ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-1").build(), msgId)); @@ -541,7 +543,10 @@ TEST(ClientTest, testUpdateConnectionInfo) { // Switch back to cluster 1 without any authentication, the previous authentication info configured for // cluster 2 will be cleared. - client.updateConnectionInfo(std::get<0>(info3), std::get<1>(info3), std::get<2>(info3)); + client.updateServiceInfo(info3); + ASSERT_EQ(info3.serviceUrl, client.getServiceInfo().serviceUrl); + ASSERT_EQ(info3.authentication, client.getServiceInfo().authentication); + ASSERT_EQ(info3.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr)); ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer)); @@ -558,17 +563,18 @@ TEST(ClientTest, testUpdateConnectionInfo) { ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); ASSERT_EQ(value, msg.getDataAsString()); }; - Client client1{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; + Client client1{info1.serviceUrl, ClientConfiguration().setAuth(*info1.authentication)}; verify(client1, topicRequiredAuth, "msg-0"); client1.close(); - Client client2{ - std::get<0>(info2), - ClientConfiguration().setAuth(std::get<1>(info2)).setTlsTrustCertsFilePath(std::get<2>(info2))}; + Client client2{info2.serviceUrl, + ClientConfiguration() + .setAuth(*info2.authentication) + .setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath)}; verify(client2, topicRequiredAuth, "msg-1"); client2.close(); - Client client3{std::get<0>(info3)}; + Client client3{info3.serviceUrl}; verify(client3, topicNoAuth, "msg-2"); client3.close(); } From 7ca81369a445c3380145251980c815bdc84ef0a9 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 16:19:59 +0800 Subject: [PATCH 05/12] fix format --- tests/ClientTest.cc | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 5bc4d89b..07128d19 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -515,10 +515,9 @@ TEST(ClientTest, testUpdateConnectionInfo) { // Access "private/auth" namespace in cluster 1 ServiceInfo info1{"pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt}; // Access "private/auth" namespace in cluster 2 - ServiceInfo info2{ - "pulsar+ssl://localhost:6653", - AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), - TEST_CONF_DIR "/hn-verification/cacert.pem"}; + ServiceInfo info2{"pulsar+ssl://localhost:6653", + AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), + TEST_CONF_DIR "/hn-verification/cacert.pem"}; // Access "public/default" namespace in cluster 1, which doesn't require authentication ServiceInfo info3{"pulsar://localhost:6650", std::nullopt, std::nullopt}; @@ -567,10 +566,9 @@ TEST(ClientTest, testUpdateConnectionInfo) { verify(client1, topicRequiredAuth, "msg-0"); client1.close(); - Client client2{info2.serviceUrl, - ClientConfiguration() - .setAuth(*info2.authentication) - .setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath)}; + Client client2{info2.serviceUrl, ClientConfiguration() + .setAuth(*info2.authentication) + .setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath)}; verify(client2, topicRequiredAuth, "msg-1"); client2.close(); From e70a0a421f2423b1a79c9e481c0fc1b6329b446e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 17:06:01 +0800 Subject: [PATCH 06/12] use read-write lock instead --- include/pulsar/Client.h | 2 +- lib/Client.cc | 2 +- lib/ClientImpl.cc | 114 ++++++++++++++++++++-------------------- lib/ClientImpl.h | 5 +- tests/ClientTest.cc | 2 +- 5 files changed, 63 insertions(+), 62 deletions(-) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 8fccbe19..f3ee2f66 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -436,7 +436,7 @@ class PULSAR_PUBLIC Client { * * @return the current service information */ - ServiceInfo getServiceInfo(); + ServiceInfo getServiceInfo() const; private: Client(const std::shared_ptr&); diff --git a/lib/Client.cc b/lib/Client.cc index 6ceaa117..45cf2170 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -200,6 +200,6 @@ void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, void Client::updateServiceInfo(const ServiceInfo& serviceInfo) { impl_->updateServiceInfo(serviceInfo); } -ServiceInfo Client::getServiceInfo() { return impl_->getServiceInfo(); } +ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); } } // namespace pulsar diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 2aceafea..f64908a5 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -24,7 +24,9 @@ #include #include #include +#include #include +#include #include #include "BinaryProtoLookupService.h" @@ -74,8 +76,6 @@ std::string generateRandomName() { return randomName; } -typedef std::unique_lock Lock; - typedef std::vector StringList; namespace { @@ -157,19 +157,26 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { } LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) { - Lock lock(mutex_); + std::shared_lock readLock(mutex_); if (redirectedClusterURI.empty()) { return lookupServicePtr_; } - auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); - if (it == redirectedClusterLookupServicePtrs_.end()) { - auto lookup = createLookup(redirectedClusterURI); - redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup); - return lookup; + if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); + it != redirectedClusterLookupServicePtrs_.end()) { + return it->second; } + readLock.unlock(); - return it->second; + std::unique_lock writeLock(mutex_); + // Double check in case another thread acquires the lock and inserts a pair first + if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); + it != redirectedClusterLookupServicePtrs_.end()) { + return it->second; + } + auto lookup = createLookup(redirectedClusterURI); + redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup); + return lookup; } void ClientImpl::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, @@ -179,7 +186,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Producer()); @@ -267,7 +274,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st const ReaderConfiguration& conf, const ReaderCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Reader()); @@ -289,7 +296,7 @@ void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewC const TableViewCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, TableView()); @@ -355,7 +362,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const const SubscribeCallback& callback) { TopicNamePtr topicNamePtr = TopicName::get(regexPattern); - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -441,7 +448,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto it = std::unique(topics.begin(), topics.end()); auto newSize = std::distance(topics.begin(), it); topics.resize(newSize); - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -477,7 +484,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub const ConsumerConfiguration& conf, const SubscribeCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -662,7 +669,7 @@ void ClientImpl::handleGetPartitions(Result result, const LookupDataResultPtr& p void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, StringList()); @@ -679,7 +686,9 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP } void ClientImpl::closeAsync(const CloseCallback& callback) { + std::unique_lock lock(mutex_); if (state_ != Open) { + lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -689,10 +698,12 @@ void ClientImpl::closeAsync(const CloseCallback& callback) { state_ = Closing; memoryLimitController_.close(); - getLookup()->close(); + lookupServicePtr_->close(); for (const auto& it : redirectedClusterLookupServicePtrs_) { it.second->close(); } + redirectedClusterLookupServicePtrs_.clear(); + lock.unlock(); auto producers = producers_.move(); auto consumers = consumers_.move(); @@ -741,7 +752,7 @@ void ClientImpl::handleClose(Result result, const SharedInt& numberOfOpenHandler --(*numberOfOpenHandlers); } if (*numberOfOpenHandlers == 0) { - Lock lock(mutex_); + std::unique_lock lock(mutex_); if (state_ == Closed) { LOG_DEBUG("Client is already shutting down, possible race condition in handleClose"); return; @@ -821,12 +832,12 @@ void ClientImpl::shutdown() { } uint64_t ClientImpl::newProducerId() { - Lock lock(mutex_); + std::shared_lock lock(mutex_); return producerIdGenerator_++; } uint64_t ClientImpl::newConsumerId() { - Lock lock(mutex_); + std::shared_lock lock(mutex_); return consumerIdGenerator_++; } @@ -870,51 +881,40 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati } void ClientImpl::updateServiceInfo(const ServiceInfo& serviceInfo) { - LookupServicePtr oldLookupServicePtr; - std::unordered_map oldRedirectedLookupServicePtrs; - - { - Lock lock(mutex_); - if (state_ != Open) { - LOG_ERROR("Client is not open, cannot update connection info"); - return; - } - - if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) { - clientConfiguration_.setAuth(*serviceInfo.authentication); - } else { - clientConfiguration_.setAuth(AuthFactory::Disabled()); - } - if (serviceInfo.tlsTrustCertsFilePath.has_value()) { - clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath); - } else { - clientConfiguration_.setTlsTrustCertsFilePath(""); - } - clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl))); - serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()), - clientConfiguration_.getTlsTrustCertsFilePath().empty() - ? std::nullopt - : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())}; - - oldLookupServicePtr = std::move(lookupServicePtr_); - oldRedirectedLookupServicePtrs = std::move(redirectedClusterLookupServicePtrs_); - - lookupServicePtr_ = createLookup(serviceInfo.serviceUrl); - redirectedClusterLookupServicePtrs_.clear(); + std::unique_lock lock(mutex_); + if (state_ != Open) { + LOG_ERROR("Client is not open, cannot update connection info"); + return; } - if (oldLookupServicePtr) { - oldLookupServicePtr->close(); + if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) { + clientConfiguration_.setAuth(*serviceInfo.authentication); + } else { + clientConfiguration_.setAuth(AuthFactory::Disabled()); } - for (const auto& it : oldRedirectedLookupServicePtrs) { - it.second->close(); + if (serviceInfo.tlsTrustCertsFilePath.has_value()) { + clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath); + } else { + clientConfiguration_.setTlsTrustCertsFilePath(""); } + clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl))); + serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()), + clientConfiguration_.getTlsTrustCertsFilePath().empty() + ? std::nullopt + : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())}; pool_.resetConnections(clientConfiguration_.getAuthPtr(), clientConfiguration_); + + lookupServicePtr_->close(); + for (auto&& it : redirectedClusterLookupServicePtrs_) { + it.second->close(); + } + redirectedClusterLookupServicePtrs_.clear(); + lookupServicePtr_ = createLookup(serviceInfo.serviceUrl); } -ServiceInfo ClientImpl::getServiceInfo() { - Lock lock(mutex_); +ServiceInfo ClientImpl::getServiceInfo() const { + std::shared_lock lock(mutex_); return serviceInfo_; } diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index c96f9aab..26c167f1 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "ConnectionPool.h" #include "Future.h" @@ -140,7 +141,7 @@ class ClientImpl : public std::enable_shared_from_this { uint64_t getLookupCount() { return lookupCount_; } void updateServiceInfo(const ServiceInfo& serviceInfo); - ServiceInfo getServiceInfo(); + ServiceInfo getServiceInfo() const; static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); @@ -191,7 +192,7 @@ class ClientImpl : public std::enable_shared_from_this { Closed }; - std::mutex mutex_; + mutable std::shared_mutex mutex_; State state_; ClientConfiguration clientConfiguration_; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 07128d19..31d0d93d 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -509,7 +509,7 @@ TEST(ClientTest, testNoRetry) { } } -TEST(ClientTest, testUpdateConnectionInfo) { +TEST(ClientTest, testUpdateServiceInfo) { extern std::string getToken(); // from AuthToken.cc // Access "private/auth" namespace in cluster 1 From 342505e7dfaf462b0960a2308b93c1466285e814 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 17:19:04 +0800 Subject: [PATCH 07/12] pass by value + std::move for better performance --- include/pulsar/Client.h | 2 +- lib/Client.cc | 2 +- lib/ClientImpl.cc | 2 +- lib/ClientImpl.h | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index f3ee2f66..15dd0324 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -429,7 +429,7 @@ class PULSAR_PUBLIC Client { * * @param serviceInfo the service URL, authentication and TLS trust certs file path to use */ - void updateServiceInfo(const ServiceInfo& serviceInfo); + void updateServiceInfo(ServiceInfo serviceInfo); /** * Get the current service information of the client. diff --git a/lib/Client.cc b/lib/Client.cc index 45cf2170..311d2ddd 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -198,7 +198,7 @@ void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, .addListener(std::move(callback)); } -void Client::updateServiceInfo(const ServiceInfo& serviceInfo) { impl_->updateServiceInfo(serviceInfo); } +void Client::updateServiceInfo(ServiceInfo serviceInfo) { impl_->updateServiceInfo(std::move(serviceInfo)); } ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); } diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index f64908a5..7f2ee6c7 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -880,7 +880,7 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati return clientConfiguration.impl_->operationTimeout; } -void ClientImpl::updateServiceInfo(const ServiceInfo& serviceInfo) { +void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) { std::unique_lock lock(mutex_); if (state_ != Open) { LOG_ERROR("Client is not open, cannot update connection info"); diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 26c167f1..e58d6df6 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -140,7 +140,7 @@ class ClientImpl : public std::enable_shared_from_this { ConnectionPool& getConnectionPool() noexcept { return pool_; } uint64_t getLookupCount() { return lookupCount_; } - void updateServiceInfo(const ServiceInfo& serviceInfo); + void updateServiceInfo(ServiceInfo&& serviceInfo); ServiceInfo getServiceInfo() const; static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); From 477fb9495e3b496f56ea94ca2e4497bb2d01ec75 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 17:21:08 +0800 Subject: [PATCH 08/12] fix thread safety due to read lock for write operation --- lib/ClientImpl.cc | 10 ++-------- lib/ClientImpl.h | 4 ++-- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 7f2ee6c7..5b71a696 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -831,15 +831,9 @@ void ClientImpl::shutdown() { lookupCount_ = 0; } -uint64_t ClientImpl::newProducerId() { - std::shared_lock lock(mutex_); - return producerIdGenerator_++; -} +uint64_t ClientImpl::newProducerId() { return producerIdGenerator_++; } -uint64_t ClientImpl::newConsumerId() { - std::shared_lock lock(mutex_); - return consumerIdGenerator_++; -} +uint64_t ClientImpl::newConsumerId() { return consumerIdGenerator_++; } uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; } diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index e58d6df6..11881a2f 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -207,8 +207,8 @@ class ClientImpl : public std::enable_shared_from_this { std::unordered_map redirectedClusterLookupServicePtrs_; ConnectionPool pool_; - uint64_t producerIdGenerator_; - uint64_t consumerIdGenerator_; + std::atomic_uint64_t producerIdGenerator_; + std::atomic_uint64_t consumerIdGenerator_; std::shared_ptr> requestIdGenerator_{std::make_shared>(0)}; SynchronizedHashMap producers_; From 9114edf05e322f1fcc02dc6b6ff42ee1ce613782 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 17:43:58 +0800 Subject: [PATCH 09/12] fix stale lookup service might be refered --- lib/Client.cc | 3 +-- lib/ClientImpl.cc | 37 +++++++++++++-------------- lib/ClientImpl.h | 22 +++++++++++++--- lib/MultiTopicsConsumerImpl.cc | 22 ++++++++++------ lib/MultiTopicsConsumerImpl.h | 7 +---- lib/PartitionedProducerImpl.cc | 8 +++--- lib/PartitionedProducerImpl.h | 3 --- lib/PatternMultiTopicsConsumerImpl.cc | 13 +++++----- lib/PatternMultiTopicsConsumerImpl.h | 1 - lib/ReaderImpl.cc | 1 - tests/LookupServiceTest.cc | 17 +++--------- 11 files changed, 69 insertions(+), 65 deletions(-) diff --git a/lib/Client.cc b/lib/Client.cc index 311d2ddd..ba829b2b 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -193,8 +193,7 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, std::function callback) { - impl_->getLookup() - ->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") + impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") .addListener(std::move(callback)); } diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 5b71a696..02f017b0 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -200,8 +200,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon if (autoDownloadSchema) { auto self = shared_from_this(); - auto lookup = getLookup(); - lookup->getSchema(topicName).addListener( + getSchema(topicName).addListener( [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { if (res != ResultOk) { callback(res, Producer()); @@ -209,12 +208,12 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } ProducerConfiguration conf; conf.setSchema(topicSchema); - self->getLookup()->getPartitionMetadataAsync(topicName).addListener( + self->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); }); } else { - getLookup()->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); } @@ -287,7 +286,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st } MessageId msgId(startMessageId); - getLookup()->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, msgId, conf, callback)); } @@ -400,8 +399,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const return; } - getLookup() - ->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) + getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, regexPattern, mode, subscriptionName, conf, callback)); @@ -423,9 +421,8 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace auto interceptors = std::make_shared(conf.getInterceptors()); - consumer = std::make_shared(shared_from_this(), regexPattern, mode, - *matchTopics, subscriptionName, conf, - getLookup(), interceptors); + consumer = std::make_shared( + shared_from_this(), regexPattern, mode, *matchTopics, subscriptionName, conf, interceptors); consumer->getConsumerCreatedFuture().addListener( std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -472,7 +469,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto interceptors = std::make_shared(conf.getInterceptors()); ConsumerImplBasePtr consumer = std::make_shared( - shared_from_this(), topics, subscriptionName, topicNamePtr, conf, getLookup(), interceptors); + shared_from_this(), topics, subscriptionName, topicNamePtr, conf, interceptors); consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -502,7 +499,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub } } - getLookup()->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, subscriptionName, conf, callback)); } @@ -525,9 +522,9 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti callback(ResultInvalidConfiguration, Consumer()); return; } - consumer = std::make_shared( - shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf, - getLookup(), interceptors); + consumer = std::make_shared(shared_from_this(), topicName, + partitionMetadata->getPartitions(), + subscriptionName, conf, interceptors); } else { auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), subscriptionName, conf, @@ -680,9 +677,9 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP return; } } - getLookup()->getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, callback)); + getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions, + shared_from_this(), std::placeholders::_1, + std::placeholders::_2, topicName, callback)); } void ClientImpl::closeAsync(const CloseCallback& callback) { @@ -802,7 +799,9 @@ void ClientImpl::shutdown() { << " consumers have been shutdown."); } - getLookup()->close(); + std::shared_lock lock(mutex_); + lookupServicePtr_->close(); + lock.unlock(); if (!pool_.close()) { // pool_ has already been closed. It means shutdown() has been called before. return; diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 11881a2f..2a6a8298 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -29,6 +29,7 @@ #include "ConnectionPool.h" #include "Future.h" #include "LookupDataResult.h" +#include "LookupService.h" #include "MemoryLimitController.h" #include "ProtoApiEnums.h" #include "SynchronizedHashMap.h" @@ -53,8 +54,6 @@ typedef std::weak_ptr ConsumerImplBaseWeakPtr; class ClientConnection; using ClientConnectionPtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; using LookupServiceFactory = std::function; @@ -129,7 +128,6 @@ class ClientImpl : public std::enable_shared_from_this { ExecutorServiceProviderPtr getIOExecutorProvider(); ExecutorServiceProviderPtr getListenerExecutorProvider(); ExecutorServiceProviderPtr getPartitionListenerExecutorProvider(); - LookupServicePtr getLookup(const std::string& redirectedClusterURI = ""); void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); } @@ -143,6 +141,23 @@ class ClientImpl : public std::enable_shared_from_this { void updateServiceInfo(ServiceInfo&& serviceInfo); ServiceInfo getServiceInfo() const; + // Since the underlying `lookupServicePtr_` can be modified by `updateServiceInfo`, we should not expose + // it to other classes, otherwise the update might not be visible. + auto getPartitionMetadataAsync(const TopicNamePtr& topicName) { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getPartitionMetadataAsync(topicName); + } + + auto getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode); + } + + auto getSchema(const TopicNamePtr& topicName, const std::string& version = "") { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getSchema(topicName, version); + } + static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); friend class PulsarFriend; @@ -182,6 +197,7 @@ class ClientImpl : public std::enable_shared_from_this { const std::string& logicalAddress); LookupServicePtr createLookup(const std::string& serviceUrl); + LookupServicePtr getLookup(const std::string& redirectedClusterURI); static std::string getClientVersion(const ClientConfiguration& clientConfiguration); diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 0799eb63..a5699781 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -44,20 +44,19 @@ using std::chrono::seconds; MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode, const optional& startMessageId) : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf, - lookupServicePtr, interceptors, subscriptionMode, startMessageId) { + interceptors, subscriptionMode, startMessageId) { topicsPartitions_[topicName->toString()] = numPartitions; } MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName, const TopicNamePtr& topicName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, - Commands::SubscriptionMode subscriptionMode, const optional& startMessageId) + const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode, + const optional& startMessageId) : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics", Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf, client->getListenerExecutorProvider()->get()), @@ -66,7 +65,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( conf_(conf), incomingMessages_(conf.getReceiverQueueSize()), messageListener_(conf.getMessageListener()), - lookupServicePtr_(lookupServicePtr), numberTopicPartitions_(std::make_shared>(0)), topics_(topics), subscriptionMode_(subscriptionMode), @@ -93,7 +91,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( if (partitionsUpdateInterval > 0) { partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); partitionsUpdateInterval_ = seconds(partitionsUpdateInterval); - lookupServicePtr_ = client->getLookup(); } state_ = Pending; @@ -185,7 +182,12 @@ Future MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s auto entry = topicsPartitions_.find(topic); if (entry == topicsPartitions_.end()) { lock.unlock(); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + auto client = client_.lock(); + if (!client) { + topicPromise->setFailed(ResultAlreadyClosed); + return topicPromise->getFuture(); + } + client->getPartitionMetadataAsync(topicName).addListener( [this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) { if (result != ResultOk) { LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- " @@ -1003,7 +1005,11 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() { auto topicName = TopicName::get(item.first); auto currentNumPartitions = item.second; auto weakSelf = weak_from_this(); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + auto client = client_.lock(); + if (!client) { + return; + } + client->getPartitionMetadataAsync(topicName).addListener( [this, weakSelf, topicName, currentNumPartitions](Result result, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index dc628652..38a44cdf 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -46,23 +46,19 @@ class MultiTopicsBrokerConsumerStatsImpl; using MultiTopicsBrokerConsumerStatsPtr = std::shared_ptr; class UnAckedMessageTrackerInterface; using UnAckedMessageTrackerPtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; class MultiTopicsConsumerImpl; class MultiTopicsConsumerImpl : public ConsumerImplBase { public: MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, const optional& startMessageId = optional{}); MultiTopicsConsumerImpl(const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName, const TopicNamePtr& topicName, - const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_, - const ConsumerInterceptorsPtr& interceptors, + const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, const optional& startMessageId = optional{}); @@ -119,7 +115,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { MessageListener messageListener_; DeadlineTimerPtr partitionsUpdateTimer_; TimeDuration partitionsUpdateInterval_; - LookupServicePtr lookupServicePtr_; std::shared_ptr> numberTopicPartitions_; std::atomic failedResult{ResultOk}; Promise multiTopicsConsumerCreatedPromise_; diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 4a923666..1aa5c87b 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -23,7 +23,6 @@ #include "ClientImpl.h" #include "ExecutorService.h" #include "LogUtils.h" -#include "LookupService.h" #include "ProducerImpl.h" #include "RoundRobinMessageRouter.h" #include "SinglePartitionMessageRouter.h" @@ -59,7 +58,6 @@ PartitionedProducerImpl::PartitionedProducerImpl(const ClientImplPtr& client, co listenerExecutor_ = client->getListenerExecutorProvider()->get(); partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); partitionsUpdateInterval_ = std::chrono::seconds(partitionsUpdateInterval); - lookupServicePtr_ = client->getLookup(); } } @@ -433,7 +431,11 @@ void PartitionedProducerImpl::runPartitionUpdateTask() { void PartitionedProducerImpl::getPartitionMetadata() { using namespace std::placeholders; auto weakSelf = weak_from_this(); - lookupServicePtr_->getPartitionMetadataAsync(topicName_) + auto client = client_.lock(); + if (!client) { + return; + } + client->getPartitionMetadataAsync(topicName_) .addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); if (self) { diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 40f2d34d..94ba7179 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -38,8 +38,6 @@ using ClientImplPtr = std::shared_ptr; using ClientImplWeakPtr = std::weak_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; class ProducerImpl; using ProducerImplPtr = std::shared_ptr; class TopicName; @@ -133,7 +131,6 @@ class PartitionedProducerImpl : public ProducerImplBase, ExecutorServicePtr listenerExecutor_; DeadlineTimerPtr partitionsUpdateTimer_; TimeDuration partitionsUpdateInterval_; - LookupServicePtr lookupServicePtr_; ProducerInterceptorsPtr interceptors_; diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index fd48feed..4b5aab73 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -21,7 +21,6 @@ #include "ClientImpl.h" #include "ExecutorService.h" #include "LogUtils.h" -#include "LookupService.h" DECLARE_LOG_OBJECT() @@ -32,10 +31,8 @@ using std::chrono::seconds; PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl( const ClientImplPtr& client, const std::string& pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, - const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_, - const ConsumerInterceptorsPtr& interceptors) - : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, - lookupServicePtr_, interceptors), + const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors) + : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, interceptors), patternString_(pattern), pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))), getTopicsMode_(getTopicsMode), @@ -84,7 +81,11 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& er // already get namespace from pattern. assert(namespaceName_); - lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_) + auto client = client_.lock(); + if (!client) { + return; + } + client->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_) .addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, std::placeholders::_1, std::placeholders::_2)); } diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h index 63527965..796abcc2 100644 --- a/lib/PatternMultiTopicsConsumerImpl.h +++ b/lib/PatternMultiTopicsConsumerImpl.h @@ -52,7 +52,6 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr_, const ConsumerInterceptorsPtr& interceptors); ~PatternMultiTopicsConsumerImpl() override; diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index 7fa7e8b9..754137c5 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -90,7 +90,6 @@ void ReaderImpl::start(const MessageId& startMessageId, if (partitions_ > 0) { auto consumerImpl = std::make_shared( client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf, - client_.lock()->getLookup(), std::make_shared(std::vector()), Commands::SubscriptionModeNonDurable, startMessageId); consumer_ = consumerImpl; diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 92aa8204..dc9cfc4e 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -259,7 +259,7 @@ TEST_P(LookupServiceTest, basicGetNamespaceTopics) { ASSERT_EQ(ResultOk, result); // 2. verify getTopicsOfNamespace by regex mode. - auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_)->getLookup(); + auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_); auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode, const std::set& expectedTopics) { Future getTopicsFuture = @@ -292,11 +292,8 @@ TEST_P(LookupServiceTest, testGetSchema) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); ASSERT_EQ(jsonSchema, schemaInfo.getSchema()); ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType()); @@ -310,11 +307,8 @@ TEST_P(LookupServiceTest, testGetSchemaNotFound) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo)); } @@ -335,11 +329,8 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema()); ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType()); From c6de067e3795b58eed558c03795e385fc8ee16a5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 21:02:09 +0800 Subject: [PATCH 10/12] fix the reader cannot consume after switching to a new cluster --- lib/ClientConnection.cc | 5 ++++- lib/ClientConnection.h | 3 ++- lib/ClientImpl.cc | 2 +- lib/ConnectionPool.cc | 6 +++--- lib/ConnectionPool.h | 2 +- lib/ConsumerImpl.cc | 13 ++++++++++++- lib/ConsumerImpl.h | 7 +++++++ tests/ClientTest.cc | 25 ++++++++++++++++++++----- 8 files changed, 50 insertions(+), 13 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 0a850ed0..41d2f608 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1292,7 +1292,7 @@ void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, startConsumerStatsTimer(consumerStatsRequests); } -void ClientConnection::close(Result result, bool detach) { +void ClientConnection::close(Result result, bool detach, bool switchCluster) { Lock lock(mutex_); if (isClosed()) { return; @@ -1368,6 +1368,9 @@ void ClientConnection::close(Result result, bool detach) { for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) { auto consumer = it->second.lock(); if (consumer) { + if (switchCluster) { + consumer->onClusterSwitching(); + } consumer->handleDisconnection(result, self); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index b2770006..39e4224d 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -157,10 +157,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_thisclose(); for (auto&& it : redirectedClusterLookupServicePtrs_) { diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index d94d8fe8..eaa14dc7 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -67,8 +67,8 @@ bool ConnectionPool::close() { return true; } -void ConnectionPool::resetConnections(const AuthenticationPtr& authentication, - const ClientConfiguration& conf) { +void ConnectionPool::resetForClusterSwitching(const AuthenticationPtr& authentication, + const ClientConfiguration& conf) { std::unique_lock lock(mutex_); authentication_ = authentication; clientConfiguration_ = conf; @@ -76,7 +76,7 @@ void ConnectionPool::resetConnections(const AuthenticationPtr& authentication, for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) { auto& cnx = cnxIt->second; if (cnx) { - cnx->close(ResultDisconnected, false); + cnx->close(ResultDisconnected, false, true); } } pool_.clear(); diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h index 64907b4f..541cf8ed 100644 --- a/lib/ConnectionPool.h +++ b/lib/ConnectionPool.h @@ -55,7 +55,7 @@ class PULSAR_PUBLIC ConnectionPool { * Close all existing connections and update the authentication and configuration. * Unlike close(), the pool remains open for new connections. */ - void resetConnections(const AuthenticationPtr& authentication, const ClientConfiguration& conf); + void resetForClusterSwitching(const AuthenticationPtr& authentication, const ClientConfiguration& conf); void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix, ClientConnection* value); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 757b6e84..785a8bd7 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -125,7 +125,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic negativeAcksTracker_(std::make_shared(client, *this, conf)), ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)), readCompacted_(conf.isReadCompacted()), - startMessageId_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageIdFromConfig_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageId_(startMessageIdFromConfig_), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), @@ -1134,6 +1135,16 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { } } +void ConsumerImpl::onClusterSwitching() { + { + LockGuard lock{mutex_}; + incomingMessages_.clear(); + startMessageId_ = startMessageIdFromConfig_; + lastDequedMessageId_ = MessageId::earliest(); + } + ackGroupingTrackerPtr_->flushAndClean(); +} + /** * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that * was diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 0da82a2d..6f287aa2 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -162,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase { void doImmediateAck(const MessageId& msgId, const ResultCallback& callback, CommandAck_AckType ackType); void doImmediateAck(const std::set& msgIds, const ResultCallback& callback); + void onClusterSwitching(); + protected: // overrided methods from HandlerBase Future connectionOpened(const ClientConnectionPtr& cnx) override; @@ -266,6 +268,11 @@ class ConsumerImpl : public ConsumerImplBase { MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; + + // When the consumer switches to a new cluster, we should reset `startMessageId_` to the original value, + // otherwise, the message id of the old cluster might be passed in the Subscribe request on the new + // cluster. + const optional startMessageIdFromConfig_; optional startMessageId_; SeekStatus seekStatus_{SeekStatus::NOT_STARTED}; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 31d0d93d..93ec3a11 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -522,11 +522,26 @@ TEST(ClientTest, testUpdateServiceInfo) { ServiceInfo info3{"pulsar://localhost:6650", std::nullopt, std::nullopt}; Client client{info1.serviceUrl, ClientConfiguration().setAuth(*info1.authentication)}; + const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr)); Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer)); - MessageId msgId; - ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-0").build(), msgId)); + + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topicRequiredAuth, MessageId::earliest(), {}, reader)); + + auto sendAndReceive = [&](const std::string &value) { + MessageId msgId; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(value).build(), msgId)); + LOG_INFO("Sent " << value << " to " << msgId); + + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); + LOG_INFO("Read " << msg.getDataAsString() << " from " << msgId); + ASSERT_EQ(value, msg.getDataAsString()); + }; + + sendAndReceive("msg-0"); // Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh) ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); @@ -537,8 +552,7 @@ TEST(ClientTest, testUpdateServiceInfo) { ASSERT_EQ(info2.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); // Now the same will access the same topic in cluster 2 - ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-1").build(), msgId)); - ASSERT_EQ(ResultOk, producer.close()); + sendAndReceive("msg-1"); // Switch back to cluster 1 without any authentication, the previous authentication info configured for // cluster 2 will be cleared. @@ -548,8 +562,9 @@ TEST(ClientTest, testUpdateServiceInfo) { ASSERT_EQ(info3.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + producer.close(); ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer)); - ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build(), msgId)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build())); client.close(); From a209112d912af546a0ee582be6e073febd1549df Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 23:03:18 +0800 Subject: [PATCH 11/12] address comments from copilot --- include/pulsar/Client.h | 8 +---- include/pulsar/ServiceInfo.h | 36 +++++++++++++++++++++ lib/ClientConfiguration.cc | 14 ++++----- lib/ClientConfigurationImpl.h | 59 +++++++++++++++++++++++++++++++++-- lib/ClientImpl.cc | 20 +++++------- lib/ConsumerImpl.cc | 3 ++ tests/ClientTest.cc | 2 +- 7 files changed, 112 insertions(+), 30 deletions(-) create mode 100644 include/pulsar/ServiceInfo.h diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 15dd0324..7e2d10d0 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -29,10 +29,10 @@ #include #include #include +#include #include #include -#include #include namespace pulsar { @@ -43,12 +43,6 @@ typedef std::function TableViewCallback; typedef std::function&)> GetPartitionsCallback; typedef std::function CloseCallback; -struct PULSAR_PUBLIC ServiceInfo { - std::string serviceUrl; - std::optional authentication; - std::optional tlsTrustCertsFilePath; -}; - class ClientImpl; class PulsarFriend; class PulsarWrapper; diff --git a/include/pulsar/ServiceInfo.h b/include/pulsar/ServiceInfo.h new file mode 100644 index 00000000..a0f437bc --- /dev/null +++ b/include/pulsar/ServiceInfo.h @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_SERVICE_INFO_H_ +#define PULSAR_SERVICE_INFO_H_ + +#include + +#include +#include + +namespace pulsar { + +struct PULSAR_PUBLIC ServiceInfo { + std::string serviceUrl; + std::optional authentication; + std::optional tlsTrustCertsFilePath; +}; + +} // namespace pulsar +#endif diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc index b99c5d25..c3c56b95 100644 --- a/lib/ClientConfiguration.cc +++ b/lib/ClientConfiguration.cc @@ -53,13 +53,13 @@ ClientConfiguration& ClientConfiguration::setConnectionsPerBroker(int connection int ClientConfiguration::getConnectionsPerBroker() const { return impl_->connectionsPerBroker; } ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authentication) { - impl_->authenticationPtr = authentication; + impl_->setAuthentication(authentication); return *this; } -Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; } +Authentication& ClientConfiguration::getAuth() const { return *impl_->getAuthentication(); } -const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; } +const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->getAuthentication(); } ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int timeout) { impl_->operationTimeout = std::chrono::seconds(timeout); @@ -95,11 +95,11 @@ ClientConfiguration& ClientConfiguration::setMessageListenerThreads(int threads) int ClientConfiguration::getMessageListenerThreads() const { return impl_->messageListenerThreads; } ClientConfiguration& ClientConfiguration::setUseTls(bool useTls) { - impl_->useTls = useTls; + impl_->setUseTls(useTls); return *this; } -bool ClientConfiguration::isUseTls() const { return impl_->useTls; } +bool ClientConfiguration::isUseTls() const { return impl_->isUseTls(); } ClientConfiguration& ClientConfiguration::setValidateHostName(bool validateHostName) { impl_->validateHostName = validateHostName; @@ -127,12 +127,12 @@ const std::string& ClientConfiguration::getTlsCertificateFilePath() const { } ClientConfiguration& ClientConfiguration::setTlsTrustCertsFilePath(const std::string& filePath) { - impl_->tlsTrustCertsFilePath = filePath; + impl_->setTlsTrustCertsFilePath(filePath); return *this; } const std::string& ClientConfiguration::getTlsTrustCertsFilePath() const { - return impl_->tlsTrustCertsFilePath; + return impl_->getTlsTrustCertsFilePath(); } ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool allowInsecure) { diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h index e7a83a19..6207f989 100644 --- a/lib/ClientConfigurationImpl.h +++ b/lib/ClientConfigurationImpl.h @@ -20,13 +20,70 @@ #define LIB_CLIENTCONFIGURATIONIMPL_H_ #include +#include #include +#include + +#include "ServiceNameResolver.h" +#include "ServiceURI.h" namespace pulsar { +// Use struct rather than class here just for ABI compatibility struct ClientConfigurationImpl { + private: + mutable std::shared_mutex mutex; AuthenticationPtr authenticationPtr{AuthFactory::Disabled()}; + std::string tlsTrustCertsFilePath; + bool useTls{false}; + + public: + void updateServiceInfo(const ServiceInfo& serviceInfo) { + std::unique_lock lock(mutex); + if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) { + authenticationPtr = *serviceInfo.authentication; + } else { + authenticationPtr = AuthFactory::Disabled(); + } + if (serviceInfo.tlsTrustCertsFilePath.has_value()) { + tlsTrustCertsFilePath = *serviceInfo.tlsTrustCertsFilePath; + } else { + tlsTrustCertsFilePath = ""; + } + useTls = ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl)); + } + + auto& getAuthentication() const { + std::shared_lock lock(mutex); + return authenticationPtr; + } + + auto setAuthentication(const AuthenticationPtr& authentication) { + std::unique_lock lock(mutex); + authenticationPtr = authentication; + } + + auto& getTlsTrustCertsFilePath() const { + std::shared_lock lock(mutex); + return tlsTrustCertsFilePath; + } + + auto setTlsTrustCertsFilePath(const std::string& path) { + std::unique_lock lock(mutex); + tlsTrustCertsFilePath = path; + } + + auto isUseTls() const { + std::shared_lock lock(mutex); + return useTls; + } + + auto setUseTls(bool useTls_) { + std::unique_lock lock(mutex); + useTls = useTls_; + } + uint64_t memoryLimit{0ull}; int ioThreads{1}; int connectionsPerBroker{1}; @@ -36,10 +93,8 @@ struct ClientConfigurationImpl { int maxLookupRedirects{20}; int initialBackoffIntervalMs{100}; int maxBackoffIntervalMs{60000}; - bool useTls{false}; std::string tlsPrivateKeyFilePath; std::string tlsCertificateFilePath; - std::string tlsTrustCertsFilePath; bool tlsAllowInsecureConnection{false}; unsigned int statsIntervalInSeconds{600}; // 10 minutes std::unique_ptr loggerFactory; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 385a9627..91edf43d 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -874,27 +874,17 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati } void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) { - std::unique_lock lock(mutex_); + std::unique_lock lock{mutex_}; if (state_ != Open) { - LOG_ERROR("Client is not open, cannot update connection info"); + LOG_ERROR("Client is not open, cannot update service info"); return; } - if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) { - clientConfiguration_.setAuth(*serviceInfo.authentication); - } else { - clientConfiguration_.setAuth(AuthFactory::Disabled()); - } - if (serviceInfo.tlsTrustCertsFilePath.has_value()) { - clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath); - } else { - clientConfiguration_.setTlsTrustCertsFilePath(""); - } - clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl))); serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()), clientConfiguration_.getTlsTrustCertsFilePath().empty() ? std::nullopt : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())}; + clientConfiguration_.impl_->updateServiceInfo(serviceInfo_); pool_.resetForClusterSwitching(clientConfiguration_.getAuthPtr(), clientConfiguration_); @@ -904,6 +894,10 @@ void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) { } redirectedClusterLookupServicePtrs_.clear(); lookupServicePtr_ = createLookup(serviceInfo.serviceUrl); + + // TODO: changes on the following two fields are not tested + useProxy_ = false; + lookupCount_ = 0; } ServiceInfo ClientImpl::getServiceInfo() const { diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 785a8bd7..71cf1d01 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1141,6 +1141,9 @@ void ConsumerImpl::onClusterSwitching() { incomingMessages_.clear(); startMessageId_ = startMessageIdFromConfig_; lastDequedMessageId_ = MessageId::earliest(); + lastMessageIdInBroker_ = MessageId::earliest(); + seekStatus_ = SeekStatus::NOT_STARTED; + lastSeekArg_.reset(); } ackGroupingTrackerPtr_->flushAndClean(); } diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 93ec3a11..f6a30154 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -510,7 +510,7 @@ TEST(ClientTest, testNoRetry) { } TEST(ClientTest, testUpdateServiceInfo) { - extern std::string getToken(); // from AuthToken.cc + extern std::string getToken(); // from tests/AuthToken.cc // Access "private/auth" namespace in cluster 1 ServiceInfo info1{"pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt}; From fc04394d467b8a9224addd47b8497b0c32d1ff60 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Mar 2026 23:07:48 +0800 Subject: [PATCH 12/12] fix libstdc++ header include error --- lib/ClientConfigurationImpl.h | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h index 6207f989..89f82129 100644 --- a/lib/ClientConfigurationImpl.h +++ b/lib/ClientConfigurationImpl.h @@ -23,6 +23,7 @@ #include #include +#include #include #include "ServiceNameResolver.h"