Skip to content

Commit

Permalink
[Refactor]Refactor workload group metric (#46640)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo authored Jan 13, 2025
1 parent d884c14 commit 7a0dd23
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 46 deletions.
64 changes: 30 additions & 34 deletions be/src/runtime/workload_group/workload_group_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,41 @@

namespace doris {

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(workload_group_cpu_time_sec, doris::MetricUnit::SECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(workload_group_mem_used_bytes, doris::MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(workload_group_remote_scan_bytes, doris::MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(workload_group_total_local_scan_bytes,
doris::MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(workload_group_local_scan_bytes, doris::MetricUnit::BYTES);

#include "common/compile_check_begin.h"

WorkloadGroupMetrics::~WorkloadGroupMetrics() {
DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
}

WorkloadGroupMetrics::WorkloadGroupMetrics(WorkloadGroup* wg) {
std::string wg_id_prefix = "workload_group_" + std::to_string(wg->id());
_entity = DorisMetrics::instance()->metric_registry()->register_entity(
"workload_group." + std::to_string(wg->id()), {{"workload_group", wg->name()}});

_cpu_time_metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::SECONDS, "workload_group_cpu_time_sec");
_cpu_time_counter = (IntCounter*)(_entity->register_metric<IntCounter>(_cpu_time_metric.get()));

_mem_used_bytes_metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "workload_group_mem_used_bytes");
_mem_used_bytes_counter =
(IntCounter*)(_entity->register_metric<IntCounter>(_mem_used_bytes_metric.get()));

_local_scan_bytes_metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
"workload_group_local_scan_bytes");
_local_scan_bytes_counter =
(IntCounter*)(_entity->register_metric<IntCounter>(_local_scan_bytes_metric.get()));
wg_id_prefix, {{"workload_group", wg->name()}, {"id", std::to_string(wg->id())}});

_remote_scan_bytes_metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
"workload_group_remote_scan_bytes");
_remote_scan_bytes_counter =
(IntCounter*)(_entity->register_metric<IntCounter>(_remote_scan_bytes_metric.get()));
INT_COUNTER_METRIC_REGISTER(_entity, workload_group_cpu_time_sec);
INT_GAUGE_METRIC_REGISTER(_entity, workload_group_mem_used_bytes);
INT_COUNTER_METRIC_REGISTER(_entity, workload_group_remote_scan_bytes);
INT_COUNTER_METRIC_REGISTER(_entity, workload_group_total_local_scan_bytes);

std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
for (const auto& data_dir : data_dir_list) {
std::unique_ptr<doris::MetricPrototype> metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
"workload_group_local_scan_bytes_" + data_dir.metric_name);
_local_scan_bytes_counter_map.insert(
{data_dir.path, (IntCounter*)(_entity->register_metric<IntCounter>(metric.get()))});
_local_scan_bytes_metric_map.insert({data_dir.path, std::move(metric)});
std::string data_dir_metric_name = wg_id_prefix + "_io_" + data_dir.metric_name;
std::shared_ptr<MetricEntity> io_entity =
DorisMetrics::instance()->metric_registry()->register_entity(
data_dir_metric_name, {{"workload_group", wg->name()},
{"path", data_dir.metric_name},
{"id", std::to_string(wg->id())}});
IntCounter* workload_group_local_scan_bytes = nullptr;
INT_COUNTER_METRIC_REGISTER(io_entity, workload_group_local_scan_bytes);
_local_scan_bytes_counter_map.insert({data_dir.path, workload_group_local_scan_bytes});
_io_entity_list.push_back(io_entity);
}
}

Expand All @@ -77,15 +73,15 @@ void WorkloadGroupMetrics::update_memory_used_bytes(int64_t memory_used) {
}

void WorkloadGroupMetrics::update_local_scan_io_bytes(std::string path, uint64_t delta_io_bytes) {
_local_scan_bytes_counter->increment(delta_io_bytes);
workload_group_total_local_scan_bytes->increment(delta_io_bytes);
auto range = _local_scan_bytes_counter_map.equal_range(path);
for (auto it = range.first; it != range.second; ++it) {
it->second->increment((int64_t)delta_io_bytes);
}
}

void WorkloadGroupMetrics::update_remote_scan_io_bytes(uint64_t delta_io_bytes) {
_remote_scan_bytes_counter->increment(delta_io_bytes);
workload_group_remote_scan_bytes->increment(delta_io_bytes);
}

void WorkloadGroupMetrics::refresh_metrics() {
Expand All @@ -94,21 +90,21 @@ void WorkloadGroupMetrics::refresh_metrics() {
// cpu
uint64_t _current_cpu_time_nanos = _cpu_time_nanos.load();
uint64_t _cpu_time_sec = _current_cpu_time_nanos / (1000L * 1000L * 1000L);
_cpu_time_counter->set_value(_cpu_time_sec);
workload_group_cpu_time_sec->set_value(_cpu_time_sec);
_per_sec_cpu_time_nanos = (_current_cpu_time_nanos - _last_cpu_time_nanos) / interval_second;
_last_cpu_time_nanos = _current_cpu_time_nanos;

// memory
_mem_used_bytes_counter->set_value(_memory_used);
workload_group_mem_used_bytes->set_value(_memory_used);

// local scan
int64_t current_local_scan_bytes = _local_scan_bytes_counter->value();
int64_t current_local_scan_bytes = workload_group_total_local_scan_bytes->value();
_per_sec_local_scan_bytes =
(current_local_scan_bytes - _last_local_scan_bytes) / interval_second;
_last_local_scan_bytes = current_local_scan_bytes;

// remote scan
int64_t current_remote_scan_bytes = _remote_scan_bytes_counter->value();
int64_t current_remote_scan_bytes = workload_group_remote_scan_bytes->value();
_per_sec_remote_scan_bytes =
(current_remote_scan_bytes - _last_remote_scan_bytes) / interval_second;
_last_remote_scan_bytes = current_remote_scan_bytes;
Expand All @@ -127,7 +123,7 @@ int64_t WorkloadGroupMetrics::get_remote_scan_bytes_per_second() {
}

int64_t WorkloadGroupMetrics::get_memory_used() {
return _mem_used_bytes_counter->value();
return workload_group_mem_used_bytes->value();
}

} // namespace doris
23 changes: 11 additions & 12 deletions be/src/runtime/workload_group/workload_group_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

namespace doris {

Expand All @@ -30,6 +31,11 @@ class WorkloadGroup;
template <typename T>
class AtomicCounter;
using IntCounter = AtomicCounter<int64_t>;

template <typename T>
class AtomicGauge;
using IntGuage = AtomicGauge<int64_t>;

class MetricEntity;
struct MetricPrototype;

Expand Down Expand Up @@ -58,18 +64,10 @@ class WorkloadGroupMetrics {
int64_t get_memory_used();

private:
std::unique_ptr<doris::MetricPrototype> _cpu_time_metric {nullptr};
std::unique_ptr<doris::MetricPrototype> _mem_used_bytes_metric {nullptr};
std::unique_ptr<doris::MetricPrototype> _local_scan_bytes_metric {nullptr};
std::unique_ptr<doris::MetricPrototype> _remote_scan_bytes_metric {nullptr};
// NOTE: _local_scan_bytes_metric is sum of all disk's IO
std::unordered_multimap<std::string, std::unique_ptr<doris::MetricPrototype>>
_local_scan_bytes_metric_map;

IntCounter* _cpu_time_counter {nullptr}; // used for metric
IntCounter* _mem_used_bytes_counter {nullptr}; // used for metric
IntCounter* _local_scan_bytes_counter {nullptr}; // used for metric
IntCounter* _remote_scan_bytes_counter {nullptr}; // used for metric
IntCounter* workload_group_cpu_time_sec {nullptr}; // used for metric
IntGuage* workload_group_mem_used_bytes {nullptr}; // used for metric
IntCounter* workload_group_remote_scan_bytes {nullptr}; // used for metric
IntCounter* workload_group_total_local_scan_bytes {nullptr}; // used for metric
std::unordered_multimap<std::string, IntCounter*>
_local_scan_bytes_counter_map; // used for metric

Expand All @@ -86,6 +84,7 @@ class WorkloadGroupMetrics {
std::atomic<uint64_t> _memory_used {0};

std::shared_ptr<MetricEntity> _entity {nullptr};
std::vector<std::shared_ptr<MetricEntity>> _io_entity_list;
};

} // namespace doris

0 comments on commit 7a0dd23

Please sign in to comment.