Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] opt k8smetadatas #1956

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ DEFINE_FLAG_STRING(metrics_report_method,
"method to report metrics (default none, means logtail will not report metrics)",
"sls");

DEFINE_FLAG_STRING(loong_collector_singleton_service, "loong collector singleton service", "loongcollector-singleton");
DEFINE_FLAG_INT32(loong_collector_singleton_port, "loong collector singleton service port", 8899);
DEFINE_FLAG_STRING(loong_collector_operator_service, "loong collector operator service", "");
DEFINE_FLAG_INT32(loong_collector_operator_service_port, "loong collector operator service port", 8888);
DEFINE_FLAG_INT32(loong_collector_k8s_meta_service_port, "loong collector operator service port", 9000);
Expand Down
18 changes: 2 additions & 16 deletions core/common/LRUCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,9 @@ namespace logtail {
* directly anyway! :)
*/
explicit Cache(size_t maxSize = 64, size_t elasticity = 10)
: maxSize_(maxSize), elasticity_(elasticity), prune_thread(&Cache::pruneThreadFunc, this) {}
: maxSize_(maxSize), elasticity_(elasticity) {}

virtual ~Cache() {
stop_pruning = true;
if (prune_thread.joinable()) {
prune_thread.join();
}
}
virtual ~Cache() {}

size_t size() const {
Guard g(lock_);
Expand Down Expand Up @@ -250,13 +245,6 @@ namespace logtail {
return count;
}

void pruneThreadFunc() {
while (!stop_pruning) {
pruneExpired();
std::this_thread::sleep_for(std::chrono::seconds(60)); // 每60秒检查一次
}
}

private:
// Disallow copying.
Cache(const Cache&) = delete;
Expand All @@ -267,8 +255,6 @@ namespace logtail {
list_type keys_;
size_t maxSize_;
size_t elasticity_;
std::atomic<bool> stop_pruning{false};
std::thread prune_thread;

#ifdef APSARA_UNIT_TEST_MAIN
friend class LRUCacheUnittest;
Expand Down
22 changes: 20 additions & 2 deletions core/constants/TagConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ namespace logtail {
const std::string DEFAULT_METRIC_TAG_IMAGE_NAME = DEFAULT_TAG_IMAGE_NAME;

////////////////////////// TRACE ////////////////////////
const std::string DEFAULT_TRACE_TAG_K8S_NAMESPACE = "k8s.namespace.name";
const std::string DEFAULT_TRACE_TAG_K8S_PEER_NAMESPACE = "k8s.peer.namespace.name";
const std::string DEFAULT_TRACE_TAG_K8S_POD_NAME = "k8s.pod.name";
const std::string DEFAULT_TRACE_TAG_K8S_PEER_POD_NAME = "k8s.pod.name";
const std::string DEFAULT_TRACE_TAG_K8S_POD_UID = "k8s.pod.uid";
const std::string DEFAULT_TRACE_TAG_K8S_POD_IP = "k8s.pod.ip";
const std::string DEFAULT_TRACE_TAG_K8S_PEER_POD_IP = "k8s.peer.pod.ip";
const std::string DEFAULT_TRACE_TAG_K8S_WORKLOAD_KIND = "k8s.workload.kind";
const std::string DEFAULT_TRACE_TAG_K8S_PEER_WORKLOAD_KIND = "k8s.peer.workload.kind";
const std::string DEFAULT_TRACE_TAG_K8S_WORKLOAD_NAME = "k8s.workload.name";
const std::string DEFAULT_TRACE_TAG_K8S_PEER_WORKLOAD_NAME = "k8s.peer.workload.name";
const std::string DEFAULT_TRACE_TAG_K8S_SERVICE_NAME = "k8s.service.name";
const std::string DEFAULT_TRACE_TAG_K8S_PEER_SERVICE_NAME = "k8s.peer.service.name";
const std::string DEFAULT_TRACE_TAG_HOST_NAME = "host.name";
const std::string DEFAULT_TRACE_TAG_HOST_IP = "host.ip";
const std::string DEFAULT_TRACE_TAG_PROCESS_PID = "process.pid";
const std::string DEFAULT_TRACE_TAG_CONTAINER_NAME = "container.name";
const std::string DEFAULT_TRACE_TAG_CONTAINER_ID = "container.id";
const std::string DEFAULT_TRACE_TAG_IMAGE_NAME = "container.image.name";


} // namespace logtail
} // namespace logtail
22 changes: 20 additions & 2 deletions core/constants/TagConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,24 @@ namespace logtail {

////////////////////////// TRACE ////////////////////////

extern const std::string DEFAULT_TRACE_TAG_K8S_NAMESPACE;
extern const std::string DEFAULT_TRACE_TAG_K8S_PEER_NAMESPACE;
extern const std::string DEFAULT_TRACE_TAG_K8S_POD_NAME;
extern const std::string DEFAULT_TRACE_TAG_K8S_PEER_POD_NAME;
extern const std::string DEFAULT_TRACE_TAG_K8S_POD_UID;
extern const std::string DEFAULT_TRACE_TAG_K8S_POD_IP;
extern const std::string DEFAULT_TRACE_TAG_K8S_PEER_POD_IP;
extern const std::string DEFAULT_TRACE_TAG_K8S_WORKLOAD_KIND;
extern const std::string DEFAULT_TRACE_TAG_K8S_PEER_WORKLOAD_KIND;
extern const std::string DEFAULT_TRACE_TAG_K8S_WORKLOAD_NAME;
extern const std::string DEFAULT_TRACE_TAG_K8S_PEER_WORKLOAD_NAME;
extern const std::string DEFAULT_TRACE_TAG_K8S_SERVICE_NAME;
extern const std::string DEFAULT_TRACE_TAG_K8S_PEER_SERVICE_NAME;
extern const std::string DEFAULT_TRACE_TAG_HOST_NAME;
extern const std::string DEFAULT_TRACE_TAG_HOST_IP;
extern const std::string DEFAULT_TRACE_TAG_PROCESS_PID;
extern const std::string DEFAULT_TRACE_TAG_CONTAINER_NAME;
extern const std::string DEFAULT_TRACE_TAG_CONTAINER_ID;
extern const std::string DEFAULT_TRACE_TAG_IMAGE_NAME;


} // namespace logtail
} // namespace logtail
3 changes: 3 additions & 0 deletions core/ebpf/SelfMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ void NetworkObserverSelfMonitor::HandleStatistic(nami::eBPFStatistics& stats) {
// recv kernel events metric
assert(stats.plugin_type_ == nami::PluginType::NETWORK_OBSERVE);
nami::NetworkObserverStatistics* currNetworkStatsPtr = static_cast<nami::NetworkObserverStatistics*>(&stats);
if (currNetworkStatsPtr == nullptr) {
return;
}

mRecvConnStatsTotal->Add(currNetworkStatsPtr->recv_conn_stat_events_total_);
mRecvCtrlEventsTotal->Add(currNetworkStatsPtr->recv_ctrl_events_total_);
Expand Down
4 changes: 2 additions & 2 deletions core/ebpf/SourceManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,14 @@ bool SourceManager::StartPlugin(nami::PluginType plugin_type, std::unique_ptr<na
return !res;
}

bool SourceManager::UpdatePlugin(nami::PluginType plugin_type, std::unique_ptr<nami::eBPFConfig> conf) {
bool SourceManager::UpdatePlugin(nami::PluginType plugin_type, std::unique_ptr<nami::eBPFConfig> conf, UpdataType update_type) {
if (!CheckPluginRunning(plugin_type)) {
LOG_ERROR(sLogger, ("plugin not started, type", int(plugin_type)));
return false;
}

LOG_INFO(sLogger, ("begin to update plugin, type", int(plugin_type)));
conf->type = UpdataType::SECURE_UPDATE_TYPE_CONFIG_CHAGE;
conf->type = update_type;
FillCommonConf(conf);
#ifdef APSARA_UNIT_TEST_MAIN
mConfig = std::move(conf);
Expand Down
4 changes: 3 additions & 1 deletion core/ebpf/SourceManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class SourceManager {
void Init();

bool StartPlugin(nami::PluginType plugin_type, std::unique_ptr<nami::eBPFConfig> conf);

bool UpdatePlugin(nami::PluginType plugin_type, std::unique_ptr<nami::eBPFConfig> conf,
UpdataType update_type = UpdataType::SECURE_UPDATE_TYPE_CONFIG_CHAGE);

bool StopPlugin(nami::PluginType plugin_type);

Expand All @@ -64,7 +67,6 @@ class SourceManager {
void FillCommonConf(std::unique_ptr<nami::eBPFConfig>& conf);
bool LoadDynamicLib(const std::string& lib_name);
bool DynamicLibSuccess();
bool UpdatePlugin(nami::PluginType plugin_type, std::unique_ptr<nami::eBPFConfig> conf);

enum class ebpf_func {
EBPF_INIT,
Expand Down
22 changes: 22 additions & 0 deletions core/ebpf/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,28 @@ bool InitObserverNetworkOptionInner(const Json::Value& probeConfig,
mContext->GetLogstoreName(),
mContext->GetRegion());
}
// EnableMetric (Optional)
if (!GetOptionalBoolParam(probeConfig, "EnableCidFilter", thisObserverNetworkOption.mEnableCidFilter, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
// EnableMetric (Optional)
if (!GetOptionalListParam<std::string>(probeConfig, "EnableCids", thisObserverNetworkOption.mEnableCids, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
// MeterHandlerType (Optional)
if (!GetOptionalStringParam(probeConfig, "MeterHandlerType", thisObserverNetworkOption.mMeterHandlerType, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
Expand Down
127 changes: 127 additions & 0 deletions core/ebpf/eBPFServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <algorithm>
#include <gflags/gflags.h>

#include "metadata/K8sMetadata.h"
#include "app_config/AppConfig.h"
#include "ebpf/config.h"
#include "ebpf/eBPFServer.h"
Expand Down Expand Up @@ -166,6 +167,10 @@ void eBPFServer::Init() {
auto configJson = AppConfig::GetInstance()->GetConfig();
mAdminConfig.LoadEbpfConfig(configJson);
mEventCB = std::make_unique<EventHandler>(nullptr, -1, 0);
mHostMetadataCB = std::make_unique<HostMetadataHandler>(nullptr, -1, 0);
mHostMetadataCB->RegisterUpdatePluginCallback([&](nami::PluginType type, UpdataType updateType, const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> ops) {
return UpdatePlugin(type, updateType, ops);
});
#ifdef __ENTERPRISE__
mMeterCB = std::make_unique<ArmsMeterHandler>(nullptr, -1, 0);
mSpanCB = std::make_unique<ArmsSpanHandler>(nullptr, -1, 0);
Expand Down Expand Up @@ -194,6 +199,7 @@ void eBPFServer::Stop() {
if (mEventCB) mEventCB->UpdateContext(nullptr, -1, -1);
if (mMeterCB) mMeterCB->UpdateContext(nullptr, -1, -1);
if (mSpanCB) mSpanCB->UpdateContext(nullptr,-1, -1);
if (mHostMetadataCB) mHostMetadataCB->UpdateContext(nullptr, -1, -1);
if (mNetworkSecureCB) mNetworkSecureCB->UpdateContext(nullptr,-1, -1);
if (mProcessSecureCB) mProcessSecureCB->UpdateContext(nullptr,-1, -1);
if (mFileSecureCB) mFileSecureCB->UpdateContext(nullptr, -1, -1);
Expand Down Expand Up @@ -259,9 +265,98 @@ bool eBPFServer::StartPluginInternal(const std::string& pipeline_name,
nconfig.enable_event_ = true;
mEventCB->UpdateContext(ctx, ctx->GetProcessQueueKey(), plugin_index);
}
nconfig.enable_cid_filter = opts->mEnableCidFilter;
nconfig.enable_container_ids_ = opts->mEnableCids;
mHostMetadataCB->UpdateContext(ctx, ctx->GetProcessQueueKey(), plugin_index);
K8sMetadata::GetInstance().ResiterHostMetadataCallback(plugin_index, [this](uint32_t pluginIdx, std::vector<std::string>& cids) { return mHostMetadataCB->handle(pluginIdx, cids); });

// K8s env check
nconfig.metadata_by_cid_cb_ = [&](std::vector<std::string>& cidVec, std::vector<std::unique_ptr<nami::PodMeta>>& metaVec) {
// K8sMetadata::GetInstance().GetInfoByContainerIdFromCache();
if (cidVec.size() != metaVec.size()) {
return false;
}
bool res;
auto metas = K8sMetadata::GetInstance().SyncGetPodMetadataByContainerIds(cidVec, res);
if (!res) return false;
for (size_t i = 0; i < cidVec.size(); i ++) {
if (metas[i] != nullptr) {
metaVec[i] = std::make_unique<nami::PodMeta>(metas[i]->appId, metas[i]->appName, metas[i]->k8sNamespace, metas[i]->workloadName, metas[i]->workloadKind, metas[i]->podName, metas[i]->podIp, metas[i]->serviceName);
} else {
metaVec[i] = nullptr;
}
}
return true;
};
nconfig.metadata_by_ip_cb_ = [&](std::vector<std::string>& ipVec, std::vector<std::unique_ptr<nami::PodMeta>>& metaVec) {
if (ipVec.size() != metaVec.size()) return false;
bool res;
std::vector<std::shared_ptr<k8sContainerInfo>> metas = K8sMetadata::GetInstance().SyncGetPodMetadataByIps(ipVec, res);
if (!res) return false;
for (size_t i = 0; i < ipVec.size(); i ++) {
if (metas[i] != nullptr) {
metaVec[i] = std::make_unique<nami::PodMeta>(metas[i]->appId, metas[i]->appName, metas[i]->k8sNamespace, metas[i]->workloadName, metas[i]->workloadKind, metas[i]->podName, metas[i]->podIp, metas[i]->serviceName);
} else {
metaVec[i] = nullptr;
}
}
return true;
};
nconfig.metadata_by_cid_cache_ = [&](const std::string& cid) -> std::unique_ptr<nami::PodMeta> {
auto info = K8sMetadata::GetInstance().GetInfoByContainerIdFromCache(cid);
LOG_DEBUG(sLogger,
("cid", cid)
("isNull", info == nullptr)
("appId", info == nullptr ? "null" : info->appId)
("podName", info == nullptr ? "null" : info->podName)
("podIp", info == nullptr ? "null" : info->podIp)
("workloadKind", info == nullptr ? "null" : info->workloadKind)
("workloadName", info == nullptr ? "null" : info->workloadName)
("serviceName", info == nullptr ? "null" : info->serviceName)
);
if (info) {
return std::make_unique<nami::PodMeta>(
info->appId,
info->appName,
info->k8sNamespace,
info->workloadName,
info->workloadKind,
info->podName,
info->podIp,
info->serviceName);
}
return nullptr;
};
nconfig.metadata_by_ip_cache_ = [&](const std::string& ip) -> std::unique_ptr<nami::PodMeta> {
auto info = K8sMetadata::GetInstance().GetInfoByIpFromCache(ip);
LOG_DEBUG(sLogger,
("ip", ip)
("isNull", info == nullptr)
("appId", info == nullptr ? "null" : info->appId)
("podName", info == nullptr ? "null" : info->podName)
("podIp", info == nullptr ? "null" : info->podIp)
("workloadKind", info == nullptr ? "null" : info->workloadKind)
("workloadName", info == nullptr ? "null" : info->workloadName)
("serviceName", info == nullptr ? "null" : info->serviceName)
);
if (info) {
return std::make_unique<nami::PodMeta>(
info->appId,
info->appName,
info->k8sNamespace,
info->workloadName,
info->workloadKind,
info->podName,
info->podIp,
info->serviceName);
}
return nullptr;
};

eBPFConfig->config_ = std::move(nconfig);
ret = mSourceManager->StartPlugin(type, std::move(eBPFConfig));
// TODO @qianlu.kk check env first ...
K8sMetadata::GetInstance().StartFetchHostMetadata();
break;
}

Expand Down Expand Up @@ -300,6 +395,31 @@ bool eBPFServer::StartPluginInternal(const std::string& pipeline_name,
return ret;
}

bool eBPFServer::UpdatePlugin(nami::PluginType type, UpdataType updateType, const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options) {
auto eBPFConfig = std::make_unique<nami::eBPFConfig>();
eBPFConfig->plugin_type_ = type;
switch (type)
{
case nami::PluginType::NETWORK_OBSERVE:{
nami::NetworkObserveConfig nconfig;
nami::ObserverNetworkOption* opts = std::get<nami::ObserverNetworkOption*>(options);
nconfig.enable_cid_filter = true;
nconfig.enable_container_ids_ = opts->mEnableCids;
nconfig.disable_container_ids_ = opts->mDisableCids;
eBPFConfig->config_ = nconfig;
eBPFConfig->type = updateType;
LOG_DEBUG(sLogger,
("enable_container_ids_ size", nconfig.enable_container_ids_.size())
("disable_container_ids_ size", nconfig.disable_container_ids_.size()));
break;
}
default:
LOG_ERROR(sLogger, (std::to_string(int(type)), " not support to update plugin ..."));
break;
}
return mSourceManager->UpdatePlugin(type, std::move(eBPFConfig), updateType);
}

bool eBPFServer::HasRegisteredPlugins() const {
std::lock_guard<std::mutex> lk(mMtx);
for (auto& pipeline : mLoadedPipeline) {
Expand Down Expand Up @@ -329,6 +449,11 @@ bool eBPFServer::DisablePlugin(const std::string& pipeline_name, nami::PluginTyp
LOG_WARNING(sLogger, ("prev pipeline", prev_pipeline)("curr pipeline", pipeline_name));
return true;
}
mMonitorMgr->Release(type);
if (type == nami::PluginType::NETWORK_OBSERVE) {
// TODO @qianlu.kk check env first
K8sMetadata::GetInstance().StopFetchHostMetadata();
}
bool ret = mSourceManager->StopPlugin(type);
// UpdateContext must after than StopPlugin
if (ret) {
Expand Down Expand Up @@ -372,6 +497,7 @@ bool eBPFServer::SuspendPlugin(const std::string& pipeline_name, nami::PluginTyp
UpdateCBContext(type, nullptr, -1, -1);
mSuspendPluginTotal->Add(1);
}
mMonitorMgr->Suspend(type);
return ret;
}

Expand All @@ -385,6 +511,7 @@ void eBPFServer::UpdateCBContext(nami::PluginType type, const logtail::PipelineC
if (mMeterCB) mMeterCB->UpdateContext(ctx, key, idx);
if (mSpanCB) mSpanCB->UpdateContext(ctx, key, idx);
if (mEventCB) mEventCB->UpdateContext(ctx, key, idx);
if (mHostMetadataCB) mHostMetadataCB->UpdateContext(ctx, key, idx);
return;
}
case nami::PluginType::NETWORK_SECURITY:{
Expand Down
3 changes: 3 additions & 0 deletions core/ebpf/eBPFServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class eBPFServer : public InputRunner {

bool DisablePlugin(const std::string& pipeline_name, nami::PluginType type);

bool UpdatePlugin(nami::PluginType type, UpdataType updateType, const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options);

bool SuspendPlugin(const std::string& pipeline_name, nami::PluginType type);

bool HasRegisteredPlugins() const override;
Expand All @@ -99,6 +101,7 @@ class eBPFServer : public InputRunner {
std::unique_ptr<EventHandler> mEventCB;
std::unique_ptr<MeterHandler> mMeterCB;
std::unique_ptr<SpanHandler> mSpanCB;
std::unique_ptr<HostMetadataHandler> mHostMetadataCB;
std::unique_ptr<SecurityHandler> mNetworkSecureCB;
std::unique_ptr<SecurityHandler> mProcessSecureCB;
std::unique_ptr<SecurityHandler> mFileSecureCB;
Expand Down
2 changes: 2 additions & 0 deletions core/ebpf/handler/AbstractHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class AbstractHandler {
AbstractHandler(const logtail::PipelineContext* ctx, logtail::QueueKey key, uint32_t idx)
: mCtx(ctx), mQueueKey(key), mPluginIdx(idx) {}
void UpdateContext(const logtail::PipelineContext* ctx, logtail::QueueKey key, uint32_t index) {
WriteLock lk(mCtxLock);
mCtx = ctx;
mQueueKey = key;
mPluginIdx = index;
Expand All @@ -38,6 +39,7 @@ class AbstractHandler {
logtail::QueueKey mQueueKey = 0;
uint64_t mProcessTotalCnt = 0;
uint32_t mPluginIdx = 0;
ReadWriteLock mCtxLock;

#ifdef APSARA_UNIT_TEST_MAIN
friend class eBPFServerUnittest;
Expand Down
Loading
Loading