Skip to content
Draft
105 changes: 105 additions & 0 deletions include/pulsar/AutoClusterFailover.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_
#define PULSAR_AUTO_CLUSTER_FAILOVER_H_

#include <pulsar/ServiceInfo.h>

#include <chrono>

namespace pulsar {

class Client;
class AutoClusterFailoverImpl;

class PULSAR_PUBLIC AutoClusterFailover final {
public:
/**
* Leverage `Client`'s internals for service URL probe and cluster switching.
*/
void initialize(Client& client);

struct Config {
ServiceInfo primary;
std::vector<ServiceInfo> secondary;
std::chrono::milliseconds checkInterval{30000}; // 30 seconds
std::chrono::milliseconds failoverDelay{30000}; // 30 seconds
std::chrono::milliseconds switchBackDelay{60000}; // 60 seconds
};

/**
* Builder helps create an AutoClusterFailover configuration.
*
* Example:
* ServiceInfo primary{...};
* std::vector<ServiceInfo> secondaries{...};
* AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries)
* .withCheckInterval(std::chrono::seconds(30))
* .withFailoverDelay(std::chrono::seconds(30))
* .withSwitchBackDelay(std::chrono::seconds(60))
* .build();
*
* Notes:
* - primary: the preferred cluster to use when available.
* - secondary: ordered list of fallback clusters.
* - checkInterval: frequency of health probes.
* - failoverDelay: how long the current cluster must be unreachable before switching.
* - switchBackDelay: how long the primary must remain healthy before switching back.
*/
class Builder {
public:
Builder(ServiceInfo primary, std::vector<ServiceInfo> secondary) {
config_.primary = std::move(primary);
config_.secondary = std::move(secondary);
}

// Set how frequently probes run against the active cluster(s).
Builder& withCheckInterval(std::chrono::milliseconds interval) {
config_.checkInterval = interval;
return *this;
}

// Set how long the current cluster must be unreachable before attempting failover.
Builder& withFailoverDelay(std::chrono::milliseconds delay) {
config_.failoverDelay = delay;
return *this;
}

// Set how long the primary must remain healthy before switching back from a secondary.
Builder& withSwitchBackDelay(std::chrono::milliseconds delay) {
config_.switchBackDelay = delay;
return *this;
}

AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); }

private:
Config config_;
};

explicit AutoClusterFailover(Config&& config);
~AutoClusterFailover();

private:
std::shared_ptr<AutoClusterFailoverImpl> impl_;
};

} // namespace pulsar

#endif
20 changes: 20 additions & 0 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
#include <pulsar/Reader.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
#include <pulsar/ServiceInfo.h>
#include <pulsar/TableView.h>
#include <pulsar/defines.h>

#include <optional>
#include <string>

namespace pulsar {
Expand Down Expand Up @@ -414,11 +416,29 @@ class PULSAR_PUBLIC Client {
void getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)> callback);

/**
* Update the service information of the client.
*
* This method is used to switch the connection to a different Pulsar cluster. All connections will be
* closed and the internal service info will be updated.
*
* @param serviceInfo the service URL, authentication and TLS trust certs file path to use
*/
void updateServiceInfo(ServiceInfo serviceInfo);

/**
* Get the current service information of the client.
*
* @return the current service information
*/
ServiceInfo getServiceInfo() const;

private:
Client(const std::shared_ptr<ClientImpl>&);

friend class PulsarFriend;
friend class PulsarWrapper;
friend class AutoClusterFailover;
std::shared_ptr<ClientImpl> impl_;
};
} // namespace pulsar
Expand Down
42 changes: 42 additions & 0 deletions include/pulsar/ServiceInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_SERVICE_INFO_H_
#define PULSAR_SERVICE_INFO_H_

#include <pulsar/Authentication.h>

#include <optional>
#include <string>

namespace pulsar {

struct PULSAR_PUBLIC ServiceInfo {
std::string serviceUrl;
std::optional<AuthenticationPtr> authentication;
std::optional<std::string> tlsTrustCertsFilePath;

bool operator==(const ServiceInfo& other) const {
return serviceUrl == other.serviceUrl && authentication == other.authentication &&
tlsTrustCertsFilePath == other.tlsTrustCertsFilePath;
}
};

} // namespace pulsar

#endif
Loading
Loading