diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 56130667..7e2d10d0 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -414,6 +415,23 @@ class PULSAR_PUBLIC Client { void getSchemaInfoAsync(const std::string& topic, int64_t version, std::function callback); + /** + * Update the service information of the client. + * + * This method is used to switch the connection to a different Pulsar cluster. All connections will be + * closed and the internal service info will be updated. + * + * @param serviceInfo the service URL, authentication and TLS trust certs file path to use + */ + void updateServiceInfo(ServiceInfo serviceInfo); + + /** + * Get the current service information of the client. + * + * @return the current service information + */ + ServiceInfo getServiceInfo() const; + private: Client(const std::shared_ptr&); diff --git a/include/pulsar/ServiceInfo.h b/include/pulsar/ServiceInfo.h new file mode 100644 index 00000000..a0f437bc --- /dev/null +++ b/include/pulsar/ServiceInfo.h @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_SERVICE_INFO_H_ +#define PULSAR_SERVICE_INFO_H_ + +#include + +#include +#include + +namespace pulsar { + +struct PULSAR_PUBLIC ServiceInfo { + std::string serviceUrl; + std::optional authentication; + std::optional tlsTrustCertsFilePath; +}; + +} // namespace pulsar +#endif diff --git a/lib/Client.cc b/lib/Client.cc index 39a5948a..ba829b2b 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -193,8 +193,12 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, std::function callback) { - impl_->getLookup() - ->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") + impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") .addListener(std::move(callback)); } + +void Client::updateServiceInfo(ServiceInfo serviceInfo) { impl_->updateServiceInfo(std::move(serviceInfo)); } + +ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); } + } // namespace pulsar diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc index b99c5d25..c3c56b95 100644 --- a/lib/ClientConfiguration.cc +++ b/lib/ClientConfiguration.cc @@ -53,13 +53,13 @@ ClientConfiguration& ClientConfiguration::setConnectionsPerBroker(int connection int ClientConfiguration::getConnectionsPerBroker() const { return impl_->connectionsPerBroker; } ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authentication) { - impl_->authenticationPtr = authentication; + impl_->setAuthentication(authentication); return *this; } -Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; } +Authentication& ClientConfiguration::getAuth() const { return *impl_->getAuthentication(); } -const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; } +const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->getAuthentication(); } ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int timeout) { impl_->operationTimeout = std::chrono::seconds(timeout); @@ -95,11 +95,11 @@ ClientConfiguration& ClientConfiguration::setMessageListenerThreads(int threads) int ClientConfiguration::getMessageListenerThreads() const { return impl_->messageListenerThreads; } ClientConfiguration& ClientConfiguration::setUseTls(bool useTls) { - impl_->useTls = useTls; + impl_->setUseTls(useTls); return *this; } -bool ClientConfiguration::isUseTls() const { return impl_->useTls; } +bool ClientConfiguration::isUseTls() const { return impl_->isUseTls(); } ClientConfiguration& ClientConfiguration::setValidateHostName(bool validateHostName) { impl_->validateHostName = validateHostName; @@ -127,12 +127,12 @@ const std::string& ClientConfiguration::getTlsCertificateFilePath() const { } ClientConfiguration& ClientConfiguration::setTlsTrustCertsFilePath(const std::string& filePath) { - impl_->tlsTrustCertsFilePath = filePath; + impl_->setTlsTrustCertsFilePath(filePath); return *this; } const std::string& ClientConfiguration::getTlsTrustCertsFilePath() const { - return impl_->tlsTrustCertsFilePath; + return impl_->getTlsTrustCertsFilePath(); } ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool allowInsecure) { diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h index e7a83a19..89f82129 100644 --- a/lib/ClientConfigurationImpl.h +++ b/lib/ClientConfigurationImpl.h @@ -20,13 +20,71 @@ #define LIB_CLIENTCONFIGURATIONIMPL_H_ #include +#include #include +#include +#include + +#include "ServiceNameResolver.h" +#include "ServiceURI.h" namespace pulsar { +// Use struct rather than class here just for ABI compatibility struct ClientConfigurationImpl { + private: + mutable std::shared_mutex mutex; AuthenticationPtr authenticationPtr{AuthFactory::Disabled()}; + std::string tlsTrustCertsFilePath; + bool useTls{false}; + + public: + void updateServiceInfo(const ServiceInfo& serviceInfo) { + std::unique_lock lock(mutex); + if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) { + authenticationPtr = *serviceInfo.authentication; + } else { + authenticationPtr = AuthFactory::Disabled(); + } + if (serviceInfo.tlsTrustCertsFilePath.has_value()) { + tlsTrustCertsFilePath = *serviceInfo.tlsTrustCertsFilePath; + } else { + tlsTrustCertsFilePath = ""; + } + useTls = ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl)); + } + + auto& getAuthentication() const { + std::shared_lock lock(mutex); + return authenticationPtr; + } + + auto setAuthentication(const AuthenticationPtr& authentication) { + std::unique_lock lock(mutex); + authenticationPtr = authentication; + } + + auto& getTlsTrustCertsFilePath() const { + std::shared_lock lock(mutex); + return tlsTrustCertsFilePath; + } + + auto setTlsTrustCertsFilePath(const std::string& path) { + std::unique_lock lock(mutex); + tlsTrustCertsFilePath = path; + } + + auto isUseTls() const { + std::shared_lock lock(mutex); + return useTls; + } + + auto setUseTls(bool useTls_) { + std::unique_lock lock(mutex); + useTls = useTls_; + } + uint64_t memoryLimit{0ull}; int ioThreads{1}; int connectionsPerBroker{1}; @@ -36,10 +94,8 @@ struct ClientConfigurationImpl { int maxLookupRedirects{20}; int initialBackoffIntervalMs{100}; int maxBackoffIntervalMs{60000}; - bool useTls{false}; std::string tlsPrivateKeyFilePath; std::string tlsCertificateFilePath; - std::string tlsTrustCertsFilePath; bool tlsAllowInsecureConnection{false}; unsigned int statsIntervalInSeconds{600}; // 10 minutes std::unique_ptr loggerFactory; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 0a850ed0..41d2f608 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1292,7 +1292,7 @@ void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, startConsumerStatsTimer(consumerStatsRequests); } -void ClientConnection::close(Result result, bool detach) { +void ClientConnection::close(Result result, bool detach, bool switchCluster) { Lock lock(mutex_); if (isClosed()) { return; @@ -1368,6 +1368,9 @@ void ClientConnection::close(Result result, bool detach) { for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) { auto consumer = it->second.lock(); if (consumer) { + if (switchCluster) { + consumer->onClusterSwitching(); + } consumer->handleDisconnection(result, self); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index b2770006..39e4224d 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -157,10 +157,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this #include #include +#include #include +#include #include #include "BinaryProtoLookupService.h" @@ -74,10 +76,17 @@ std::string generateRandomName() { return randomName; } -typedef std::unique_lock Lock; - typedef std::vector StringList; +namespace { +std::optional toOptionalAuthentication(const AuthenticationPtr& authentication) { + if (!authentication || authentication->getAuthMethodName() == "none") { + return std::nullopt; + } + return authentication; +} +} // namespace + static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, ConnectionPool& pool, const AuthenticationPtr& auth) { @@ -101,6 +110,10 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& state_(Open), clientConfiguration_(ClientConfiguration(clientConfiguration) .setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))), + serviceInfo_{serviceUrl, toOptionalAuthentication(clientConfiguration.getAuthPtr()), + clientConfiguration.getTlsTrustCertsFilePath().empty() + ? std::nullopt + : std::make_optional(clientConfiguration.getTlsTrustCertsFilePath())}, memoryLimitController_(clientConfiguration.getMemoryLimit()), ioExecutorProvider_(std::make_shared(clientConfiguration_.getIOThreads())), listenerExecutorProvider_( @@ -144,19 +157,26 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { } LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) { + std::shared_lock readLock(mutex_); if (redirectedClusterURI.empty()) { return lookupServicePtr_; } - Lock lock(mutex_); - auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); - if (it == redirectedClusterLookupServicePtrs_.end()) { - auto lookup = createLookup(redirectedClusterURI); - redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup); - return lookup; + if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); + it != redirectedClusterLookupServicePtrs_.end()) { + return it->second; } + readLock.unlock(); - return it->second; + std::unique_lock writeLock(mutex_); + // Double check in case another thread acquires the lock and inserts a pair first + if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); + it != redirectedClusterLookupServicePtrs_.end()) { + return it->second; + } + auto lookup = createLookup(redirectedClusterURI); + redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup); + return lookup; } void ClientImpl::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, @@ -166,7 +186,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Producer()); @@ -180,7 +200,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon if (autoDownloadSchema) { auto self = shared_from_this(); - lookupServicePtr_->getSchema(topicName).addListener( + getSchema(topicName).addListener( [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { if (res != ResultOk) { callback(res, Producer()); @@ -188,12 +208,12 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } ProducerConfiguration conf; conf.setSchema(topicSchema); - self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + self->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); }); } else { - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); } @@ -253,7 +273,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st const ReaderConfiguration& conf, const ReaderCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Reader()); @@ -266,7 +286,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st } MessageId msgId(startMessageId); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, msgId, conf, callback)); } @@ -275,7 +295,7 @@ void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewC const TableViewCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, TableView()); @@ -341,7 +361,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const const SubscribeCallback& callback) { TopicNamePtr topicNamePtr = TopicName::get(regexPattern); - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -379,7 +399,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const return; } - lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) + getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, regexPattern, mode, subscriptionName, conf, callback)); @@ -401,9 +421,8 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace auto interceptors = std::make_shared(conf.getInterceptors()); - consumer = std::make_shared(shared_from_this(), regexPattern, mode, - *matchTopics, subscriptionName, conf, - lookupServicePtr_, interceptors); + consumer = std::make_shared( + shared_from_this(), regexPattern, mode, *matchTopics, subscriptionName, conf, interceptors); consumer->getConsumerCreatedFuture().addListener( std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -426,7 +445,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto it = std::unique(topics.begin(), topics.end()); auto newSize = std::distance(topics.begin(), it); topics.resize(newSize); - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -450,7 +469,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto interceptors = std::make_shared(conf.getInterceptors()); ConsumerImplBasePtr consumer = std::make_shared( - shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_, interceptors); + shared_from_this(), topics, subscriptionName, topicNamePtr, conf, interceptors); consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -462,7 +481,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub const ConsumerConfiguration& conf, const SubscribeCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -480,7 +499,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, subscriptionName, conf, callback)); } @@ -503,9 +522,9 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti callback(ResultInvalidConfiguration, Consumer()); return; } - consumer = std::make_shared( - shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf, - lookupServicePtr_, interceptors); + consumer = std::make_shared(shared_from_this(), topicName, + partitionMetadata->getPartitions(), + subscriptionName, conf, interceptors); } else { auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), subscriptionName, conf, @@ -647,7 +666,7 @@ void ClientImpl::handleGetPartitions(Result result, const LookupDataResultPtr& p void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, StringList()); @@ -658,13 +677,15 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP return; } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, callback)); + getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions, + shared_from_this(), std::placeholders::_1, + std::placeholders::_2, topicName, callback)); } void ClientImpl::closeAsync(const CloseCallback& callback) { + std::unique_lock lock(mutex_); if (state_ != Open) { + lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -678,6 +699,8 @@ void ClientImpl::closeAsync(const CloseCallback& callback) { for (const auto& it : redirectedClusterLookupServicePtrs_) { it.second->close(); } + redirectedClusterLookupServicePtrs_.clear(); + lock.unlock(); auto producers = producers_.move(); auto consumers = consumers_.move(); @@ -726,7 +749,7 @@ void ClientImpl::handleClose(Result result, const SharedInt& numberOfOpenHandler --(*numberOfOpenHandlers); } if (*numberOfOpenHandlers == 0) { - Lock lock(mutex_); + std::unique_lock lock(mutex_); if (state_ == Closed) { LOG_DEBUG("Client is already shutting down, possible race condition in handleClose"); return; @@ -776,7 +799,9 @@ void ClientImpl::shutdown() { << " consumers have been shutdown."); } + std::shared_lock lock(mutex_); lookupServicePtr_->close(); + lock.unlock(); if (!pool_.close()) { // pool_ has already been closed. It means shutdown() has been called before. return; @@ -805,15 +830,9 @@ void ClientImpl::shutdown() { lookupCount_ = 0; } -uint64_t ClientImpl::newProducerId() { - Lock lock(mutex_); - return producerIdGenerator_++; -} +uint64_t ClientImpl::newProducerId() { return producerIdGenerator_++; } -uint64_t ClientImpl::newConsumerId() { - Lock lock(mutex_); - return consumerIdGenerator_++; -} +uint64_t ClientImpl::newConsumerId() { return consumerIdGenerator_++; } uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; } @@ -854,4 +873,36 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati return clientConfiguration.impl_->operationTimeout; } +void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) { + std::unique_lock lock{mutex_}; + if (state_ != Open) { + LOG_ERROR("Client is not open, cannot update service info"); + return; + } + + serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()), + clientConfiguration_.getTlsTrustCertsFilePath().empty() + ? std::nullopt + : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())}; + clientConfiguration_.impl_->updateServiceInfo(serviceInfo_); + + pool_.resetForClusterSwitching(clientConfiguration_.getAuthPtr(), clientConfiguration_); + + lookupServicePtr_->close(); + for (auto&& it : redirectedClusterLookupServicePtrs_) { + it.second->close(); + } + redirectedClusterLookupServicePtrs_.clear(); + lookupServicePtr_ = createLookup(serviceInfo.serviceUrl); + + // TODO: changes on the following two fields are not tested + useProxy_ = false; + lookupCount_ = 0; +} + +ServiceInfo ClientImpl::getServiceInfo() const { + std::shared_lock lock(mutex_); + return serviceInfo_; +} + } /* namespace pulsar */ diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 0b4d5969..2a6a8298 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -24,10 +24,12 @@ #include #include #include +#include #include "ConnectionPool.h" #include "Future.h" #include "LookupDataResult.h" +#include "LookupService.h" #include "MemoryLimitController.h" #include "ProtoApiEnums.h" #include "SynchronizedHashMap.h" @@ -52,8 +54,6 @@ typedef std::weak_ptr ConsumerImplBaseWeakPtr; class ClientConnection; using ClientConnectionPtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; using LookupServiceFactory = std::function; @@ -128,7 +128,6 @@ class ClientImpl : public std::enable_shared_from_this { ExecutorServiceProviderPtr getIOExecutorProvider(); ExecutorServiceProviderPtr getListenerExecutorProvider(); ExecutorServiceProviderPtr getPartitionListenerExecutorProvider(); - LookupServicePtr getLookup(const std::string& redirectedClusterURI = ""); void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); } @@ -139,6 +138,26 @@ class ClientImpl : public std::enable_shared_from_this { ConnectionPool& getConnectionPool() noexcept { return pool_; } uint64_t getLookupCount() { return lookupCount_; } + void updateServiceInfo(ServiceInfo&& serviceInfo); + ServiceInfo getServiceInfo() const; + + // Since the underlying `lookupServicePtr_` can be modified by `updateServiceInfo`, we should not expose + // it to other classes, otherwise the update might not be visible. + auto getPartitionMetadataAsync(const TopicNamePtr& topicName) { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getPartitionMetadataAsync(topicName); + } + + auto getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode); + } + + auto getSchema(const TopicNamePtr& topicName, const std::string& version = "") { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getSchema(topicName, version); + } + static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); friend class PulsarFriend; @@ -178,6 +197,7 @@ class ClientImpl : public std::enable_shared_from_this { const std::string& logicalAddress); LookupServicePtr createLookup(const std::string& serviceUrl); + LookupServicePtr getLookup(const std::string& redirectedClusterURI); static std::string getClientVersion(const ClientConfiguration& clientConfiguration); @@ -188,10 +208,11 @@ class ClientImpl : public std::enable_shared_from_this { Closed }; - std::mutex mutex_; + mutable std::shared_mutex mutex_; State state_; ClientConfiguration clientConfiguration_; + ServiceInfo serviceInfo_; MemoryLimitController memoryLimitController_; ExecutorServiceProviderPtr ioExecutorProvider_; @@ -202,8 +223,8 @@ class ClientImpl : public std::enable_shared_from_this { std::unordered_map redirectedClusterLookupServicePtrs_; ConnectionPool pool_; - uint64_t producerIdGenerator_; - uint64_t consumerIdGenerator_; + std::atomic_uint64_t producerIdGenerator_; + std::atomic_uint64_t consumerIdGenerator_; std::shared_ptr> requestIdGenerator_{std::make_shared>(0)}; SynchronizedHashMap producers_; diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index df050b0e..eaa14dc7 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -67,6 +67,21 @@ bool ConnectionPool::close() { return true; } +void ConnectionPool::resetForClusterSwitching(const AuthenticationPtr& authentication, + const ClientConfiguration& conf) { + std::unique_lock lock(mutex_); + authentication_ = authentication; + clientConfiguration_ = conf; + + for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) { + auto& cnx = cnxIt->second; + if (cnx) { + cnx->close(ResultDisconnected, false, true); + } + } + pool_.clear(); +} + static const std::string getKey(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix) { std::stringstream ss; diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h index 0e3a6d0a..541cf8ed 100644 --- a/lib/ConnectionPool.h +++ b/lib/ConnectionPool.h @@ -51,6 +51,12 @@ class PULSAR_PUBLIC ConnectionPool { */ bool close(); + /** + * Close all existing connections and update the authentication and configuration. + * Unlike close(), the pool remains open for new connections. + */ + void resetForClusterSwitching(const AuthenticationPtr& authentication, const ClientConfiguration& conf); + void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix, ClientConnection* value); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 757b6e84..71cf1d01 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -125,7 +125,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic negativeAcksTracker_(std::make_shared(client, *this, conf)), ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)), readCompacted_(conf.isReadCompacted()), - startMessageId_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageIdFromConfig_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageId_(startMessageIdFromConfig_), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), @@ -1134,6 +1135,19 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { } } +void ConsumerImpl::onClusterSwitching() { + { + LockGuard lock{mutex_}; + incomingMessages_.clear(); + startMessageId_ = startMessageIdFromConfig_; + lastDequedMessageId_ = MessageId::earliest(); + lastMessageIdInBroker_ = MessageId::earliest(); + seekStatus_ = SeekStatus::NOT_STARTED; + lastSeekArg_.reset(); + } + ackGroupingTrackerPtr_->flushAndClean(); +} + /** * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that * was diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 0da82a2d..6f287aa2 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -162,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase { void doImmediateAck(const MessageId& msgId, const ResultCallback& callback, CommandAck_AckType ackType); void doImmediateAck(const std::set& msgIds, const ResultCallback& callback); + void onClusterSwitching(); + protected: // overrided methods from HandlerBase Future connectionOpened(const ClientConnectionPtr& cnx) override; @@ -266,6 +268,11 @@ class ConsumerImpl : public ConsumerImplBase { MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; + + // When the consumer switches to a new cluster, we should reset `startMessageId_` to the original value, + // otherwise, the message id of the old cluster might be passed in the Subscribe request on the new + // cluster. + const optional startMessageIdFromConfig_; optional startMessageId_; SeekStatus seekStatus_{SeekStatus::NOT_STARTED}; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 0799eb63..a5699781 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -44,20 +44,19 @@ using std::chrono::seconds; MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode, const optional& startMessageId) : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf, - lookupServicePtr, interceptors, subscriptionMode, startMessageId) { + interceptors, subscriptionMode, startMessageId) { topicsPartitions_[topicName->toString()] = numPartitions; } MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName, const TopicNamePtr& topicName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, - Commands::SubscriptionMode subscriptionMode, const optional& startMessageId) + const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode, + const optional& startMessageId) : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics", Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf, client->getListenerExecutorProvider()->get()), @@ -66,7 +65,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( conf_(conf), incomingMessages_(conf.getReceiverQueueSize()), messageListener_(conf.getMessageListener()), - lookupServicePtr_(lookupServicePtr), numberTopicPartitions_(std::make_shared>(0)), topics_(topics), subscriptionMode_(subscriptionMode), @@ -93,7 +91,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( if (partitionsUpdateInterval > 0) { partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); partitionsUpdateInterval_ = seconds(partitionsUpdateInterval); - lookupServicePtr_ = client->getLookup(); } state_ = Pending; @@ -185,7 +182,12 @@ Future MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s auto entry = topicsPartitions_.find(topic); if (entry == topicsPartitions_.end()) { lock.unlock(); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + auto client = client_.lock(); + if (!client) { + topicPromise->setFailed(ResultAlreadyClosed); + return topicPromise->getFuture(); + } + client->getPartitionMetadataAsync(topicName).addListener( [this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) { if (result != ResultOk) { LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- " @@ -1003,7 +1005,11 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() { auto topicName = TopicName::get(item.first); auto currentNumPartitions = item.second; auto weakSelf = weak_from_this(); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + auto client = client_.lock(); + if (!client) { + return; + } + client->getPartitionMetadataAsync(topicName).addListener( [this, weakSelf, topicName, currentNumPartitions](Result result, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index dc628652..38a44cdf 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -46,23 +46,19 @@ class MultiTopicsBrokerConsumerStatsImpl; using MultiTopicsBrokerConsumerStatsPtr = std::shared_ptr; class UnAckedMessageTrackerInterface; using UnAckedMessageTrackerPtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; class MultiTopicsConsumerImpl; class MultiTopicsConsumerImpl : public ConsumerImplBase { public: MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, const optional& startMessageId = optional{}); MultiTopicsConsumerImpl(const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName, const TopicNamePtr& topicName, - const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_, - const ConsumerInterceptorsPtr& interceptors, + const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, const optional& startMessageId = optional{}); @@ -119,7 +115,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { MessageListener messageListener_; DeadlineTimerPtr partitionsUpdateTimer_; TimeDuration partitionsUpdateInterval_; - LookupServicePtr lookupServicePtr_; std::shared_ptr> numberTopicPartitions_; std::atomic failedResult{ResultOk}; Promise multiTopicsConsumerCreatedPromise_; diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 4a923666..1aa5c87b 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -23,7 +23,6 @@ #include "ClientImpl.h" #include "ExecutorService.h" #include "LogUtils.h" -#include "LookupService.h" #include "ProducerImpl.h" #include "RoundRobinMessageRouter.h" #include "SinglePartitionMessageRouter.h" @@ -59,7 +58,6 @@ PartitionedProducerImpl::PartitionedProducerImpl(const ClientImplPtr& client, co listenerExecutor_ = client->getListenerExecutorProvider()->get(); partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); partitionsUpdateInterval_ = std::chrono::seconds(partitionsUpdateInterval); - lookupServicePtr_ = client->getLookup(); } } @@ -433,7 +431,11 @@ void PartitionedProducerImpl::runPartitionUpdateTask() { void PartitionedProducerImpl::getPartitionMetadata() { using namespace std::placeholders; auto weakSelf = weak_from_this(); - lookupServicePtr_->getPartitionMetadataAsync(topicName_) + auto client = client_.lock(); + if (!client) { + return; + } + client->getPartitionMetadataAsync(topicName_) .addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); if (self) { diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 40f2d34d..94ba7179 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -38,8 +38,6 @@ using ClientImplPtr = std::shared_ptr; using ClientImplWeakPtr = std::weak_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; class ProducerImpl; using ProducerImplPtr = std::shared_ptr; class TopicName; @@ -133,7 +131,6 @@ class PartitionedProducerImpl : public ProducerImplBase, ExecutorServicePtr listenerExecutor_; DeadlineTimerPtr partitionsUpdateTimer_; TimeDuration partitionsUpdateInterval_; - LookupServicePtr lookupServicePtr_; ProducerInterceptorsPtr interceptors_; diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index fd48feed..4b5aab73 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -21,7 +21,6 @@ #include "ClientImpl.h" #include "ExecutorService.h" #include "LogUtils.h" -#include "LookupService.h" DECLARE_LOG_OBJECT() @@ -32,10 +31,8 @@ using std::chrono::seconds; PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl( const ClientImplPtr& client, const std::string& pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, - const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_, - const ConsumerInterceptorsPtr& interceptors) - : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, - lookupServicePtr_, interceptors), + const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors) + : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, interceptors), patternString_(pattern), pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))), getTopicsMode_(getTopicsMode), @@ -84,7 +81,11 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& er // already get namespace from pattern. assert(namespaceName_); - lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_) + auto client = client_.lock(); + if (!client) { + return; + } + client->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_) .addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, std::placeholders::_1, std::placeholders::_2)); } diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h index 63527965..796abcc2 100644 --- a/lib/PatternMultiTopicsConsumerImpl.h +++ b/lib/PatternMultiTopicsConsumerImpl.h @@ -52,7 +52,6 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr_, const ConsumerInterceptorsPtr& interceptors); ~PatternMultiTopicsConsumerImpl() override; diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index 7fa7e8b9..754137c5 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -90,7 +90,6 @@ void ReaderImpl::start(const MessageId& startMessageId, if (partitions_ > 0) { auto consumerImpl = std::make_shared( client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf, - client_.lock()->getLookup(), std::make_shared(std::vector()), Commands::SubscriptionModeNonDurable, startMessageId); consumer_ = consumerImpl; diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc index 84e8572d..4bfd8085 100644 --- a/tests/AuthTokenTest.cc +++ b/tests/AuthTokenTest.cc @@ -42,7 +42,7 @@ static const std::string serviceUrlHttp = "http://localhost:8080"; static const std::string tokenPath = TOKEN_PATH; -static std::string getToken() { +std::string getToken() { std::ifstream file(tokenPath); std::string str((std::istreambuf_iterator(file)), std::istreambuf_iterator()); return str; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index dd892686..f6a30154 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -17,13 +17,16 @@ * under the License. */ #include +#include #include +#include #include #include #include #include -#include +#include +#include #include "MockClientImpl.h" #include "PulsarAdminHelper.h" @@ -32,7 +35,6 @@ #include "lib/ClientConnection.h" #include "lib/LogUtils.h" #include "lib/checksum/ChecksumProvider.h" -#include "lib/stats/ProducerStatsImpl.h" DECLARE_LOG_OBJECT() @@ -506,3 +508,86 @@ TEST(ClientTest, testNoRetry) { ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms"; } } + +TEST(ClientTest, testUpdateServiceInfo) { + extern std::string getToken(); // from tests/AuthToken.cc + + // Access "private/auth" namespace in cluster 1 + ServiceInfo info1{"pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt}; + // Access "private/auth" namespace in cluster 2 + ServiceInfo info2{"pulsar+ssl://localhost:6653", + AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), + TEST_CONF_DIR "/hn-verification/cacert.pem"}; + // Access "public/default" namespace in cluster 1, which doesn't require authentication + ServiceInfo info3{"pulsar://localhost:6650", std::nullopt, std::nullopt}; + + Client client{info1.serviceUrl, ClientConfiguration().setAuth(*info1.authentication)}; + + const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer)); + + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topicRequiredAuth, MessageId::earliest(), {}, reader)); + + auto sendAndReceive = [&](const std::string &value) { + MessageId msgId; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(value).build(), msgId)); + LOG_INFO("Sent " << value << " to " << msgId); + + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); + LOG_INFO("Read " << msg.getDataAsString() << " from " << msgId); + ASSERT_EQ(value, msg.getDataAsString()); + }; + + sendAndReceive("msg-0"); + + // Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh) + ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); + client.updateServiceInfo(info2); + ASSERT_TRUE(PulsarFriend::getConnections(client).empty()); + ASSERT_EQ(info2.serviceUrl, client.getServiceInfo().serviceUrl); + ASSERT_EQ(info2.authentication, client.getServiceInfo().authentication); + ASSERT_EQ(info2.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); + + // Now the same will access the same topic in cluster 2 + sendAndReceive("msg-1"); + + // Switch back to cluster 1 without any authentication, the previous authentication info configured for + // cluster 2 will be cleared. + client.updateServiceInfo(info3); + ASSERT_EQ(info3.serviceUrl, client.getServiceInfo().serviceUrl); + ASSERT_EQ(info3.authentication, client.getServiceInfo().authentication); + ASSERT_EQ(info3.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath); + + const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + producer.close(); + ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build())); + + client.close(); + + // Verify messages sent to cluster 1 and cluster 2 can be consumed successfully with correct + // authentication info. + auto verify = [](Client &client, const std::string &topic, const std::string &value) { + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader)); + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); + ASSERT_EQ(value, msg.getDataAsString()); + }; + Client client1{info1.serviceUrl, ClientConfiguration().setAuth(*info1.authentication)}; + verify(client1, topicRequiredAuth, "msg-0"); + client1.close(); + + Client client2{info2.serviceUrl, ClientConfiguration() + .setAuth(*info2.authentication) + .setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath)}; + verify(client2, topicRequiredAuth, "msg-1"); + client2.close(); + + Client client3{info3.serviceUrl}; + verify(client3, topicNoAuth, "msg-2"); + client3.close(); +} diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 92aa8204..dc9cfc4e 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -259,7 +259,7 @@ TEST_P(LookupServiceTest, basicGetNamespaceTopics) { ASSERT_EQ(ResultOk, result); // 2. verify getTopicsOfNamespace by regex mode. - auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_)->getLookup(); + auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_); auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode, const std::set& expectedTopics) { Future getTopicsFuture = @@ -292,11 +292,8 @@ TEST_P(LookupServiceTest, testGetSchema) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); ASSERT_EQ(jsonSchema, schemaInfo.getSchema()); ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType()); @@ -310,11 +307,8 @@ TEST_P(LookupServiceTest, testGetSchemaNotFound) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo)); } @@ -335,11 +329,8 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema()); ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType());