Skip to content

Commit

Permalink
[fix](meta-mgr) Real-time parsing meta service endpoint to avoid rpc …
Browse files Browse the repository at this point in the history
…failed after config muted (#45877)


Original config::meta_service_endpoint is immutable. But it is changed
to mutable now, so we should parse it in realtime to avoid rpc failure.
  • Loading branch information
TangSiyang2001 authored and Your Name committed Dec 25, 2024
1 parent e3cae82 commit 3d42f9b
Showing 1 changed file with 7 additions and 10 deletions.
17 changes: 7 additions & 10 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class MetaServiceProxy {
}

private:
static bool is_meta_service_endpoint_list() {
return config::meta_service_endpoint.find(',') != std::string::npos;
}

static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
static std::once_flag proxies_flag;
static size_t num_proxies = 1;
Expand All @@ -154,9 +158,6 @@ class MetaServiceProxy {
if (config::meta_service_connection_pooled) {
num_proxies = config::meta_service_connection_pool_size;
}
if (config::meta_service_endpoint.find(',') != std::string::npos) {
is_meta_service_endpoint_list = true;
}
proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
});

Expand All @@ -175,7 +176,7 @@ class MetaServiceProxy {

const char* load_balancer_name = nullptr;
std::string endpoint;
if (is_meta_service_endpoint_list) {
if (is_meta_service_endpoint_list()) {
endpoint = fmt::format("list://{}", config::meta_service_endpoint);
load_balancer_name = "random";
} else {
Expand Down Expand Up @@ -215,7 +216,7 @@ class MetaServiceProxy {
bool is_idle_timeout(long now) {
auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
// idle timeout only works without list endpoint.
return !is_meta_service_endpoint_list && idle_timeout_ms > 0 &&
return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 &&
_last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
}

Expand Down Expand Up @@ -243,7 +244,7 @@ class MetaServiceProxy {

long deadline = now;
// connection age only works without list endpoint.
if (!is_meta_service_endpoint_list &&
if (!is_meta_service_endpoint_list() &&
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
Expand All @@ -262,16 +263,12 @@ class MetaServiceProxy {
return Status::OK();
}

static std::atomic_bool is_meta_service_endpoint_list;

std::shared_mutex _mutex;
std::atomic<long> _last_access_at_ms {0};
long _deadline_ms {0};
std::shared_ptr<MetaService_Stub> _stub;
};

std::atomic_bool MetaServiceProxy::is_meta_service_endpoint_list = false;

template <typename T, typename... Ts>
struct is_any : std::disjunction<std::is_same<T, Ts>...> {};

Expand Down

0 comments on commit 3d42f9b

Please sign in to comment.