Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <pulsar/TableView.h>
#include <pulsar/defines.h>

#include <optional>
#include <string>

namespace pulsar {
Expand All @@ -42,6 +43,12 @@ typedef std::function<void(Result, TableView)> TableViewCallback;
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
typedef std::function<void(Result)> CloseCallback;

struct PULSAR_PUBLIC ServiceInfo {
std::string serviceUrl;
std::optional<AuthenticationPtr> authentication;
std::optional<std::string> tlsTrustCertsFilePath;
};

class ClientImpl;
class PulsarFriend;
class PulsarWrapper;
Expand Down Expand Up @@ -414,6 +421,23 @@ class PULSAR_PUBLIC Client {
void getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)> callback);

/**
* Update the service information of the client.
*
* This method is used to switch the connection to a different Pulsar cluster. All connections will be
* closed and the internal service info will be updated.
*
* @param serviceInfo the service URL, authentication and TLS trust certs file path to use
*/
void updateServiceInfo(ServiceInfo serviceInfo);

/**
* Get the current service information of the client.
*
* @return the current service information
*/
ServiceInfo getServiceInfo() const;

private:
Client(const std::shared_ptr<ClientImpl>&);

Expand Down
5 changes: 5 additions & 0 deletions lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,9 @@ void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
.addListener(std::move(callback));
}

void Client::updateServiceInfo(ServiceInfo serviceInfo) { impl_->updateServiceInfo(std::move(serviceInfo)); }

ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }

} // namespace pulsar
130 changes: 94 additions & 36 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
#include <algorithm>
#include <chrono>
#include <iterator>
#include <mutex>
#include <random>
#include <shared_mutex>
#include <sstream>

#include "BinaryProtoLookupService.h"
Expand Down Expand Up @@ -74,10 +76,17 @@ std::string generateRandomName() {
return randomName;
}

typedef std::unique_lock<std::mutex> Lock;

typedef std::vector<std::string> StringList;

namespace {
std::optional<AuthenticationPtr> toOptionalAuthentication(const AuthenticationPtr& authentication) {
if (!authentication || authentication->getAuthMethodName() == "none") {
return std::nullopt;
}
return authentication;
}
} // namespace

static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl,
const ClientConfiguration& clientConfiguration,
ConnectionPool& pool, const AuthenticationPtr& auth) {
Expand All @@ -101,6 +110,10 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
state_(Open),
clientConfiguration_(ClientConfiguration(clientConfiguration)
.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))),
serviceInfo_{serviceUrl, toOptionalAuthentication(clientConfiguration.getAuthPtr()),
clientConfiguration.getTlsTrustCertsFilePath().empty()
? std::nullopt
: std::make_optional(clientConfiguration.getTlsTrustCertsFilePath())},
memoryLimitController_(clientConfiguration.getMemoryLimit()),
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
listenerExecutorProvider_(
Expand Down Expand Up @@ -144,19 +157,26 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
}

LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) {
std::shared_lock readLock(mutex_);
if (redirectedClusterURI.empty()) {
return lookupServicePtr_;
}

Lock lock(mutex_);
auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
if (it == redirectedClusterLookupServicePtrs_.end()) {
auto lookup = createLookup(redirectedClusterURI);
redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup);
return lookup;
if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
it != redirectedClusterLookupServicePtrs_.end()) {
return it->second;
}
readLock.unlock();

return it->second;
std::unique_lock writeLock(mutex_);
// Double check in case another thread acquires the lock and inserts a pair first
if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
it != redirectedClusterLookupServicePtrs_.end()) {
return it->second;
}
auto lookup = createLookup(redirectedClusterURI);
redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup);
return lookup;
}

void ClientImpl::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
Expand All @@ -166,7 +186,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon
}
TopicNamePtr topicName;
{
Lock lock(mutex_);
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Producer());
Expand All @@ -180,20 +200,21 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon

if (autoDownloadSchema) {
auto self = shared_from_this();
lookupServicePtr_->getSchema(topicName).addListener(
auto lookup = getLookup();
lookup->getSchema(topicName).addListener(
[self, topicName, callback](Result res, const SchemaInfo& topicSchema) {
if (res != ResultOk) {
callback(res, Producer());
return;
}
ProducerConfiguration conf;
conf.setSchema(topicSchema);
self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
self->getLookup()->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1,
std::placeholders::_2, topicName, conf, callback));
});
} else {
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
getLookup()->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, conf, callback));
}
Expand Down Expand Up @@ -253,7 +274,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st
const ReaderConfiguration& conf, const ReaderCallback& callback) {
TopicNamePtr topicName;
{
Lock lock(mutex_);
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Reader());
Expand All @@ -266,7 +287,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st
}

MessageId msgId(startMessageId);
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
getLookup()->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, msgId, conf, callback));
}
Expand All @@ -275,7 +296,7 @@ void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewC
const TableViewCallback& callback) {
TopicNamePtr topicName;
{
Lock lock(mutex_);
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, TableView());
Expand Down Expand Up @@ -341,7 +362,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const
const SubscribeCallback& callback) {
TopicNamePtr topicNamePtr = TopicName::get(regexPattern);

Lock lock(mutex_);
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Consumer());
Expand Down Expand Up @@ -379,7 +400,8 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const
return;
}

lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
getLookup()
->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
.addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, regexPattern, mode,
subscriptionName, conf, callback));
Expand All @@ -403,7 +425,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace

consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), regexPattern, mode,
*matchTopics, subscriptionName, conf,
lookupServicePtr_, interceptors);
getLookup(), interceptors);

consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
Expand All @@ -426,7 +448,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& originalTopics,
auto it = std::unique(topics.begin(), topics.end());
auto newSize = std::distance(topics.begin(), it);
topics.resize(newSize);
Lock lock(mutex_);
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Consumer());
Expand All @@ -450,7 +472,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& originalTopics,
auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors());

ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_, interceptors);
shared_from_this(), topics, subscriptionName, topicNamePtr, conf, getLookup(), interceptors);

consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
shared_from_this(), std::placeholders::_1,
Expand All @@ -462,7 +484,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
TopicNamePtr topicName;
{
Lock lock(mutex_);
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Consumer());
Expand All @@ -480,7 +502,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub
}
}

lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
getLookup()->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, subscriptionName, conf, callback));
}
Expand All @@ -505,7 +527,7 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti
}
consumer = std::make_shared<MultiTopicsConsumerImpl>(
shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf,
lookupServicePtr_, interceptors);
getLookup(), interceptors);
} else {
auto consumerImpl = std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
subscriptionName, conf,
Expand Down Expand Up @@ -647,7 +669,7 @@ void ClientImpl::handleGetPartitions(Result result, const LookupDataResultPtr& p
void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback) {
TopicNamePtr topicName;
{
Lock lock(mutex_);
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, StringList());
Expand All @@ -658,13 +680,15 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP
return;
}
}
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
getLookup()->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, callback));
}

void ClientImpl::closeAsync(const CloseCallback& callback) {
std::unique_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
if (callback) {
callback(ResultAlreadyClosed);
}
Expand All @@ -678,6 +702,8 @@ void ClientImpl::closeAsync(const CloseCallback& callback) {
for (const auto& it : redirectedClusterLookupServicePtrs_) {
it.second->close();
}
redirectedClusterLookupServicePtrs_.clear();
lock.unlock();

auto producers = producers_.move();
auto consumers = consumers_.move();
Expand Down Expand Up @@ -726,7 +752,7 @@ void ClientImpl::handleClose(Result result, const SharedInt& numberOfOpenHandler
--(*numberOfOpenHandlers);
}
if (*numberOfOpenHandlers == 0) {
Lock lock(mutex_);
std::unique_lock lock(mutex_);
if (state_ == Closed) {
LOG_DEBUG("Client is already shutting down, possible race condition in handleClose");
return;
Expand Down Expand Up @@ -776,7 +802,7 @@ void ClientImpl::shutdown() {
<< " consumers have been shutdown.");
}

lookupServicePtr_->close();
getLookup()->close();
if (!pool_.close()) {
// pool_ has already been closed. It means shutdown() has been called before.
return;
Expand Down Expand Up @@ -805,15 +831,9 @@ void ClientImpl::shutdown() {
lookupCount_ = 0;
}

uint64_t ClientImpl::newProducerId() {
Lock lock(mutex_);
return producerIdGenerator_++;
}
uint64_t ClientImpl::newProducerId() { return producerIdGenerator_++; }

uint64_t ClientImpl::newConsumerId() {
Lock lock(mutex_);
return consumerIdGenerator_++;
}
uint64_t ClientImpl::newConsumerId() { return consumerIdGenerator_++; }

uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; }

Expand Down Expand Up @@ -854,4 +874,42 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati
return clientConfiguration.impl_->operationTimeout;
}

void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) {
std::unique_lock lock(mutex_);
if (state_ != Open) {
LOG_ERROR("Client is not open, cannot update connection info");
return;
}

if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) {
clientConfiguration_.setAuth(*serviceInfo.authentication);
} else {
clientConfiguration_.setAuth(AuthFactory::Disabled());
}
if (serviceInfo.tlsTrustCertsFilePath.has_value()) {
clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath);
} else {
clientConfiguration_.setTlsTrustCertsFilePath("");
}
clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl)));
serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()),
clientConfiguration_.getTlsTrustCertsFilePath().empty()
? std::nullopt
: std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())};

pool_.resetConnections(clientConfiguration_.getAuthPtr(), clientConfiguration_);

lookupServicePtr_->close();
for (auto&& it : redirectedClusterLookupServicePtrs_) {
it.second->close();
}
redirectedClusterLookupServicePtrs_.clear();
lookupServicePtr_ = createLookup(serviceInfo.serviceUrl);
}

ServiceInfo ClientImpl::getServiceInfo() const {
std::shared_lock lock(mutex_);
return serviceInfo_;
}

} /* namespace pulsar */
Loading
Loading