Skip to content

Commit

Permalink
Merge branch 'alibaba:main' into feature/tag
Browse files Browse the repository at this point in the history
  • Loading branch information
NameHaibinZhang authored Nov 11, 2024
2 parents a85729d + fd95ac6 commit 9b01b5f
Show file tree
Hide file tree
Showing 62 changed files with 1,391 additions and 2,028 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-core-ut.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
run: make unittest_core

- name: Unit Test Coverage
run: docker build -t unittest_coverage -f ./docker/Dockerfile_coverage . && docker run -v $(pwd):$(pwd) unittest_coverage bash -c "cd $(pwd)/core && gcovr --gcov-ignore-errors=no_working_dir_found --root . --json coverage.json --json-summary-pretty --json-summary summary.json -e \".*sdk.*\" -e \".*logger.*\" -e \".*unittest.*\" -e \".*config_server.*\" -e \".*go_pipeline.*\" -e \".*application.*\" -e \".*protobuf.*\" -e \".*runner.*\""
run: docker build -t unittest_coverage -f ./docker/Dockerfile_coverage . && docker run -v $(pwd):$(pwd) unittest_coverage bash -c "cd $(pwd)/core && gcovr --gcov-ignore-errors=no_working_dir_found --root . --json coverage.json --json-summary-pretty --json-summary summary.json -e \".*\.pb\.cc\" -e \".*\.pb\.h\" -e \".*unittest.*\" -e \".*sdk.*\" -e \".*logger.*\" -e \".*config_server.*\" -e \".*go_pipeline.*\" -e \".*application.*\" -e \".*runner.*\""

- name: Setup Python3.10
uses: actions/setup-python@v5
Expand Down
13 changes: 12 additions & 1 deletion config_server/protocol/v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@
string name = 1; // Required, Config's unique identification
int64 version = 2; // Required, Config's version number or hash code
ConfigStatus status = 3; // Config's status
string message = 4; // Optional error message
map<string, bytes> extra = 5; // Optional extra info
}

// Define the Command information carried in the request
message CommandInfo {
string type = 1; // Command's type
string name = 2; // Required, Command's unique identification
ConfigStatus status = 3; // Command's status
string message = 4; // Optional error message
map<string, bytes> extra = 5; // Optional extra info
}

// Define Agent's basic attributes
Expand Down Expand Up @@ -110,13 +114,15 @@
string name = 1; // Required, Config's unique identification
int64 version = 2; // Required, Config's version number or hash code
bytes detail = 3; // Required, Config's detail
map<string, bytes> extra = 4; // Optional extra info
}

message CommandDetail {
string type = 1; // Required, Command type
string name = 2; // Required, Command name
bytes detail = 3; // Required, Command's detail
int64 expire_time = 4; // After which the command can be safely removed from history
map<string, bytes> extra = 5; // Optional extra info
}

enum ServerCapabilities {
Expand Down Expand Up @@ -168,7 +174,7 @@ Server:应当通过capbilitiies上报Server自身的能力,这样如果新

Client:Agent启动后第一次向Server汇报全量信息,request字段应填尽填。request\_id、sequence\_num、capabilities、instance\_id、agent\_type、startup\_time为必填字段。

Server:Server根据上报的信息返回响应。pipeline\_config\_updates、instance\_config\_updates中包含agent需要同步的配置,updates中必然包含name和version,是否包含详情context和detail取决于server端实现。custom\_command_updates包含要求agent执行的命令command中必然包含type、name和expire\_time。
Server:Server根据上报的信息返回响应。pipeline\_config\_updates、instance\_config\_updates中包含agent需要同步的配置,updates中必然包含name和version,是否包含detail取决于server端实现。custom\_command_updates包含要求agent执行的命令command中必然包含type、name和expire\_time。

Server是否保存Client信息取决于Server实现,如果服务端找不到或保存的sequence\_num + 1 ≠ 心跳的sequence\_num,那么就立刻返回并且flags中必须设置ReportFullStatus标识位。

Expand Down Expand Up @@ -249,3 +255,8 @@ Server: 如果上报+已知的Agent状态中,缺少应下发的custom\_comman
Server: 服务端正常返回时HeartbeatResponse中的code应始终设置为0,而当服务端异常时,必须将HeartbeatResponse中的code设置为非0,HeartbeatResponse中的message应包含错误信息,此时Response中的其他字段必须为空。

Client: 当HeartbeatResponse中的code为0时,Agent应该正常处理下发的配置。当HeartbeatResponse中的code不为0时,Agent必须忽略除code和message外的其他字段,并择机重试。

### 辅助信息
在command\_info, command\_detail, config\_info, config\_detail中,都预留了extra字段,可以用于传递一些额外的用户自定义的辅助信息。\

注意:extra字段仅作传递辅助信息使用,不会对管控行为造成任何影响。
4 changes: 4 additions & 0 deletions config_server/protocol/v2/agentV2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ message ConfigInfo {
int64 version = 2; // Required, Config's version number or hash code
ConfigStatus status = 3; // Config's status
string message = 4; // Optional error message
map<string, bytes> extra = 5; // Optional extra info
}

// Define the Command information carried in the request
Expand All @@ -33,6 +34,7 @@ message CommandInfo {
string name = 2; // Required, Command's unique identification
ConfigStatus status = 3; // Command's status
string message = 4; // Optional error message
map<string, bytes> extra = 5; // Optional extra info
}

// Define Agent's basic attributes
Expand Down Expand Up @@ -94,13 +96,15 @@ message ConfigDetail {
string name = 1; // Required, Config's unique identification
int64 version = 2; // Required, Config's version number or hash code
bytes detail = 3; // Required, Config's detail
map<string, bytes> extra = 4; // Optional extra info
}

message CommandDetail {
string type = 1; // Required, Command type
string name = 2; // Required, Command name
bytes detail = 3; // Required, Command's detail
int64 expire_time = 4; // After which the command can be safely removed from history
map<string, bytes> extra = 5; // Optional extra info
}

enum ServerCapabilities {
Expand Down
1 change: 0 additions & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ endif()
# remove several files in go_pipeline
list(REMOVE_ITEM FRAMEWORK_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/go_pipeline/LogtailPluginAdapter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/go_pipeline/LogtailPluginAdapter.h)


# add provider
add_subdirectory("${PROVIDER_PATH}" "${CMAKE_BINARY_DIR}/provider")

Expand Down
8 changes: 6 additions & 2 deletions core/file_server/event/BlockEventManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "common/Flags.h"
#include "common/HashUtil.h"
#include "common/StringTools.h"
#include "file_server/event_handler/LogInput.h"
#include "logger/Logger.h"
#include "pipeline/queue/ProcessQueueManager.h"

Expand Down Expand Up @@ -69,8 +70,11 @@ BlockedEventManager::~BlockedEventManager() {
}

void BlockedEventManager::Feedback(int64_t key) {
lock_guard<mutex> lock(mFeedbackQueueMux);
mFeedbackQueue.emplace_back(key);
{
lock_guard<mutex> lock(mFeedbackQueueMux);
mFeedbackQueue.emplace_back(key);
}
LogInput::GetInstance()->Trigger();
}

void BlockedEventManager::UpdateBlockEvent(
Expand Down
40 changes: 20 additions & 20 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ void LogInput::Start() {
mInteruptFlag = false;

mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
mRegisterdHandlersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL);
mActiveReadersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);
mRegisterdHandlersTotal
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL);
mActiveReadersTotal
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(
METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);

new Thread([this]() { ProcessLoop(); });
}
Expand Down Expand Up @@ -118,19 +121,14 @@ void LogInput::TryReadEvents(bool forceRead) {
if (mInteruptFlag)
return;

if (!forceRead) {
int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds();
if (curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval))
mLastReadEventMicroSeconds = curMicroSeconds;
else
return;
} else
mLastReadEventMicroSeconds = GetCurrentTimeInMicroSeconds();

vector<Event*> inotifyEvents;
EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents);
if (inotifyEvents.size() > 0) {
PushEventQueue(inotifyEvents);
int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds();
if (forceRead || curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval)) {
vector<Event*> inotifyEvents;
EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents);
if (inotifyEvents.size() > 0) {
PushEventQueue(inotifyEvents);
}
mLastReadEventMicroSeconds = curMicroSeconds;
}

vector<Event*> feedbackEvents;
Expand Down Expand Up @@ -212,8 +210,7 @@ bool LogInput::ReadLocalEvents() {
}
// set discard old data flag, so that history data will not be dropped.
BOOL_FLAG(ilogtail_discard_old_data) = false;
LOG_INFO(sLogger,
("load local events", GetLocalEventDataFileName())("event count", localEventJson.size()));
LOG_INFO(sLogger, ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size()));
for (Json::ValueIterator iter = localEventJson.begin(); iter != localEventJson.end(); ++iter) {
const Json::Value& eventItem = *iter;
if (!eventItem.isObject()) {
Expand Down Expand Up @@ -395,8 +392,11 @@ void* LogInput::ProcessLoop() {
delete ev;
else
ProcessEvent(dispatcher, ev);
} else
usleep(INT32_FLAG(log_input_thread_wait_interval));
} else {
unique_lock<mutex> lock(mFeedbackMux);
mFeedbackCV.wait_for(lock, chrono::microseconds(INT32_FLAG(log_input_thread_wait_interval)));
}

if (mIdleFlag)
continue;

Expand Down
5 changes: 5 additions & 0 deletions core/file_server/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class LogInput : public LogRunnable {

int32_t GetLastReadEventTime() { return mLastReadEventTime; }

void Trigger() { mFeedbackCV.notify_one(); }

private:
LogInput();
~LogInput();
Expand Down Expand Up @@ -89,6 +91,9 @@ class LogInput : public LogRunnable {
mutable std::mutex mThreadRunningMux;
mutable std::condition_variable mStopCV;

mutable std::mutex mFeedbackMux;
mutable std::condition_variable mFeedbackCV;

#ifdef APSARA_UNIT_TEST_MAIN
friend class LogInputUnittest;
friend class EventDispatcherTest;
Expand Down
21 changes: 20 additions & 1 deletion core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
#include "app_config/AppConfig.h"
#include "checkpoint/CheckPointManager.h"
#include "checkpoint/CheckpointManagerV2.h"
#include "constants/Constants.h"
#include "common/ErrorUtil.h"
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/HashUtil.h"
#include "common/RandomUtil.h"
#include "common/TimeUtil.h"
#include "common/UUIDUtil.h"
#include "constants/Constants.h"
#include "file_server/ConfigManager.h"
#include "file_server/FileServer.h"
#include "file_server/event/BlockEventManager.h"
Expand Down Expand Up @@ -73,6 +73,12 @@ DEFINE_FLAG_INT32(max_fix_pos_bytes, "", 128 * 1024);
DEFINE_FLAG_INT32(force_release_deleted_file_fd_timeout,
"force release fd if file is deleted after specified seconds, no matter read to end or not",
-1);
#if defined(_MSC_VER)
// On Windows, if Chinese config base path is used, the log path will be converted to GBK,
// so the __tag__.__path__ have to be converted back to UTF8 to avoid bad display.
// Note: enable this will spend CPU to do transformation.
DEFINE_FLAG_BOOL(enable_chinese_tag_path, "Enable Chinese __tag__.__path__", true);
#endif
DECLARE_FLAG_INT32(reader_close_unused_file_time);
DECLARE_FLAG_INT32(logtail_alarm_interval);

Expand Down Expand Up @@ -2480,6 +2486,19 @@ PipelineEventGroup LogFileReader::GenerateEventGroup(LogFileReaderPtr reader, Lo
return group;
}

const std::string& LogFileReader::GetConvertedPath() const {
const std::string& path = mDockerPath.empty() ? mHostLogPath : mDockerPath;
#if defined(_MSC_VER)
if (BOOL_FLAG(enable_chinese_tag_path)) {
static std::string newPath = EncodingConverter::GetInstance()->FromACPToUTF8(path);
return newPath;
}
return path;
#else
return path;
#endif
}

#ifdef APSARA_UNIT_TEST_MAIN
void LogFileReader::UpdateReaderManual() {
if (mLogFileOp.IsOpen()) {
Expand Down
4 changes: 2 additions & 2 deletions core/file_server/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ class LogFileReader {
bool GetSymbolicLinkFlag() const { return mSymbolicLinkFlag; }

/// @return e.g. `/home/admin/access.log`
const std::string& GetConvertedPath() const { return mDockerPath.empty() ? mHostLogPath : mDockerPath; }

const std::string& GetConvertedPath() const;
const std::string& GetHostLogPathFile() const { return mHostLogPathFile; }

int64_t GetFileSize() const { return mLastFileSize; }
Expand Down
30 changes: 17 additions & 13 deletions core/models/EventPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,26 @@ EventPool::~EventPool() {
}

LogEvent* EventPool::AcquireLogEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mLogEventPool, mLogEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mLogEventPool, mLogEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mLogEventPool, mMinUnusedLogEventsCnt);
}
return AcquireEventNoLock(ptr, mLogEventPool, mMinUnusedLogEventsCnt);
}

MetricEvent* EventPool::AcquireMetricEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mMetricEventPool, mMinUnusedMetricEventsCnt);
}
return AcquireEventNoLock(ptr, mMetricEventPool, mMinUnusedMetricEventsCnt);
}

SpanEvent* EventPool::AcquireSpanEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mSpanEventPool, mMinUnusedSpanEventsCnt);
}
Expand Down Expand Up @@ -98,20 +95,27 @@ void EventPool::Release(vector<SpanEvent*>&& obj) {
}

template <class T>
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux) {
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux, const string& type) {
if (minUnusedCnt <= pool.size() || minUnusedCnt == numeric_limits<size_t>::max()) {
auto sz = minUnusedCnt == numeric_limits<size_t>::max() ? pool.size() : minUnusedCnt;
for (size_t i = 0; i < sz; ++i) {
delete pool.back();
pool.pop_back();
}
size_t bakSZ = 0;
if (mux) {
lock_guard<mutex> lock(*mux);
bakSZ = poolBak.size();
for (auto& item : poolBak) {
delete item;
}
poolBak.clear();
}
if (sz != 0 || bakSZ != 0) {
LOG_INFO(
sLogger,
("event pool gc", "done")("event type", type)("gc event cnt", sz + bakSZ)("pool size", pool.size()));
}
} else {
LOG_ERROR(sLogger,
("unexpected error", "min unused event cnt is greater than pool size")(
Expand All @@ -124,13 +128,13 @@ void EventPool::CheckGC() {
if (time(nullptr) - mLastGCTime > INT32_FLAG(event_pool_gc_interval_secs)) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolMux);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux, "span");
} else {
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr, "span");
}
mLastGCTime = time(nullptr);
}
Expand Down
16 changes: 15 additions & 1 deletion core/models/PipelineEventGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,24 @@ namespace logtail {
template <class T>
void DestroyEvents(vector<PipelineEventPtr>&& events) {
unordered_map<EventPool*, vector<T*>> eventsPoolMap;
// for most cases, all events have the same origin. So we cache the pool pointer and iterator for efficiency
EventPool* cachedPoolPtr = nullptr;
typename unordered_map<EventPool*, vector<T*>>::iterator cachedIt;
bool firstEvent = true;
for (auto& item : events) {
if (item && item.IsFromEventPool()) {
item->Reset();
eventsPoolMap[item.GetEventPool()].emplace_back(static_cast<T*>(item.Release()));
if (firstEvent || item.GetEventPool() != cachedPoolPtr) {
cachedPoolPtr = item.GetEventPool();
cachedIt = eventsPoolMap.find(cachedPoolPtr);
if (cachedIt == eventsPoolMap.end()) {
eventsPoolMap.emplace(cachedPoolPtr, vector<T*>());
cachedIt = eventsPoolMap.find(cachedPoolPtr);
cachedIt->second.reserve(events.size());
}
firstEvent = false;
}
cachedIt->second.emplace_back(static_cast<T*>(item.Release()));
}
}
for (auto& item : eventsPoolMap) {
Expand Down
Loading

0 comments on commit 9b01b5f

Please sign in to comment.