diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 8613450034..c81d8e6954 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -115,6 +115,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake) set(SUB_DIRECTORIES_LIST application app_config checkpoint container_manager logger go_pipeline monitor monitor/metric_constants profile_sender models config config/watcher + instance_config pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer runner runner/sink/http protobuf/sls diff --git a/core/app_config/AppConfig.cpp b/core/app_config/AppConfig.cpp index e0390e5b29..47eed49be9 100644 --- a/core/app_config/AppConfig.cpp +++ b/core/app_config/AppConfig.cpp @@ -16,16 +16,19 @@ #include #include +#include #include +#include #include "RuntimeUtil.h" #include "common/EnvUtil.h" #include "common/FileSystemUtil.h" #include "common/JsonUtil.h" #include "common/LogtailCommonFlags.h" -#include "common/RuntimeUtil.h" +#include "config/watcher/InstanceConfigWatcher.h" #include "file_server/ConfigManager.h" #include "file_server/reader/LogFileReader.h" +#include "json/value.h" #include "logger/Logger.h" #include "monitor/LogFileProfiler.h" #include "monitor/LogtailAlarm.h" @@ -36,9 +39,9 @@ using namespace std; +DEFINE_FLAG_BOOL(logtail_mode, "logtail mode", false); DEFINE_FLAG_INT32(max_buffer_num, "max size", 40); DEFINE_FLAG_INT32(pub_max_buffer_num, "max size", 8); -DEFINE_FLAG_INT32(default_max_send_byte_per_sec, "the max send speed per sec, realtime thread", 25 * 1024 * 1024); DEFINE_FLAG_INT32(pub_max_send_byte_per_sec, "the max send speed per sec, realtime thread", 20 * 1024 * 1024); DEFINE_FLAG_INT32(default_send_byte_per_sec, "the max send speed per sec, replay buffer thread", 2 * 1024 * 1024); DEFINE_FLAG_INT32(pub_send_byte_per_sec, "the max send speed per sec, replay buffer thread", 1 * 1024 * 1024); @@ -48,8 +51,6 @@ DEFINE_FLAG_INT32(default_local_file_size, "default size of one buffer file", 20 DEFINE_FLAG_INT32(pub_local_file_size, "default size of one buffer file", 20 * 1024 * 1024); DEFINE_FLAG_INT32(process_thread_count, "", 1); DEFINE_FLAG_INT32(send_request_concurrency, "max count keep in mem when async send", 10); -DEFINE_FLAG_BOOL(enable_send_tps_smoothing, "avoid web server load burst", true); -DEFINE_FLAG_BOOL(enable_flow_control, "if enable flow control", true); DEFINE_FLAG_STRING(default_buffer_file_path, "set current execution dir in default", ""); DEFINE_FLAG_STRING(buffer_file_path, "set buffer dir", ""); // DEFINE_FLAG_STRING(default_mapping_config_path, "", "mapping_config.json"); @@ -178,12 +179,26 @@ DEFINE_FLAG_STRING(check_point_filename, "", "/tmp/logtail_check_point"); DEFINE_FLAG_STRING(check_point_filename, "", "C:\\LogtailData\\logtail_check_point"); #endif +DEFINE_FLAG_STRING(sls_observer_ebpf_host_path, + "the backup real host path for store libebpf.so", + "/etc/ilogtail/ebpf/"); + namespace logtail { +constexpr int32_t kDefaultMaxSendBytePerSec = 25 * 1024 * 1024; // the max send speed per sec, realtime thread -void CreateAgentDir () { -#if defined(__RUN_LOGTAIL__) - return; -#endif +std::string AppConfig::sLocalConfigDir = "local"; +void CreateAgentDir() { + try { + const char* value = getenv("logtail_mode"); + if (value != NULL) { + STRING_FLAG(logtail_mode) = StringTo(value); + } + } catch (const exception& e) { + std::cout << "load config from env error, env_name:logtail_mode, error:" << e.what() << std::endl; + } + if (BOOL_FLAG(logtail_mode)) { + return; + } std::string processExecutionDir = GetProcessExecutionDir(); Json::Value emptyJson; #define PROCESSDIRFLAG(flag_name) \ @@ -221,11 +236,11 @@ std::string GetAgentThirdPartyDir() { if (!dir.empty()) { return dir; } -#if defined(__RUN_LOGTAIL__) - dir = AppConfig::GetInstance()->GetLoongcollectorConfDir(); -#else - dir = STRING_FLAG(loongcollector_third_party_dir) + PATH_SEPARATOR; -#endif + if (BOOL_FLAG(logtail_mode)) { + dir = AppConfig::GetInstance()->GetLoongcollectorConfDir(); + } else { + dir = STRING_FLAG(loongcollector_third_party_dir) + PATH_SEPARATOR; + } return dir; } @@ -234,10 +249,14 @@ std::string GetAgentLogDir() { if (!dir.empty()) { return dir; } -#if defined(__RUN_LOGTAIL__) || defined(APSARA_UNIT_TEST_MAIN) +#if defined(APSARA_UNIT_TEST_MAIN) dir = GetProcessExecutionDir(); #else - dir = STRING_FLAG(loongcollector_log_dir) + PATH_SEPARATOR; + if (BOOL_FLAG(logtail_mode)) { + dir = GetProcessExecutionDir(); + } else { + dir = STRING_FLAG(loongcollector_log_dir) + PATH_SEPARATOR; + } #endif return dir; } @@ -247,10 +266,14 @@ std::string GetAgentDataDir() { if (!dir.empty()) { return dir; } -#if defined(__RUN_LOGTAIL__) || defined(APSARA_UNIT_TEST_MAIN) +#if defined(APSARA_UNIT_TEST_MAIN) dir = GetProcessExecutionDir(); #else - dir = STRING_FLAG(loongcollector_data_dir) + PATH_SEPARATOR; + if (BOOL_FLAG(logtail_mode)) { + dir = GetProcessExecutionDir(); + } else { + dir = STRING_FLAG(loongcollector_data_dir) + PATH_SEPARATOR; + } #endif return dir; } @@ -260,10 +283,14 @@ std::string GetAgentConfDir() { if (!dir.empty()) { return dir; } -#if defined(__RUN_LOGTAIL__) || defined(APSARA_UNIT_TEST_MAIN) +#if defined(APSARA_UNIT_TEST_MAIN) dir = GetProcessExecutionDir(); #else - dir = STRING_FLAG(loongcollector_conf_dir) + PATH_SEPARATOR; + if (BOOL_FLAG(logtail_mode)) { + dir = GetProcessExecutionDir(); + } else { + dir = STRING_FLAG(loongcollector_conf_dir) + PATH_SEPARATOR; + } #endif return dir; } @@ -273,10 +300,14 @@ std::string GetAgentRunDir() { if (!dir.empty()) { return dir; } -#if defined(__RUN_LOGTAIL__) || defined(APSARA_UNIT_TEST_MAIN) +#if defined(APSARA_UNIT_TEST_MAIN) dir = GetProcessExecutionDir(); #else - dir = STRING_FLAG(loongcollector_run_dir) + PATH_SEPARATOR; + if (BOOL_FLAG(logtail_mode)) { + dir = GetProcessExecutionDir(); + } else { + dir = STRING_FLAG(loongcollector_run_dir) + PATH_SEPARATOR; + } #endif return dir; } @@ -286,42 +317,42 @@ std::string GetAgentDockerPathConfig() { if (!file_path.empty()) { return file_path; } -#if defined(__RUN_LOGTAIL__) - file_path = GetAgentDataDir() + STRING_FLAG(ilogtail_docker_file_path_config); -#else - file_path = GetAgentDataDir() + "docker_path_config.json"; -#endif + if (BOOL_FLAG(logtail_mode)) { + file_path = GetAgentDataDir() + STRING_FLAG(ilogtail_docker_file_path_config); + } else { + file_path = GetAgentDataDir() + "docker_path_config.json"; + } return file_path; } std::string GetAgentConfDir(const ParseConfResult& res, const Json::Value& confJson) { std::string newConfDir; -#if defined(__RUN_LOGTAIL__) - if (res == CONFIG_OK) { - // Should be loaded here because other parameters depend on it. - LoadStringParameter(newConfDir, confJson, "logtail_sys_conf_dir", "ALIYUN_LOGTAIL_SYS_CONF_DIR"); - } - if (newConfDir.empty()) { - newConfDir = STRING_FLAG(logtail_sys_conf_dir); + if (BOOL_FLAG(logtail_mode)) { + if (res == CONFIG_OK) { + // Should be loaded here because other parameters depend on it. + LoadStringParameter(newConfDir, confJson, "logtail_sys_conf_dir", "ALIYUN_LOGTAIL_SYS_CONF_DIR"); + } + if (newConfDir.empty()) { + newConfDir = STRING_FLAG(logtail_sys_conf_dir); + } + } else { + newConfDir = GetAgentConfDir(); } -#else - newConfDir = GetAgentConfDir(); -#endif return newConfDir; } std::string GetAgentConfigFile() { -#if defined(__RUN_LOGTAIL__) - // load ilogtail_config.json - char* configEnv = getenv(STRING_FLAG(ilogtail_config_env_name).c_str()); - if (configEnv == NULL || strlen(configEnv) == 0) { - return STRING_FLAG(ilogtail_config); + if (BOOL_FLAG(logtail_mode)) { + // load ilogtail_config.json + char* configEnv = getenv(STRING_FLAG(ilogtail_config_env_name).c_str()); + if (configEnv == NULL || strlen(configEnv) == 0) { + return STRING_FLAG(ilogtail_config); + } else { + return configEnv; + } } else { - return configEnv; + return LOONGCOLLECTOR_CONFIG; } -#else - return LOONGCOLLECTOR_CONFIG; -#endif } std::string GetAgentAppInfoFile() { @@ -329,141 +360,147 @@ std::string GetAgentAppInfoFile() { if (!file.empty()) { return file; } -#if defined(__RUN_LOGTAIL__) - file = GetAgentRunDir() + STRING_FLAG(app_info_file); -#else - file = GetAgentRunDir() + "app_info.json"; -#endif + if (BOOL_FLAG(logtail_mode)) { + file = GetAgentRunDir() + STRING_FLAG(app_info_file); + } else { + file = GetAgentRunDir() + "app_info.json"; + } return file; } string GetAdhocCheckpointDirPath() { -#if defined(__RUN_LOGTAIL__) - return STRING_FLAG(adhoc_check_point_file_dir); -#else - return GetAgentDataDir() + "adhoc_checkpoint"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return STRING_FLAG(adhoc_check_point_file_dir); + } else { + return GetAgentDataDir() + "adhoc_checkpoint"; + } } string GetCheckPointFileName() { -#if defined(__RUN_LOGTAIL__) - return STRING_FLAG(check_point_filename); -#else - return GetAgentDataDir() + "file_check_point"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return STRING_FLAG(check_point_filename); + } else { + return GetAgentDataDir() + "file_check_point"; + } } string GetCrashStackFileName() { -#if defined(__RUN_LOGTAIL__) - return GetProcessExecutionDir() + STRING_FLAG(crash_stack_file_name); -#else - return GetAgentDataDir() + "backtrace.dat"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return GetProcessExecutionDir() + STRING_FLAG(crash_stack_file_name); + } else { + return GetAgentDataDir() + "backtrace.dat"; + } } string GetLocalEventDataFileName() { -#if defined(__RUN_LOGTAIL__) - return STRING_FLAG(local_event_data_file_name); -#else - return AppConfig::GetInstance()->GetLoongcollectorConfDir() + "local_event.json"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return STRING_FLAG(local_event_data_file_name); + } else { + return AppConfig::GetInstance()->GetLoongcollectorConfDir() + "local_event.json"; + } } string GetInotifyWatcherDirsDumpFileName() { -#if defined(__RUN_LOGTAIL__) - return GetProcessExecutionDir() + STRING_FLAG(inotify_watcher_dirs_dump_filename); -#else - return GetAgentRunDir() + "inotify_watcher_dirs"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return GetProcessExecutionDir() + STRING_FLAG(inotify_watcher_dirs_dump_filename); + } else { + return GetAgentRunDir() + "inotify_watcher_dirs"; + } } string GetAgentLoggersPrefix() { -#if defined(__RUN_LOGTAIL__) - return "/apsara/sls/ilogtail"; -#else - return "/apsara/loongcollector"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return "/apsara/sls/ilogtail"; + } else { + return "/apsara/loongcollector"; + } } string GetAgentLogName() { -#if defined(__RUN_LOGTAIL__) - return "ilogtail.LOG"; -#else - return "loongcollector.LOG"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return "ilogtail.LOG"; + } else { + return "loongcollector.LOG"; + } } string GetAgentSnapshotDir() { -#if defined(__RUN_LOGTAIL__) - return GetProcessExecutionDir() + STRING_FLAG(logtail_snapshot_dir); -#else - return GetAgentLogDir() + "snapshot"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return GetProcessExecutionDir() + STRING_FLAG(logtail_snapshot_dir); + } else { + return GetAgentLogDir() + "snapshot"; + } } string GetAgentProfileLogName() { -#if defined(__RUN_LOGTAIL__) - return "ilogtail_profile.LOG"; -#else - return "loongcollector_profile.LOG"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return "ilogtail_profile.LOG"; + } else { + return "loongcollector_profile.LOG"; + } } string GetAgentStatusLogName() { -#if defined(__RUN_LOGTAIL__) - return "ilogtail_status.LOG"; -#else - return "loongcollector_status.LOG"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return "ilogtail_status.LOG"; + } else { + return "loongcollector_status.LOG"; + } } string GetProfileSnapshotDumpFileName() { -#if defined(__RUN_LOGTAIL__) - return GetProcessExecutionDir() + STRING_FLAG(logtail_profile_snapshot); -#else - return GetAgentLogDir() + "loongcollector_profile_snapshot"; -#endif + if (BOOL_FLAG(logtail_mode)) { + return GetProcessExecutionDir() + STRING_FLAG(logtail_profile_snapshot); + } else { + return GetAgentLogDir() + "loongcollector_profile_snapshot"; + } } string GetObserverEbpfHostPath() { -#if defined(__RUN_LOGTAIL__) + if (BOOL_FLAG(logtail_mode)) { return STRING_FLAG(sls_observer_ebpf_host_path); -#else + } else { return GetAgentDataDir(); -#endif + } } -string GetSendBufferFileNamePrefix(){ -#if defined(__RUN_LOGTAIL__) +string GetSendBufferFileNamePrefix() { + if (BOOL_FLAG(logtail_mode)) { return "logtail_buffer_file_"; -#else + } else { return "send_buffer_file_"; -#endif + } } string GetLegacyUserLocalConfigFilePath() { -#if defined(__RUN_LOGTAIL__) - return AppConfig::GetInstance()->GetProcessExecutionDir(); -#else - return AppConfig::GetInstance()->GetLoongcollectorConfDir(); -#endif + if (BOOL_FLAG(logtail_mode)) { + return AppConfig::GetInstance()->GetProcessExecutionDir(); + } else { + return AppConfig::GetInstance()->GetLoongcollectorConfDir(); + } } string GetExactlyOnceCheckpoint() { -#if defined(__RUN_LOGTAIL__) - auto fp = boost::filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()); - return (fp / "checkpoint_v2").string(); -#else - auto fp = boost::filesystem::path(GetAgentDataDir()); - return (fp / "exactly_once_checkpoint").string(); -#endif + if (BOOL_FLAG(logtail_mode)) { + auto fp = boost::filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()); + return (fp / "checkpoint_v2").string(); + } else { + auto fp = boost::filesystem::path(GetAgentDataDir()); + return (fp / "exactly_once_checkpoint").string(); + } +} + +string GetFileTagsDir() { + if (BOOL_FLAG(logtail_mode)) { + return STRING_FLAG(ALIYUN_LOG_FILE_TAGS); + } else { + return AbsolutePath(STRING_FLAG(ALIYUN_LOG_FILE_TAGS), AppConfig::GetInstance()->GetLoongcollectorConfDir()); + } } AppConfig::AppConfig() { LOG_INFO(sLogger, ("AppConfig AppConfig", "success")); - mSendRandomSleep = BOOL_FLAG(enable_send_tps_smoothing); - mSendFlowControl = BOOL_FLAG(enable_flow_control); SetIlogtailConfigJson(""); // mStreamLogAddress = "0.0.0.0"; // mIsOldPubRegion = false; @@ -500,6 +537,7 @@ void AppConfig::MergeJson(Json::Value& mainConfJson, const Json::Value& subConfJ } } +// 只有 logtail 模式才使用 void AppConfig::LoadIncludeConfig(Json::Value& confJson) { // New default value of the flag is renamed from /etc/ilogtail/config.d/ // to config.d, be compatible with old default value. @@ -549,8 +587,46 @@ void AppConfig::LoadIncludeConfig(Json::Value& confJson) { } } +void AppConfig::LoadLocalInstanceConfig() { + filesystem::path localConfigPath + = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) / "instance_config" / "local"; + error_code ec; + filesystem::create_directories(localConfigPath, ec); + if (ec) { + LOG_WARNING(sLogger, + ("failed to create dir for local instanceconfig", + "manual creation may be required")("error code", ec.value())("error msg", ec.message())); + } + InstanceConfigWatcher::GetInstance()->AddSource(localConfigPath.string()); + InstanceConfigDiff instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); + if (!instanceConfigDiff.IsEmpty()) { + InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff); + } +} + void AppConfig::LoadAppConfig(const std::string& ilogtailConfigFile) { mDockerFilePathConfig = GetAgentDockerPathConfig(); + if (BOOL_FLAG(logtail_mode)) { + loadAppConfigLogtailMode(ilogtailConfigFile); + } else { + std::string confDir = GetAgentConfDir(); + SetLoongcollectorConfDir(AbsolutePath(confDir, mProcessExecutionDir)); + } + // 加载本地instanceconfig + LoadLocalInstanceConfig(); + + ParseJsonToFlags(mLocalInstanceConfig); + ParseEnvToFlags(); + + LoadResourceConf(mLocalInstanceConfig); + // load addr will init sender, sender param depend on LoadResourceConf + // LoadAddrConfig(mLocalInstanceConfig); + LoadOtherConf(mLocalInstanceConfig); + + CheckAndResetProxyEnv(); +} + +void AppConfig::loadAppConfigLogtailMode(const std::string& ilogtailConfigFile) { Json::Value confJson(Json::objectValue); std::string newConfDir; @@ -593,18 +669,15 @@ void AppConfig::LoadAppConfig(const std::string& ilogtailConfigFile) { LOG_INFO(sLogger, ("load logtail config file, path", ilogtailConfigFile)); LOG_INFO(sLogger, ("load logtail config file, detail", configJsonString)); - ParseJsonToFlags(confJson); - ParseEnvToFlags(); - - LoadResourceConf(confJson); - // load addr will init sender, sender param depend on LoadResourceConf - // LoadAddrConfig(confJson); - LoadOtherConf(confJson); - - CheckAndResetProxyEnv(); - mConfJson = confJson; + mLocalInstanceConfig = confJson; } +/** + * @brief 从环境变量加载Tag + * + * 该函数从环境变量中加载预定义的Tag。 + * Tag键从环境变量中获取,对应的值也从环境变量中读取。 + */ void AppConfig::LoadEnvTags() { char* envTagKeys = getenv(STRING_FLAG(default_env_tag_keys).c_str()); if (envTagKeys == NULL) { @@ -626,7 +699,15 @@ void AppConfig::LoadEnvTags() { } } -// @return true if input configValue has been updated. +/** + * @brief 从环境变量加载单个配置值 + * + * @tparam T 配置值的类型 + * @param envKey 环境变量的键 + * @param configValue 配置值的引用,如果环境变量存在且有效,将被更新 + * @param minValue 配置值的最小允许值 + * @return 如果配置值被更新,返回true;否则返回false + */ template bool LoadSingleValueEnvConfig(const char* envKey, T& configValue, const T minValue) { try { @@ -646,6 +727,13 @@ bool LoadSingleValueEnvConfig(const char* envKey, T& configValue, const T minVal return false; } +/** + * @brief 从环境变量加载配置值(如果存在) + * + * @tparam T 配置值的类型 + * @param envKey 环境变量的键 + * @param cfgValue 配置值的引用,如果环境变量存在,将被更新 + */ template void LoadEnvValueIfExisting(const char* envKey, T& cfgValue) { try { @@ -668,6 +756,18 @@ void AppConfig::LoadEnvResourceLimit() { LoadSingleValueEnvConfig("send_request_concurrency", mSendRequestConcurrency, (int32_t)2); } +/** + * @brief 检查是否处于纯容器模式 + * + * 该函数检查系统是否运行在纯容器模式下。 + * + * 主要步骤: + * 1. 在企业版中,检查是否设置了用户定义的ID环境变量 + * 2. 检查默认容器主机路径是否存在 + * 3. 根据检查结果设置mPurageContainerMode标志 + * + * @note 该函数会更新mPurageContainerMode成员变量 + */ void AppConfig::CheckPurageContainerMode() { #ifdef __ENTERPRISE__ if (getenv(STRING_FLAG(ilogtail_user_defined_id_env_name).c_str()) == NULL) { @@ -698,7 +798,7 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) { mMaxBytePerSec = INT32_FLAG(pub_max_send_byte_per_sec); #endif else - mMaxBytePerSec = INT32_FLAG(default_max_send_byte_per_sec); + mMaxBytePerSec = kDefaultMaxSendBytePerSec; if (confJson.isMember("bytes_per_sec") && confJson["bytes_per_sec"].isInt()) mBytePerSec = confJson["bytes_per_sec"].asInt(); @@ -1255,7 +1355,7 @@ void AppConfig::InitEnvMapping(const std::string& envStr, std::map sIgnoreFlagSet - = {"loongcollector_conf_dir", "loongcollector_log_dir", "loongcollector_data_dir", "loongcollector_run_dir"}; + = {"loongcollector_conf_dir", "loongcollector_log_dir", "loongcollector_data_dir", "loongcollector_run_dir", "logtail_mode"}; if (sIgnoreFlagSet.find(flagName) != sIgnoreFlagSet.end()) { return; } @@ -1302,10 +1402,43 @@ void AppConfig::ParseEnvToFlags() { } } #endif - for (auto iter = envMapping.begin(); iter != envMapping.end(); ++iter) { - const std::string& key = iter->first; - const std::string& value = iter->second; + for (const auto & iter : envMapping) { + const std::string& key = iter.first; + const std::string& value = iter.second; SetConfigFlag(key, value); + // 尝试解析为 double + char* end; + double doubleValue = strtod(value.c_str(), &end); + mEnvConfigKeyToConfigName[key] = "env"; + if (*end == '\0') { + mEnvConfig[key] = doubleValue; + continue; + } + + // 尝试解析为 int64_t + int64_t int64Value = strtoll(value.c_str(), &end, 10); + if (*end == '\0' && errno != ERANGE) { + mEnvConfig[key] = Json::Int64(int64Value); + continue; + } + + // 尝试解析为 int32_t + auto int32Value = static_cast(strtol(value.c_str(), &end, 10)); + if (*end == '\0' && errno != ERANGE && static_cast(int32Value) == int64Value) { + mEnvConfig[key] = int32Value; + continue; + } + + // 检查是否为 bool + if (value == "true") { + mEnvConfig[key] = true; + continue; + } + if (value == "false") { + mEnvConfig[key] = false; + continue; + } + mEnvConfig[key] = value; } } @@ -1328,6 +1461,23 @@ void AppConfig::ReadFlagsFromMap(const std::unordered_map sIgnoreKeySet = {"data_server_list"}; const static unordered_set sForceKeySet = {"config_server_address_list"}; @@ -1444,16 +1594,8 @@ void AppConfig::CheckAndAdjustParameters() { INT32_FLAG(max_reader_open_files))); LOG_INFO(sLogger, - ("send byte per second limit", mMaxBytePerSec)("batch send interval", INT32_FLAG(batch_send_interval))( - "batch send size", INT32_FLAG(batch_send_metric_size))); - // when inflow exceed 30MB/s, FlowControl lose precision - if (mMaxBytePerSec >= 30 * 1024 * 1024) { - if (mSendFlowControl) - mSendFlowControl = false; - if (mSendRandomSleep) - mSendRandomSleep = false; - LOG_INFO(sLogger, ("send flow control", "disable")("send random sleep", "disable")); - } + ("batch send interval", INT32_FLAG(batch_send_interval))("batch send size", + INT32_FLAG(batch_send_metric_size))); } bool AppConfig::IsInInotifyBlackList(const std::string& path) const { @@ -1532,7 +1674,7 @@ void AppConfig::UpdateFileTags() { } // read local config Json::Value localFileTagsJson; - const char* file_tags_dir = STRING_FLAG(ALIYUN_LOG_FILE_TAGS).c_str(); + string file_tags_dir = GetFileTagsDir(); ParseConfResult userLogRes = ParseConfig(file_tags_dir, localFileTagsJson); if (userLogRes != CONFIG_OK) { if (userLogRes == CONFIG_NOT_EXIST) @@ -1562,4 +1704,146 @@ void AppConfig::UpdateFileTags() { return; } +void AppConfig::MergeJson(Json::Value& mainConfJson, const Json::Value& subConfJson, std::unordered_map& keyToConfigName, const std::string& configName) { + for (const auto& subkey : subConfJson.getMemberNames()) { + mainConfJson[subkey] = subConfJson[subkey]; + keyToConfigName[subkey] = configName; + } +} + +void AppConfig::LoadInstanceConfig(const std::map>& instanceConfig) { + Json::Value remoteInstanceConfig; + Json::Value localInstanceConfig; + mLocalInstanceConfigKeyToConfigName.clear(); + mRemoteInstanceConfigKeyToConfigName.clear(); + for (const auto& config : instanceConfig) { + if (EndWith(config.second->mDirName, AppConfig::sLocalConfigDir)) { + MergeJson(localInstanceConfig, config.second->GetConfig(), mLocalInstanceConfigKeyToConfigName, config.second->mDirName+"/"+config.second->mConfigName); + } else { + MergeJson(remoteInstanceConfig, config.second->GetConfig(), mRemoteInstanceConfigKeyToConfigName, config.second->mDirName+"/"+config.second->mConfigName); + } + } + if (localInstanceConfig != mLocalInstanceConfig || mRemoteInstanceConfig != remoteInstanceConfig) { + LOG_INFO(sLogger, + ("load all local instanceConfig", localInstanceConfig.toStyledString())( + "load all remote instanceConfig", remoteInstanceConfig.toStyledString())); + std::set*> callbackCall; + for (const auto& callback : mCallbacks) { + const std::string& key = callback.first; + bool configChanged = false; + // 检查本地配置是否发生变化 + if (localInstanceConfig.isMember(key) != mLocalInstanceConfig.isMember(key) + || (localInstanceConfig.isMember(key) && localInstanceConfig[key] != mLocalInstanceConfig[key])) { + configChanged = true; + } + // 检查远程配置是否发生变化 + if (!configChanged + && (remoteInstanceConfig.isMember(key) != mRemoteInstanceConfig.isMember(key) + || (remoteInstanceConfig.isMember(key) + && remoteInstanceConfig[key] != mRemoteInstanceConfig[key]))) { + configChanged = true; + } + if (configChanged) { + callbackCall.insert(callback.second); + } + } + mLocalInstanceConfig = std::move(localInstanceConfig); + mRemoteInstanceConfig = std::move(remoteInstanceConfig); + for (const auto& callback : callbackCall) { + (*callback)(); + } + } +} + +void AppConfig::RegisterCallback(const std::string& key, std::function* callback) { + mCallbacks[key] = callback; +} + +template +T AppConfig::MergeConfig(const T& defaultValue, + const T& currentValue, + const std::string& name, + const std::function& validateFn) { + const auto& localInstanceConfig = AppConfig::GetInstance()->GetLocalInstanceConfig(); + const auto& envConfig = AppConfig::GetInstance()->GetEnvConfig(); + const auto& remoteInstanceConfig = AppConfig::GetInstance()->GetRemoteInstanceConfig(); + + T res = defaultValue; + std::string configName = "default"; + + auto tryMerge = [&](const Json::Value& config, std::unordered_map& keyToConfigName) { + if (config.isMember(name)) { + if constexpr (std::is_same_v) { + if (config[name].isInt() && validateFn(name, config[name].asInt())) { + res = config[name].asInt(); + configName = keyToConfigName[name]; + } + } else if constexpr (std::is_same_v) { + if (config[name].isInt64() && validateFn(name, config[name].asInt64())) { + res = config[name].asInt64(); + configName = keyToConfigName[name]; + } + } else if constexpr (std::is_same_v) { + if (config[name].isBool() && validateFn(name, config[name].asBool())) { + res = config[name].asBool(); + configName = keyToConfigName[name]; + } + } else if constexpr (std::is_same_v) { + if (config[name].isString() && validateFn(name, config[name].asString())) { + res = config[name].asString(); + configName = keyToConfigName[name]; + } + } else if constexpr (std::is_same_v) { + if (config[name].isDouble() && validateFn(name, config[name].asDouble())) { + res = config[name].asDouble(); + configName = keyToConfigName[name]; + } + } + } + }; + + tryMerge(localInstanceConfig, mLocalInstanceConfigKeyToConfigName); + tryMerge(envConfig, mEnvConfigKeyToConfigName); + tryMerge(remoteInstanceConfig, mRemoteInstanceConfigKeyToConfigName); + LOG_INFO( + sLogger, + ("merge instance config", name)("key", name)("newValue", res)("lastValue", currentValue)("from", configName)); + return res; +} + +int32_t AppConfig::MergeInt32(int32_t defaultValue, + int32_t currentValue, + const std::string& name, + const std::function& validateFn) { + return MergeConfig(defaultValue, currentValue, name, validateFn); +} + +int64_t AppConfig::MergeInt64(int64_t defaultValue, + int64_t currentValue, + const std::string& name, + const std::function& validateFn) { + return MergeConfig(defaultValue, currentValue, name, validateFn); +} + +bool AppConfig::MergeBool(bool defaultValue, + bool currentValue, + const std::string& name, + const std::function& validateFn) { + return MergeConfig(defaultValue, currentValue, name, validateFn); +} + +std::string AppConfig::MergeString(const std::string& defaultValue, + const std::string& currentValue, + const std::string& name, + const std::function& validateFn) { + return MergeConfig(defaultValue, currentValue, name, validateFn); +} + +double AppConfig::MergeDouble(double defaultValue, + double currentValue, + const std::string& name, + const std::function& validateFn) { + return MergeConfig(defaultValue, currentValue, name, validateFn); +} + } // namespace logtail diff --git a/core/app_config/AppConfig.h b/core/app_config/AppConfig.h index b827d5a6a4..bf871067e9 100644 --- a/core/app_config/AppConfig.h +++ b/core/app_config/AppConfig.h @@ -20,13 +20,17 @@ #include #include +#include #include #include +#include "InstanceConfig.h" #include "common/Lock.h" #include "protobuf/sls/sls_logs.pb.h" namespace logtail { +extern const int32_t kDefaultMaxSendBytePerSec; + void CreateAgentDir(); std::string GetAgentLogDir(); @@ -71,7 +75,24 @@ class DoubleBuffer { class AppConfig { private: - Json::Value mConfJson; + static std::string sLocalConfigDir; + void loadAppConfigLogtailMode(const std::string& ilogtailConfigFile); + Json::Value mergeAllConfigs(); + + Json::Value mLocalInstanceConfig; + Json::Value mEnvConfig; + Json::Value mRemoteInstanceConfig; + std::unordered_map mLocalInstanceConfigKeyToConfigName; + std::unordered_map mEnvConfigKeyToConfigName; + std::unordered_map mRemoteInstanceConfigKeyToConfigName; + + std::map*> mCallbacks; + + DoubleBuffer> mFileTags; + DoubleBuffer> mAgentAttrs; + + Json::Value mFileTagsJson; + mutable SpinLock mAppConfigLock; // loongcollector_config.json content for rebuild @@ -113,8 +134,6 @@ class AppConfig { // local config // std::string mMappingConfigPath; - bool mSendRandomSleep; - bool mSendFlowControl; int32_t mMaxMultiConfigSize; bool mAcceptMultiConfigFlag; @@ -181,9 +200,6 @@ class AppConfig { std::set mDynamicPlugins; std::vector mHostPathBlacklist; - Json::Value mFileTagsJson; - DoubleBuffer> mFileTags; - std::string mBindInterface; // /** @@ -208,6 +224,7 @@ class AppConfig { */ void CheckAndAdjustParameters(); void MergeJson(Json::Value& mainConfJson, const Json::Value& subConfJson); + void MergeJson(Json::Value& mainConfJson, const Json::Value& subConfJson, std::unordered_map& keyToConfigName, const std::string& configName); /** * @brief Load *.json from config.d dir * @@ -275,29 +292,90 @@ class AppConfig { bool CheckAndResetProxyAddress(const char* envKey, std::string& address); static void InitEnvMapping(const std::string& envStr, std::map& envMapping); + static void InitEnvMapping(const std::string& envStr, Json::Value& envJson); static void SetConfigFlag(const std::string& flagName, const std::string& value); public: AppConfig(); ~AppConfig(){}; + void LoadInstanceConfig(const std::map>&); + static AppConfig* GetInstance() { static AppConfig singleton; return &singleton; } + // 初始化配置 void LoadAppConfig(const std::string& ilogtailConfigFile); + void LoadLocalInstanceConfig(); + + // 获取全局参数方法 + const Json::Value& GetLocalInstanceConfig() { return mLocalInstanceConfig; }; + const Json::Value& GetEnvConfig() { return mEnvConfig; }; + const Json::Value& GetRemoteInstanceConfig() { return mRemoteInstanceConfig; }; + + template + T MergeConfig(const T& defaultValue, + const T& currentValue, + const std::string& name, + const std::function& validateFn); + int32_t MergeInt32(int32_t defaultValue, + int32_t currentValue, + const std::string& name, + const std::function& validateFn); + + int64_t MergeInt64(int64_t defaultValue, + int64_t currentValue, + const std::string& name, + const std::function& validateFn); + bool MergeBool(bool defaultValue, + bool currentValue, + const std::string& name, + const std::function& validateFn); + std::string MergeString(const std::string& defaultValue, + const std::string& currentValue, + const std::string& name, + const std::function& validateFn); + double MergeDouble(double defaultValue, + double currentValue, + const std::string& name, + const std::function& validateFn); + + + // 注册回调 + void RegisterCallback(const std::string& key, std::function* callback); + + // 合并配置 + std::string Merge(Json::Value& localConf, + Json::Value& envConfig, + Json::Value& remoteConf, + std::string& name, + std::function validateFn); + + // 获取特定配置 + // CPU限制参数等仅与框架相关的参数,计算逻辑可以放在AppConfig + float GetMachineCpuUsageThreshold() const { return mMachineCpuUsageThreshold; } + float GetScaledCpuUsageUpLimit() const { return mScaledCpuUsageUpLimit; } + float GetCpuUsageUpLimit() const { return mCpuUsageUpLimit; } + + // 文件标签相关,获取从文件中来的tags + std::vector& GetFileTags() { return mFileTags.getReadBuffer(); } + // 更新从文件中来的tags + void UpdateFileTags(); + // Agent属性相关,获取从文件中来的attrs + std::map& GetAgentAttrs() { return mAgentAttrs.getReadBuffer(); } + // 更新从文件中来的attrs + void UpdateAgentAttrs(); + + // Legacy:获取各种参数 bool NoInotify() const { return mNoInotify; } bool IsInInotifyBlackList(const std::string& path) const; bool IsLogParseAlarmValid() const { return mLogParseAlarmFlag; } - bool IsSendRandomSleep() const { return mSendRandomSleep; } - - bool IsSendFlowControl() const { return mSendFlowControl; } - // std::string GetDefaultRegion() const; // void SetDefaultRegion(const std::string& region); @@ -331,12 +409,6 @@ class AppConfig { bool IsResourceAutoScale() const { return mResourceAutoScale; } - float GetMachineCpuUsageThreshold() const { return mMachineCpuUsageThreshold; } - - float GetScaledCpuUsageUpLimit() const { return mScaledCpuUsageUpLimit; } - - float GetCpuUsageUpLimit() const { return mCpuUsageUpLimit; } - int64_t GetMemUsageUpLimit() const { return mMemUsageUpLimit; } int32_t GetMaxHoldedDataSize() const { return mMaxHoldedDataSize; } @@ -345,6 +417,8 @@ class AppConfig { int32_t GetMaxBytePerSec() const { return mMaxBytePerSec; } + void SetMaxBytePerSec(int32_t maxBytePerSec) { mMaxBytePerSec = maxBytePerSec; } + int32_t GetBytePerSec() const { return mBytePerSec; } int32_t GetNumOfBufferFile() const { return mNumOfBufferFile; } @@ -421,14 +495,10 @@ class AppConfig { inline const std::set& GetDynamicPlugins() const { return mDynamicPlugins; } bool IsHostPathMatchBlacklist(const std::string& dirPath) const; - const Json::Value& GetConfig() const { return mConfJson; } + const Json::Value& GetConfig() const { return mLocalInstanceConfig; } const std::string& GetBindInterface() const { return mBindInterface; } - std::vector& GetFileTags() { return mFileTags.getReadBuffer(); } - - void UpdateFileTags(); - #ifdef APSARA_UNIT_TEST_MAIN friend class SenderUnittest; friend class ConfigUpdatorUnittest; diff --git a/core/application/Application.cpp b/core/application/Application.cpp index a8f081504c..408c850fa4 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -32,16 +32,17 @@ #include "common/version.h" #include "config/ConfigDiff.h" #include "config/watcher/ConfigWatcher.h" +#include "config/watcher/InstanceConfigWatcher.h" #include "file_server/ConfigManager.h" #include "file_server/EventDispatcher.h" #include "file_server/FileServer.h" #include "file_server/event_handler/LogInput.h" #include "go_pipeline/LogtailPlugin.h" +#include "instance_config/InstanceConfigManager.h" #include "logger/Logger.h" #include "monitor/LogFileProfiler.h" #include "monitor/MetricExportor.h" #include "monitor/Monitor.h" -#include "pipeline/InstanceConfigManager.h" #include "pipeline/PipelineManager.h" #include "pipeline/plugin/PluginRegistry.h" #include "pipeline/queue/ExactlyOnceQueueManager.h" @@ -212,20 +213,7 @@ void Application::Start() { // GCOVR_EXCL_START ("failed to create dir for local pipeline_config", "manual creation may be required")("error code", ec.value())("error msg", ec.message())); } - ConfigWatcher::GetInstance()->AddPipelineSource(localConfigPath.string()); - } - { - // add local config dir - filesystem::path localConfigPath - = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) / "instance_config" / "local"; - error_code ec; - filesystem::create_directories(localConfigPath, ec); - if (ec) { - LOG_WARNING(sLogger, - ("failed to create dir for local instance_config", - "manual creation may be required")("error code", ec.value())("error msg", ec.message())); - } - ConfigWatcher::GetInstance()->AddInstanceSource(localConfigPath.string()); + ConfigWatcher::GetInstance()->AddSource(localConfigPath.string()); } #ifdef __ENTERPRISE__ @@ -282,11 +270,11 @@ void Application::Start() { // GCOVR_EXCL_START lastCheckTagsTime = curTime; } if (curTime - lastConfigCheckTime >= INT32_FLAG(config_scan_interval)) { - PipelineConfigDiff pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckConfigDiff(); if (!pipelineConfigDiff.IsEmpty()) { PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff); } - InstanceConfigDiff instanceConfigDiff = ConfigWatcher::GetInstance()->CheckInstanceConfigDiff(); + InstanceConfigDiff instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); if (!instanceConfigDiff.IsEmpty()) { InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff); } diff --git a/core/common/RuntimeUtil.cpp b/core/common/RuntimeUtil.cpp index 91ea4632a9..8cf2b37cc9 100644 --- a/core/common/RuntimeUtil.cpp +++ b/core/common/RuntimeUtil.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "Flags.h" #include "app_config/AppConfig.h" #include "RuntimeUtil.h" #if defined(__linux__) @@ -31,6 +32,7 @@ #include "FileSystemUtil.h" DECLARE_FLAG_STRING(logtail_sys_conf_dir); +DECLARE_FLAG_BOOL(logtail_mode); namespace logtail { @@ -85,31 +87,31 @@ bool RebuildExecutionDir(const std::string& ilogtailConfigJson, errorMessage = ss.str(); return false; } - #if defined(__RUN_LOGTAIL__) - path = executionDir.empty() ? GetProcessExecutionDir() : executionDir; - if (CheckExistance(path)) - return true; - if (!Mkdir(path)) { - std::stringstream ss; - ss << "create execution dir failed, errno is " << errno; - errorMessage = ss.str(); - return false; - } + if (BOOL_FLAG(logtail_mode)) { + path = executionDir.empty() ? GetProcessExecutionDir() : executionDir; + if (CheckExistance(path)) + return true; + if (!Mkdir(path)) { + std::stringstream ss; + ss << "create execution dir failed, errno is " << errno; + errorMessage = ss.str(); + return false; + } - if (ilogtailConfigJson.empty()) - return true; + if (ilogtailConfigJson.empty()) + return true; - FILE* pFile = fopen((path + STRING_FLAG(ilogtail_config)).c_str(), "w"); - if (pFile == NULL) { - std::stringstream ss; - ss << "open " << STRING_FLAG(ilogtail_config) << " to write failed, errno is " << errno; - errorMessage = ss.str(); - return false; - } + FILE* pFile = fopen((path + STRING_FLAG(ilogtail_config)).c_str(), "w"); + if (pFile == NULL) { + std::stringstream ss; + ss << "open " << STRING_FLAG(ilogtail_config) << " to write failed, errno is " << errno; + errorMessage = ss.str(); + return false; + } - fwrite(ilogtailConfigJson.c_str(), 1, ilogtailConfigJson.size(), pFile); - fclose(pFile); - #endif + fwrite(ilogtailConfigJson.c_str(), 1, ilogtailConfigJson.size(), pFile); + fclose(pFile); + } return true; } diff --git a/core/config/InstanceConfig.h b/core/config/InstanceConfig.h index 82ac47533b..76842411c3 100644 --- a/core/config/InstanceConfig.h +++ b/core/config/InstanceConfig.h @@ -28,61 +28,14 @@ namespace logtail { struct InstanceConfig { - std::string mName; - std::unique_ptr mDetail; + std::string mConfigName; + std::string mDirName; + Json::Value mDetail; - // for alarm only - std::string mProject; - std::string mLogstore; - std::string mRegion; + InstanceConfig(const std::string& name, const Json::Value& detail, const std::string& dirName) + : mConfigName(name), mDirName(dirName), mDetail(detail) {} - InstanceConfig(const std::string& name, std::unique_ptr&& detail) - : mName(name), mDetail(std::move(detail)) { - mProject = ""; - mLogstore = ""; - mRegion = ""; - } - InstanceConfig(const logtail::InstanceConfig& config) { - mName = config.mName; - mDetail = std::make_unique(*config.mDetail); - mProject = ""; - mLogstore = ""; - mRegion = ""; - } - - InstanceConfig& operator=(InstanceConfig&& other) { - if (this != &other) { - mName = std::move(other.mName); - mDetail = std::move(other.mDetail); - mProject = ""; - mLogstore = ""; - mRegion = ""; - } - return *this; - } - - InstanceConfig& operator=(const InstanceConfig& other) { - if (this != &other) { - mName = other.mName; - mDetail = std::make_unique(*other.mDetail); - mProject = ""; - mLogstore = ""; - mRegion = ""; - } - return *this; - } - - bool Parse() { return true; } - - const Json::Value& GetConfig() const { return *mDetail; } + const Json::Value& GetConfig() const { return mDetail; } }; -inline bool operator==(const InstanceConfig& lhs, const InstanceConfig& rhs) { - return (lhs.mName == rhs.mName) && (*lhs.mDetail == *rhs.mDetail); -} - -inline bool operator!=(const InstanceConfig& lhs, const InstanceConfig& rhs) { - return !(lhs == rhs); -} - } // namespace logtail diff --git a/core/config/common_provider/CommonConfigProvider.cpp b/core/config/common_provider/CommonConfigProvider.cpp index c96d5e8399..cc62335208 100644 --- a/core/config/common_provider/CommonConfigProvider.cpp +++ b/core/config/common_provider/CommonConfigProvider.cpp @@ -91,7 +91,7 @@ void CommonConfigProvider::Init(const string& dir) { if (port < 1 || port > 65535) { LOG_WARNING(sLogger, ("configserver_address", "illegal port")("port", port)); continue; - } + } mConfigServerAddresses.push_back(ConfigServerAddress(host, port)); } @@ -130,6 +130,8 @@ void CommonConfigProvider::Stop() { void CommonConfigProvider::LoadConfigFile() { error_code ec; + lock_guard pipelineInfomaplock(mPipelineInfoMapMux); + lock_guard lockPipeline(mPipelineMux); for (auto const& entry : filesystem::directory_iterator(mPipelineSourceDir, ec)) { Json::Value detail; if (LoadConfigDetailFromFile(entry, detail)) { @@ -141,10 +143,12 @@ void CommonConfigProvider::LoadConfigFile() { } info.status = ConfigFeedbackStatus::APPLYING; info.detail = detail.toStyledString(); - lock_guard infomaplock(mPipelineInfoMapMux); mPipelineConfigInfoMap[info.name] = info; + ConfigFeedbackReceiver::GetInstance().RegisterPipelineConfig(info.name, this); } } + lock_guard instanceInfomaplock(mInstanceInfoMapMux); + lock_guard lockInstance(mInstanceMux); for (auto const& entry : filesystem::directory_iterator(mInstanceSourceDir, ec)) { Json::Value detail; if (LoadConfigDetailFromFile(entry, detail)) { @@ -156,8 +160,8 @@ void CommonConfigProvider::LoadConfigFile() { } info.status = ConfigFeedbackStatus::APPLYING; info.detail = detail.toStyledString(); - lock_guard infomaplock(mInstanceInfoMapMux); mInstanceConfigInfoMap[info.name] = info; + ConfigFeedbackReceiver::GetInstance().RegisterInstanceConfig(info.name, this); } } } @@ -482,15 +486,15 @@ void CommonConfigProvider::UpdateRemoteInstanceConfig( } else { if (!DumpConfigFile(config, sourceDir)) { mInstanceConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), - .version = config.version(), - .status = ConfigFeedbackStatus::FAILED, - .detail = config.detail()}; + .version = config.version(), + .status = ConfigFeedbackStatus::FAILED, + .detail = config.detail()}; continue; } mInstanceConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), - .version = config.version(), - .status = ConfigFeedbackStatus::APPLYING, - .detail = config.detail()}; + .version = config.version(), + .status = ConfigFeedbackStatus::APPLYING, + .detail = config.detail()}; ConfigFeedbackReceiver::GetInstance().RegisterInstanceConfig(config.name(), this); } } @@ -514,7 +518,8 @@ bool CommonConfigProvider::FetchInstanceConfigFromServer( string reqBody; fetchConfigRequest.SerializeToString(&reqBody); string fetchConfigResponse; - if (SendHttpRequest(operation, reqBody, "FetchInstanceConfig", fetchConfigRequest.request_id(), fetchConfigResponse)) { + if (SendHttpRequest( + operation, reqBody, "FetchInstanceConfig", fetchConfigRequest.request_id(), fetchConfigResponse)) { configserver::proto::v2::FetchConfigResponse fetchConfigResponsePb; fetchConfigResponsePb.ParseFromString(fetchConfigResponse); res.Swap(fetchConfigResponsePb.mutable_config_details()); @@ -540,7 +545,8 @@ bool CommonConfigProvider::FetchPipelineConfigFromServer( string reqBody; fetchConfigRequest.SerializeToString(&reqBody); string fetchConfigResponse; - if (SendHttpRequest(operation, reqBody, "FetchPipelineConfig", fetchConfigRequest.request_id(), fetchConfigResponse)) { + if (SendHttpRequest( + operation, reqBody, "FetchPipelineConfig", fetchConfigRequest.request_id(), fetchConfigResponse)) { configserver::proto::v2::FetchConfigResponse fetchConfigResponsePb; fetchConfigResponsePb.ParseFromString(fetchConfigResponse); res.Swap(fetchConfigResponsePb.mutable_config_details()); diff --git a/core/config/provider/ConfigProvider.cpp b/core/config/provider/ConfigProvider.cpp index be4709dc7a..101e209fa3 100644 --- a/core/config/provider/ConfigProvider.cpp +++ b/core/config/provider/ConfigProvider.cpp @@ -14,6 +14,7 @@ #include "config/provider/ConfigProvider.h" +#include "InstanceConfigWatcher.h" #include "app_config/AppConfig.h" #include "config/watcher/ConfigWatcher.h" @@ -33,11 +34,11 @@ void ConfigProvider::Init(const string& dir) { error_code ec; filesystem::create_directories(mPipelineSourceDir, ec); - ConfigWatcher::GetInstance()->AddPipelineSource(mPipelineSourceDir, &mPipelineMux); + ConfigWatcher::GetInstance()->AddSource(mPipelineSourceDir, &mPipelineMux); ec.clear(); filesystem::create_directories(mInstanceSourceDir, ec); - ConfigWatcher::GetInstance()->AddInstanceSource(mInstanceSourceDir, &mInstanceMux); + InstanceConfigWatcher::GetInstance()->AddSource(mInstanceSourceDir, &mInstanceMux); } } // namespace logtail diff --git a/core/config/watcher/ConfigWatcher.cpp b/core/config/watcher/ConfigWatcher.cpp index 67c1633ad5..baa3d28a9f 100644 --- a/core/config/watcher/ConfigWatcher.cpp +++ b/core/config/watcher/ConfigWatcher.cpp @@ -14,12 +14,11 @@ #include "config/watcher/ConfigWatcher.h" -#include #include #include +#include "PipelineConfig.h" #include "logger/Logger.h" -#include "pipeline/InstanceConfigManager.h" #include "pipeline/PipelineManager.h" using namespace std; @@ -28,45 +27,35 @@ namespace logtail { bool ReadFile(const string& filepath, string& content); -ConfigWatcher::ConfigWatcher() - : mPipelineManager(PipelineManager::GetInstance()), mInstanceConfigManager(InstanceConfigManager::GetInstance()) { +ConfigWatcher::ConfigWatcher() : mPipelineManager(PipelineManager::GetInstance()) { } -template -ConfigDiffType ConfigWatcher::CheckConfigDiff( - const std::vector& configDir, - const std::unordered_map& configDirMutexMap, - std::map>& fileInfoMap, - const ConfigManagerType* configManager, - const std::string& configType) { - ConfigDiffType diff; +PipelineConfigDiff ConfigWatcher::CheckConfigDiff() { + PipelineConfigDiff diff; unordered_set configSet; - for (const auto& dir : configDir) { + for (const auto& dir : mSourceDir) { error_code ec; filesystem::file_status s = filesystem::status(dir, ec); if (ec) { LOG_WARNING(sLogger, ("failed to get config dir path info", "skip current object")("dir path", dir.string())( - "error code", ec.value())("error msg", ec.message())("configType", configType)); + "error code", ec.value())("error msg", ec.message())); continue; } if (!filesystem::exists(s)) { - LOG_WARNING(sLogger, - ("config dir path not existed", "skip current object")("dir path", dir.string())("configType", - configType)); + LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string())); continue; } if (!filesystem::is_directory(s)) { LOG_WARNING(sLogger, - ("config dir path is not a directory", - "skip current object")("dir path", dir.string())("configType", configType)); + ("config dir path is not a directory", "skip current object")("dir path", dir.string())); continue; } for (auto const& entry : filesystem::directory_iterator(dir, ec)) { // lock the dir if it is provided by config provider unique_lock lock; - auto itr = configDirMutexMap.find(dir.string()); - if (itr != configDirMutexMap.end()) { + auto itr = mDirMutexMap.find(dir.string()); + if (itr != mDirMutexMap.end()) { lock = unique_lock(*itr->second, defer_lock); lock.lock(); } @@ -75,83 +64,76 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff( const string& configName = path.stem().string(); const string& filepath = path.string(); if (!filesystem::is_regular_file(entry.status(ec))) { - LOG_DEBUG(sLogger, - ("config file is not a regular file", - "skip current object")("filepath", filepath)("configType", configType)); + LOG_DEBUG(sLogger, ("config file is not a regular file", "skip current object")("filepath", filepath)); continue; } if (configSet.find(configName) != configSet.end()) { - LOG_WARNING(sLogger, - ("more than 1 config with the same name is found", - "skip current config")("filepath", filepath)("configType", configType)); + LOG_WARNING( + sLogger, + ("more than 1 config with the same name is found", "skip current config")("filepath", filepath)); continue; } configSet.insert(configName); - auto iter = fileInfoMap.find(filepath); + auto iter = mFileInfoMap.find(filepath); uintmax_t size = filesystem::file_size(path, ec); filesystem::file_time_type mTime = filesystem::last_write_time(path, ec); - if (iter == fileInfoMap.end()) { - fileInfoMap[filepath] = make_pair(size, mTime); - unique_ptr detail = make_unique(); + if (iter == mFileInfoMap.end()) { + mFileInfoMap[filepath] = make_pair(size, mTime); + unique_ptr detail = make_unique(new Json::Value()); if (!LoadConfigDetailFromFile(path, *detail)) { continue; } if (!IsConfigEnabled(configName, *detail)) { - LOG_INFO(sLogger, - ("new config found and disabled", - "skip current object")("config", configName)("configType", configType)); + LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName)); continue; } - ConfigType config(configName, std::move(detail)); + PipelineConfig config(configName, std::move(detail)); if (!config.Parse()) { - LOG_ERROR(sLogger, - ("new config found but invalid", - "skip current object")("config", configName)("configType", configType)); + LOG_ERROR(sLogger, ("new config found but invalid", "skip current object")("config", configName)); LogtailAlarm::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM, "new config found but invalid: skip current object, config: " - + configName + ", configType: " + configType, + + configName, config.mProject, config.mLogstore, config.mRegion); continue; } diff.mAdded.push_back(std::move(config)); - LOG_INFO(sLogger, - ("new config found and passed topology check", - "prepare to build config")("config", configName)("configType", configType)); + LOG_INFO( + sLogger, + ("new config found and passed topology check", "prepare to build pipeline")("config", configName)); } else if (iter->second.first != size || iter->second.second != mTime) { // for config currently running, we leave it untouched if new config is invalid - fileInfoMap[filepath] = make_pair(size, mTime); - unique_ptr detail = make_unique(); + mFileInfoMap[filepath] = make_pair(size, mTime); + unique_ptr detail = make_unique(new Json::Value()); if (!LoadConfigDetailFromFile(path, *detail)) { continue; } if (!IsConfigEnabled(configName, *detail)) { - if (configManager->FindConfigByName(configName)) { + if (mPipelineManager->FindConfigByName(configName)) { diff.mRemoved.push_back(configName); - LOG_INFO( - sLogger, - ("existing valid config modified and disabled", - "prepare to stop current running config")("config", configName)("configType", configType)); + LOG_INFO(sLogger, + ("existing valid config modified and disabled", + "prepare to stop current running pipeline")("config", configName)); } else { LOG_INFO(sLogger, - ("existing invalid config modified and disabled", - "skip current object")("config", configName)("configType", configType)); + ("existing invalid config modified and disabled", "skip current object")("config", + configName)); } continue; } - shared_ptr p = configManager->FindConfigByName(configName); + shared_ptr p = mPipelineManager->FindConfigByName(configName); if (!p) { - ConfigType config(configName, std::move(detail)); + PipelineConfig config(configName, std::move(detail)); if (!config.Parse()) { LOG_ERROR(sLogger, ("existing invalid config modified and remains invalid", - "skip current object")("config", configName)("configType", configType)); + "skip current object")("config", configName)); LogtailAlarm::GetInstance()->SendAlarm( CATEGORY_CONFIG_ALARM, "existing invalid config modified and remains invalid: skip current object, config: " - + configName + ", configType: " + configType, + + configName, config.mProject, config.mLogstore, config.mRegion); @@ -160,17 +142,17 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff( diff.mAdded.push_back(std::move(config)); LOG_INFO(sLogger, ("existing invalid config modified and passed topology check", - "prepare to build config")("config", configName)("configType", configType)); + "prepare to build pipeline")("config", configName)); } else if (*detail != p->GetConfig()) { - ConfigType config(configName, std::move(detail)); + PipelineConfig config(configName, std::move(detail)); if (!config.Parse()) { LOG_ERROR(sLogger, ("existing valid config modified and becomes invalid", - "keep current config running")("config", configName)("configType", configType)); + "keep current pipeline running")("config", configName)); LogtailAlarm::GetInstance()->SendAlarm( CATEGORY_CONFIG_ALARM, "existing valid config modified and becomes invalid: skip current object, config: " - + configName + ", configType: " + configType, + + configName, config.mProject, config.mLogstore, config.mRegion); @@ -179,84 +161,51 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff( diff.mModified.push_back(std::move(config)); LOG_INFO(sLogger, ("existing valid config modified and passed topology check", - "prepare to rebuild config")("config", configName)("configType", configType)); + "prepare to rebuild pipeline")("config", configName)); } else { LOG_DEBUG(sLogger, - ("existing valid config file modified, but no change found", - "skip current object")("configType", configType)); + ("existing valid config file modified, but no change found", "skip current object")); } } else { - LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object")("configType", configType)); + LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object")); } } } - for (const auto& name : configManager->GetAllConfigNames()) { + for (const auto& name : mPipelineManager->GetAllConfigNames()) { if (configSet.find(name) == configSet.end()) { diff.mRemoved.push_back(name); LOG_INFO(sLogger, - ("existing valid config is removed", - "prepare to stop current running config")("config", name)("configType", configType)); + ("existing valid config is removed", "prepare to stop current running pipeline")("config", name)); } } - for (const auto& item : fileInfoMap) { + for (const auto& item : mFileInfoMap) { string configName = filesystem::path(item.first).stem().string(); if (configSet.find(configName) == configSet.end()) { - fileInfoMap.erase(item.first); + mFileInfoMap.erase(item.first); } } if (!diff.IsEmpty()) { LOG_INFO(sLogger, - ("config files scan done", "got updates, begin to update configs")("added", diff.mAdded.size())( - "modified", diff.mModified.size())("removed", diff.mRemoved.size())("configType", configType)); + ("config files scan done", "got updates, begin to update pipelines")("added", diff.mAdded.size())( + "modified", diff.mModified.size())("removed", diff.mRemoved.size())); } else { - LOG_DEBUG(sLogger, ("config files scan done", "no update")("configType", configType)); + LOG_DEBUG(sLogger, ("config files scan done", "no update")); } return diff; } -PipelineConfigDiff ConfigWatcher::CheckPipelineConfigDiff() { - const static std::string configType = "pipelineConfig"; - return CheckConfigDiff( - mPipelineConfigDir, mPipelineConfigDirMutexMap, mPipelineFileInfoMap, mPipelineManager, configType); -} - -InstanceConfigDiff ConfigWatcher::CheckInstanceConfigDiff() { - const static std::string configType = "instanceConfig"; - return CheckConfigDiff( - mInstanceConfigDir, mInstanceConfigDirMutexMap, mInstanceFileInfoMap, mInstanceConfigManager, configType); -} - -void ConfigWatcher::AddPipelineSource(const string& dir, mutex* mux) { - mPipelineConfigDir.emplace_back(dir); +void ConfigWatcher::AddSource(const string& dir, mutex* mux) { + mSourceDir.emplace_back(dir); if (mux != nullptr) { - mPipelineConfigDirMutexMap[dir] = mux; - } -} - -void ConfigWatcher::AddInstanceSource(const string& dir, mutex* mux) { - mInstanceConfigDir.emplace_back(dir); - if (mux != nullptr) { - mInstanceConfigDirMutexMap[dir] = mux; - } -} - -void ConfigWatcher::AddCommandSource(const string& dir, mutex* mux) { - mCommandConfigDir.emplace_back(dir); - if (mux != nullptr) { - mCommandConfigDirMutexMap[dir] = mux; + mDirMutexMap[dir] = mux; } } void ConfigWatcher::ClearEnvironment() { - mPipelineConfigDir.clear(); - mPipelineFileInfoMap.clear(); - - mInstanceConfigDir.clear(); - mInstanceFileInfoMap.clear(); - - mCommandConfigDir.clear(); + mSourceDir.clear(); + mFileInfoMap.clear(); } } // namespace logtail diff --git a/core/config/watcher/ConfigWatcher.h b/core/config/watcher/ConfigWatcher.h index f4f04f4fc0..ba58b28f76 100644 --- a/core/config/watcher/ConfigWatcher.h +++ b/core/config/watcher/ConfigWatcher.h @@ -24,13 +24,10 @@ #include #include "config/ConfigDiff.h" -#include "config/PipelineConfig.h" -#include "config/InstanceConfig.h" namespace logtail { class PipelineManager; -class InstanceConfigManager; class ConfigWatcher { public: @@ -42,46 +39,20 @@ class ConfigWatcher { return &instance; } - PipelineConfigDiff CheckPipelineConfigDiff(); - InstanceConfigDiff CheckInstanceConfigDiff(); - - void AddPipelineSource(const std::string& dir, std::mutex* mux = nullptr); - void AddInstanceSource(const std::string& dir, std::mutex* mux = nullptr); - void AddCommandSource(const std::string& dir, std::mutex* mux = nullptr); - + PipelineConfigDiff CheckConfigDiff(); + void AddSource(const std::string& dir, std::mutex* mux = nullptr); // for ut void SetPipelineManager(const PipelineManager* pm) { mPipelineManager = pm; } - void SetInstanceConfigManager(const InstanceConfigManager* pm) { mInstanceConfigManager = pm; } void ClearEnvironment(); private: ConfigWatcher(); ~ConfigWatcher() = default; - template - ConfigDiffType - CheckConfigDiff(const std::vector& configDir, - const std::unordered_map& configDirMutexMap, - std::map>& fileInfoMap, - const ConfigManagerType* configManager, - const std::string& configType); - - std::vector mPipelineConfigDir; - std::unordered_map mPipelineConfigDirMutexMap; - - std::vector mInstanceConfigDir; - std::unordered_map mInstanceConfigDirMutexMap; - - std::vector mCommandConfigDir; - std::unordered_map mCommandConfigDirMutexMap; - - std::map> mPipelineFileInfoMap; + std::vector mSourceDir; + std::unordered_map mDirMutexMap; + std::map> mFileInfoMap; const PipelineManager* mPipelineManager = nullptr; - - std::map> mInstanceFileInfoMap; - const InstanceConfigManager* mInstanceConfigManager = nullptr; - - bool CheckDirectoryStatus(const std::filesystem::path& dir); }; -} // namespace logtail +} // namespace logtail \ No newline at end of file diff --git a/core/config/watcher/InstanceConfigWatcher.cpp b/core/config/watcher/InstanceConfigWatcher.cpp new file mode 100644 index 0000000000..96bb100228 --- /dev/null +++ b/core/config/watcher/InstanceConfigWatcher.cpp @@ -0,0 +1,179 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "config/watcher/InstanceConfigWatcher.h" + +#include +#include + +#include "config/InstanceConfig.h" +#include "logger/Logger.h" + +using namespace std; + +namespace logtail { + +bool ReadFile(const string& filepath, string& content); + +InstanceConfigWatcher::InstanceConfigWatcher() : mInstanceConfigManager(InstanceConfigManager::GetInstance()) { +} + +InstanceConfigDiff InstanceConfigWatcher::CheckConfigDiff() { + InstanceConfigDiff diff; + unordered_set configSet; + for (const auto& dir : mSourceDir) { + error_code ec; + filesystem::file_status s = filesystem::status(dir, ec); + if (ec) { + LOG_WARNING(sLogger, + ("failed to get config dir path info", "skip current object")("dir path", dir.string())( + "error code", ec.value())("error msg", ec.message())); + continue; + } + if (!filesystem::exists(s)) { + LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string())); + continue; + } + if (!filesystem::is_directory(s)) { + LOG_WARNING(sLogger, + ("config dir path is not a directory", "skip current object")("dir path", dir.string())); + continue; + } + for (auto const& entry : filesystem::directory_iterator(dir, ec)) { + // lock the dir if it is provided by config provider + unique_lock lock; + auto itr = mDirMutexMap.find(dir.string()); + if (itr != mDirMutexMap.end()) { + lock = unique_lock(*itr->second, defer_lock); + lock.lock(); + } + + const filesystem::path& path = entry.path(); + const string& configName = path.stem().string(); + const string& filepath = path.string(); + if (!filesystem::is_regular_file(entry.status(ec))) { + LOG_DEBUG(sLogger, ("config file is not a regular file", "skip current object")("filepath", filepath)); + continue; + } + if (configSet.find(configName) != configSet.end()) { + LOG_WARNING( + sLogger, + ("more than 1 config with the same name is found", "skip current config")("filepath", filepath)); + continue; + } + configSet.insert(configName); + + auto iter = mFileInfoMap.find(filepath); + uintmax_t size = filesystem::file_size(path, ec); + filesystem::file_time_type mTime = filesystem::last_write_time(path, ec); + if (iter == mFileInfoMap.end()) { + mFileInfoMap[filepath] = make_pair(size, mTime); + Json::Value detail; + if (!LoadConfigDetailFromFile(path, detail)) { + continue; + } + if (!IsConfigEnabled(configName, detail)) { + LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName)); + continue; + } + InstanceConfig config(configName, detail, dir.string()); + diff.mAdded.push_back(std::move(config)); + LOG_INFO(sLogger, + ("new config found and passed topology check", "prepare to load instanceConfig")("config", + configName)); + } else if (iter->second.first != size || iter->second.second != mTime) { + // for config currently running, we leave it untouched if new config is invalid + mFileInfoMap[filepath] = make_pair(size, mTime); + Json::Value detail; + if (!LoadConfigDetailFromFile(path, detail)) { + continue; + } + if (!IsConfigEnabled(configName, detail)) { + if (mInstanceConfigManager->FindConfigByName(configName)) { + diff.mRemoved.push_back(configName); + LOG_INFO(sLogger, + ("existing valid config modified and disabled", + "prepare to stop current running instanceConfig")("config", configName)); + } else { + LOG_INFO(sLogger, + ("existing invalid config modified and disabled", "skip current object")("config", + configName)); + } + continue; + } + shared_ptr p = mInstanceConfigManager->FindConfigByName(configName); + if (!p) { + InstanceConfig config(configName, detail, dir.string()); + diff.mAdded.push_back(std::move(config)); + LOG_INFO(sLogger, + ("existing invalid config modified and passed topology check", + "prepare to load instanceConfig")("config", configName)); + } else if (detail != p->GetConfig()) { + InstanceConfig config(configName, detail, dir.string()); + diff.mModified.push_back(std::move(config)); + LOG_INFO(sLogger, + ("existing valid config modified and passed topology check", + "prepare to reload instanceConfig")("config", configName)); + } else { + LOG_DEBUG(sLogger, + ("existing valid config file modified, but no change found", "skip current object")); + } + } else { + LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object")); + } + } + } + for (const auto& name : mInstanceConfigManager->GetAllConfigNames()) { + if (configSet.find(name) == configSet.end()) { + diff.mRemoved.push_back(name); + LOG_INFO( + sLogger, + ("existing valid config is removed", "prepare to stop current running instanceConfig")("config", name)); + } + } + std::vector keysToRemove; + for (const auto& item : mFileInfoMap) { + string configName = filesystem::path(item.first).stem().string(); + if (configSet.find(configName) == configSet.end()) { + keysToRemove.push_back(item.first); + } + } + for (const auto& key : keysToRemove) { + mFileInfoMap.erase(key); + } + + if (!diff.IsEmpty()) { + LOG_INFO(sLogger, + ("config files scan done", "got updates, begin to update instanceConfigs")( + "added", diff.mAdded.size())("modified", diff.mModified.size())("removed", diff.mRemoved.size())); + } else { + LOG_DEBUG(sLogger, ("config files scan done", "no update")); + } + + return diff; +} + +void InstanceConfigWatcher::AddSource(const string& dir, mutex* mux) { + mSourceDir.emplace_back(dir); + if (mux != nullptr) { + mDirMutexMap[dir] = mux; + } +} + +void InstanceConfigWatcher::ClearEnvironment() { + mSourceDir.clear(); + mFileInfoMap.clear(); +} + +} // namespace logtail diff --git a/core/config/watcher/InstanceConfigWatcher.h b/core/config/watcher/InstanceConfigWatcher.h new file mode 100644 index 0000000000..81a37fce18 --- /dev/null +++ b/core/config/watcher/InstanceConfigWatcher.h @@ -0,0 +1,60 @@ +/* + * Copyright 2023 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "InstanceConfigManager.h" +#include "config/ConfigDiff.h" + +namespace logtail { + +class InstanceConfigManager; + +class InstanceConfigWatcher { +public: + InstanceConfigWatcher(const InstanceConfigWatcher&) = delete; + InstanceConfigWatcher& operator=(const InstanceConfigWatcher&) = delete; + + static InstanceConfigWatcher* GetInstance() { + static InstanceConfigWatcher instance; + return &instance; + } + + InstanceConfigDiff CheckConfigDiff(); + void AddSource(const std::string& dir, std::mutex* mux = nullptr); + // for ut + void SetInstanceConfigManager(const InstanceConfigManager* m) { mInstanceConfigManager = m; } + void ClearEnvironment(); + +private: + InstanceConfigWatcher(); + ~InstanceConfigWatcher() = default; + + std::vector mSourceDir; + std::unordered_map mDirMutexMap; + std::map> mFileInfoMap; + const InstanceConfigManager* mInstanceConfigManager = nullptr; +}; + +} // namespace logtail \ No newline at end of file diff --git a/core/pipeline/InstanceConfigManager.cpp b/core/instance_config/InstanceConfigManager.cpp similarity index 68% rename from core/pipeline/InstanceConfigManager.cpp rename to core/instance_config/InstanceConfigManager.cpp index 3c231fd1cd..986cc5ba5f 100644 --- a/core/pipeline/InstanceConfigManager.cpp +++ b/core/instance_config/InstanceConfigManager.cpp @@ -14,32 +14,33 @@ * limitations under the License. */ -#include "pipeline/InstanceConfigManager.h" +#include "instance_config/InstanceConfigManager.h" +#include "app_config/AppConfig.h" #include "config/feedbacker/ConfigFeedbackReceiver.h" using namespace std; namespace logtail { -InstanceConfigManager::InstanceConfigManager() { -} +InstanceConfigManager::InstanceConfigManager() = default; void InstanceConfigManager::UpdateInstanceConfigs(InstanceConfigDiff& diff) { for (auto& config : diff.mAdded) { - std::shared_ptr configTmp(new InstanceConfig(config.mName, std::move(config.mDetail))); - mInstanceConfigMap[config.mName] = configTmp; - ConfigFeedbackReceiver::GetInstance().FeedbackInstanceConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); + mInstanceConfigMap[config.mConfigName] = + std::make_shared(config.mConfigName, std::move(config.mDetail), config.mDirName); + ConfigFeedbackReceiver::GetInstance().FeedbackInstanceConfigStatus(config.mConfigName, ConfigFeedbackStatus::APPLIED); } for (auto& config : diff.mModified) { - std::shared_ptr configTmp(new InstanceConfig(config.mName, std::move(config.mDetail))); - mInstanceConfigMap[config.mName] = configTmp; - ConfigFeedbackReceiver::GetInstance().FeedbackInstanceConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); + mInstanceConfigMap[config.mConfigName] = + std::make_shared(config.mConfigName, std::move(config.mDetail), config.mDirName); + ConfigFeedbackReceiver::GetInstance().FeedbackInstanceConfigStatus(config.mConfigName, ConfigFeedbackStatus::APPLIED); } - for (auto& configName : diff.mRemoved) { + for (const auto& configName : diff.mRemoved) { mInstanceConfigMap.erase(configName); ConfigFeedbackReceiver::GetInstance().FeedbackInstanceConfigStatus(configName, ConfigFeedbackStatus::DELETED); } + AppConfig::GetInstance()->LoadInstanceConfig(mInstanceConfigMap); } std::shared_ptr InstanceConfigManager::FindConfigByName(const string& configName) const { @@ -52,6 +53,7 @@ std::shared_ptr InstanceConfigManager::FindConfigByName(const st vector InstanceConfigManager::GetAllConfigNames() const { vector res; + res.reserve(mInstanceConfigMap.size()); for (const auto& item : mInstanceConfigMap) { res.push_back(item.first); } diff --git a/core/pipeline/InstanceConfigManager.h b/core/instance_config/InstanceConfigManager.h similarity index 96% rename from core/pipeline/InstanceConfigManager.h rename to core/instance_config/InstanceConfigManager.h index 1727ea2c5c..2369994d1c 100644 --- a/core/pipeline/InstanceConfigManager.h +++ b/core/instance_config/InstanceConfigManager.h @@ -18,9 +18,7 @@ #include #include -#include -#include "common/Lock.h" #include "config/ConfigDiff.h" #include "config/InstanceConfig.h" diff --git a/core/observer/network/sources/ebpf/EBPFWrapper.cpp b/core/observer/network/sources/ebpf/EBPFWrapper.cpp index 034590ccf0..63f9392310 100644 --- a/core/observer/network/sources/ebpf/EBPFWrapper.cpp +++ b/core/observer/network/sources/ebpf/EBPFWrapper.cpp @@ -34,9 +34,6 @@ DEFINE_FLAG_INT64( sls_observer_ebpf_nobtf_kernel_version, "the minimum kernel version that supported eBPF normal running without self BTF file, 5.4.0.0 -> 5004000000", 5004000000); -DEFINE_FLAG_STRING(sls_observer_ebpf_host_path, - "the backup real host path for store libebpf.so", - "/etc/ilogtail/ebpf/"); static const std::string kLowkernelCentosName = "CentOS"; static const uint16_t kLowkernelCentosMinVersion = 7006; diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 7e452ae5ce..bb298c11ca 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -34,6 +34,8 @@ using namespace std; DEFINE_FLAG_INT32(check_send_client_timeout_interval, "", 600); +DEFINE_FLAG_BOOL(enable_flow_control, "if enable flow control", true); +DEFINE_FLAG_BOOL(enable_send_tps_smoothing, "avoid web server load burst", true); static const int SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND = 3; @@ -53,9 +55,49 @@ bool FlusherRunner::Init() { mThreadRes = async(launch::async, &FlusherRunner::Run, this); mLastCheckSendClientTime = time(nullptr); + LoadModuleConfig(true); + mCallback = [this]() { return LoadModuleConfig(false); }; + AppConfig::GetInstance()->RegisterCallback("max_bytes_per_sec", &mCallback); return true; } +bool FlusherRunner::LoadModuleConfig(bool isInit) { + auto ValidateFn = [](const std::string& key, const int32_t value) -> bool { + if (key == "max_bytes_per_sec") { + if (value < (int32_t)(1024 * 1024)) { + return false; + } + return true; + } + return true; + }; + if (isInit) { + // Only handle parameters that do not allow hot loading + } + auto maxBytePerSec = AppConfig::GetInstance()->MergeInt32(kDefaultMaxSendBytePerSec, + AppConfig::GetInstance()->GetMaxBytePerSec(), "max_bytes_per_sec", ValidateFn); + AppConfig::GetInstance()->SetMaxBytePerSec(maxBytePerSec); + UpdateSendFlowControl(); + return true; +} + +void FlusherRunner::UpdateSendFlowControl() { + // when inflow exceed 30MB/s, FlowControl lose precision + if (AppConfig::GetInstance()->GetMaxBytePerSec() >= 30 * 1024 * 1024) { + if (mSendFlowControl) + mSendFlowControl = false; + if (mSendRandomSleep) + mSendRandomSleep = false; + } else { + mSendRandomSleep = BOOL_FLAG(enable_send_tps_smoothing); + mSendFlowControl = BOOL_FLAG(enable_flow_control); + } + LOG_INFO(sLogger, + ("send byte per second limit", AppConfig::GetInstance()->GetMaxBytePerSec())( + "send flow control", mSendFlowControl ? "enable" : "disable")( + "send random sleep", mSendRandomSleep ? "enable" : "disable")); +} + void FlusherRunner::Stop() { mIsFlush = true; SenderQueueManager::GetInstance()->Trigger(); @@ -126,7 +168,7 @@ void FlusherRunner::Run() { // smoothing send tps, walk around webserver load burst uint32_t bufferPackageCount = items.size(); - if (!Application::GetInstance()->IsExiting() && AppConfig::GetInstance()->IsSendRandomSleep()) { + if (!Application::GetInstance()->IsExiting() && mSendRandomSleep) { int64_t sleepMicroseconds = 0; if (bufferPackageCount < 20) sleepMicroseconds = (rand() % 30) * 10000; // 0ms ~ 300ms @@ -146,7 +188,7 @@ void FlusherRunner::Run() { *itr)("config-flusher-dst", QueueKeyManager::GetInstance()->GetName((*itr)->mQueueKey))( "wait time", ToString(waitTime.count()) + "ms")("try cnt", ToString((*itr)->mTryCnt))); - if (!Application::GetInstance()->IsExiting() && AppConfig::GetInstance()->IsSendFlowControl()) { + if (!Application::GetInstance()->IsExiting() && mSendFlowControl) { RateLimiter::FlowControl((*itr)->mRawSize, mSendLastTime, mSendLastByte, true); } diff --git a/core/runner/FlusherRunner.h b/core/runner/FlusherRunner.h index 90fdf7e5a4..642de46739 100644 --- a/core/runner/FlusherRunner.h +++ b/core/runner/FlusherRunner.h @@ -47,12 +47,17 @@ class FlusherRunner { int32_t GetSendingBufferCount() { return mHttpSendingCnt; } + bool LoadModuleConfig(bool isInit); + private: FlusherRunner() = default; ~FlusherRunner() = default; void Run(); void Dispatch(SenderQueueItem* item); + void UpdateSendFlowControl(); + + std::function mCallback; std::future mThreadRes; std::atomic_bool mIsFlush = false; @@ -64,6 +69,9 @@ class FlusherRunner { int64_t mSendLastTime = 0; int32_t mSendLastByte = 0; + bool mSendRandomSleep; + bool mSendFlowControl; + mutable MetricsRecordRef mMetricsRecordRef; CounterPtr mInItemsTotal; CounterPtr mInItemDataSizeBytes; @@ -76,6 +84,7 @@ class FlusherRunner { #ifdef APSARA_UNIT_TEST_MAIN friend class PluginRegistryUnittest; friend class FlusherRunnerUnittest; + friend class InstanceConfigManagerUnittest; #endif }; diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index ba42e5bf98..e1830e7bbe 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -41,6 +41,7 @@ macro(add_core_subdir) add_subdirectory(models) add_subdirectory(monitor) add_subdirectory(pipeline) + add_subdirectory(instance_config) add_subdirectory(plugin) add_subdirectory(polling) add_subdirectory(processor) diff --git a/core/unittest/app_config/AppConfigUnittest.cpp b/core/unittest/app_config/AppConfigUnittest.cpp index b7baa0d792..01b35f6bbb 100644 --- a/core/unittest/app_config/AppConfigUnittest.cpp +++ b/core/unittest/app_config/AppConfigUnittest.cpp @@ -27,6 +27,7 @@ DECLARE_FLAG_INT32(ebpf_aggregation_config_agg_window_second); DECLARE_FLAG_STRING(ebpf_converage_config_strategy); DECLARE_FLAG_STRING(ebpf_sample_config_strategy); DECLARE_FLAG_DOUBLE(ebpf_sample_config_config_rate); +DECLARE_FLAG_BOOL(logtail_mode); namespace logtail { @@ -37,7 +38,14 @@ class AppConfigUnittest : public ::testing::Test { private: void writeLogtailConfigJSON(const Json::Value& v) { LOG_INFO(sLogger, ("writeLogtailConfigJSON", v.toStyledString())); - OverwriteFile(STRING_FLAG(ilogtail_config), v.toStyledString()); + if (BOOL_FLAG(logtail_mode)) { + OverwriteFile(STRING_FLAG(ilogtail_config), v.toStyledString()); + } else { + CreateAgentDir(); + std::string conf = GetAgentConfDir() + "/instance_config/local/loongcollector_config.json"; + AppConfig::GetInstance()->LoadAppConfig(conf); + OverwriteFile(conf, v.toStyledString()); + } } template diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index e434210b7c..66b1b17298 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -21,11 +21,14 @@ #include "config/ConfigDiff.h" #include "config/common_provider/CommonConfigProvider.h" #include "config/watcher/ConfigWatcher.h" +#include "config/watcher/InstanceConfigWatcher.h" #include "gmock/gmock.h" +#include "instance_config/InstanceConfigManager.h" #include "pipeline/PipelineManager.h" -#include "pipeline/InstanceConfigManager.h" #include "unittest/Unittest.h" +DECLARE_FLAG_BOOL(logtail_mode); + using namespace testing; using namespace std; @@ -33,7 +36,8 @@ namespace logtail { class MockCommonConfigProvider : public CommonConfigProvider { public: - MOCK_METHOD5(SendHttpRequest, bool(const std::string&, const std::string&, const std::string&, const std::string&, std::string&)); + MOCK_METHOD5(SendHttpRequest, + bool(const std::string&, const std::string&, const std::string&, const std::string&, std::string&)); }; class CommonConfigProviderUnittest : public ::testing::Test { @@ -41,7 +45,6 @@ class CommonConfigProviderUnittest : public ::testing::Test { std::string mRootDir; const std::string& PS = PATH_SEPARATOR; string ilogtailConfigPath; - string subConfigPath; bool writeJsonToFile(const std::string& jsonString, const std::string& filePath) { Json::Reader reader; @@ -67,18 +70,29 @@ class CommonConfigProviderUnittest : public ::testing::Test { // 在每个测试用例开始前的设置 void SetUp() override { - mRootDir = GetProcessExecutionDir(); - bfs::create_directories(mRootDir); - ilogtailConfigPath = mRootDir + PS + STRING_FLAG(ilogtail_config); - subConfigPath = mRootDir + PS + STRING_FLAG(ilogtail_config) + ".d"; - bfs::create_directories(subConfigPath); - std::ofstream fout(ilogtailConfigPath.c_str()); - fout << "" << std::endl; - MockCommonConfigProvider provider; - provider.Init("common_v2"); - provider.Stop(); - bfs::remove_all(provider.mPipelineSourceDir.string()); - bfs::remove_all(provider.mInstanceSourceDir.string()); + if (BOOL_FLAG(logtail_mode)) { + mRootDir = GetProcessExecutionDir(); + bfs::create_directories(mRootDir); + ilogtailConfigPath = mRootDir + PS + STRING_FLAG(ilogtail_config); + std::ofstream fout(ilogtailConfigPath.c_str()); + fout << "" << std::endl; + MockCommonConfigProvider provider; + provider.Init("common_v2"); + provider.Stop(); + bfs::remove_all(provider.mPipelineSourceDir.string()); + bfs::remove_all(provider.mInstanceSourceDir.string()); + } else { + CreateAgentDir(); + ilogtailConfigPath = GetAgentConfDir() + "/instance_config/local/loongcollector_config.json"; + AppConfig::GetInstance()->LoadAppConfig(ilogtailConfigPath); + std::ofstream fout(ilogtailConfigPath.c_str()); + fout << "" << std::endl; + MockCommonConfigProvider provider; + provider.Init("common_v2"); + provider.Stop(); + bfs::remove_all(provider.mPipelineSourceDir.string()); + bfs::remove_all(provider.mInstanceSourceDir.string()); + } } // 在每个测试用例结束后的清理 @@ -417,7 +431,7 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { APSARA_TEST_EQUAL(provider.mPipelineConfigInfoMap["config2"].status, ConfigFeedbackStatus::FAILED); // 处理 pipelineconfig - PipelineConfigDiff pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckConfigDiff(); PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff); APSARA_TEST_TRUE(!pipelineConfigDiff.IsEmpty()); APSARA_TEST_EQUAL(1U, pipelineConfigDiff.mAdded.size()); @@ -425,7 +439,7 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1); APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames()[0], "config1"); // 再次处理 pipelineconfig - pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckConfigDiff(); PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff); APSARA_TEST_TRUE(pipelineConfigDiff.IsEmpty()); APSARA_TEST_TRUE(pipelineConfigDiff.mAdded.empty()); @@ -438,19 +452,19 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { APSARA_TEST_EQUAL(provider.mInstanceConfigInfoMap["instanceconfig2"].status, ConfigFeedbackStatus::FAILED); // 处理 instanceconfig - InstanceConfigDiff instanceConfigDiff = ConfigWatcher::GetInstance()->CheckInstanceConfigDiff(); + InstanceConfigDiff instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff); APSARA_TEST_TRUE(!instanceConfigDiff.IsEmpty()); APSARA_TEST_EQUAL(1U, instanceConfigDiff.mAdded.size()); - APSARA_TEST_EQUAL(instanceConfigDiff.mAdded[0].mName, "instanceconfig1"); - APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 1); + APSARA_TEST_EQUAL(instanceConfigDiff.mAdded[0].mConfigName, "instanceconfig1"); + APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 2); APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames()[0], "instanceconfig1"); // 再次处理 instanceconfig - instanceConfigDiff = ConfigWatcher::GetInstance()->CheckInstanceConfigDiff(); + instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff); APSARA_TEST_TRUE(instanceConfigDiff.IsEmpty()); APSARA_TEST_TRUE(instanceConfigDiff.mAdded.empty()); - APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 1); + APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 2); APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames()[0], "instanceconfig1"); provider.Stop(); @@ -633,14 +647,14 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { APSARA_TEST_TRUE(provider.mPipelineConfigInfoMap.empty()); // 处理pipelineConfigDiff - PipelineConfigDiff pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckConfigDiff(); PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff); APSARA_TEST_TRUE(!pipelineConfigDiff.IsEmpty()); APSARA_TEST_EQUAL(1U, pipelineConfigDiff.mRemoved.size()); APSARA_TEST_EQUAL(pipelineConfigDiff.mRemoved[0], "config1"); APSARA_TEST_TRUE(PipelineManager::GetInstance()->GetAllConfigNames().empty()); // 再次处理pipelineConfigDiff - pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckConfigDiff(); PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff); APSARA_TEST_TRUE(pipelineConfigDiff.IsEmpty()); APSARA_TEST_TRUE(pipelineConfigDiff.mRemoved.empty()); @@ -648,16 +662,16 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { APSARA_TEST_TRUE(provider.mInstanceConfigInfoMap.empty()); // 处理instanceConfigDiff - InstanceConfigDiff instanceConfigDiff = ConfigWatcher::GetInstance()->CheckInstanceConfigDiff(); + InstanceConfigDiff instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff); - APSARA_TEST_TRUE(InstanceConfigManager::GetInstance()->GetAllConfigNames().empty()); + APSARA_TEST_TRUE(!InstanceConfigManager::GetInstance()->GetAllConfigNames().empty()); APSARA_TEST_EQUAL(1U, instanceConfigDiff.mRemoved.size()); APSARA_TEST_EQUAL(instanceConfigDiff.mRemoved[0], "instanceconfig1"); // 再次处理instanceConfigDiff - instanceConfigDiff = ConfigWatcher::GetInstance()->CheckInstanceConfigDiff(); + instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff); - APSARA_TEST_TRUE(InstanceConfigManager::GetInstance()->GetAllConfigNames().empty()); + APSARA_TEST_TRUE(!InstanceConfigManager::GetInstance()->GetAllConfigNames().empty()); APSARA_TEST_TRUE(instanceConfigDiff.IsEmpty()); APSARA_TEST_TRUE(instanceConfigDiff.mRemoved.empty()); diff --git a/core/unittest/config/ConfigUpdateUnittest.cpp b/core/unittest/config/ConfigUpdateUnittest.cpp index ad75101781..8dba34cb77 100644 --- a/core/unittest/config/ConfigUpdateUnittest.cpp +++ b/core/unittest/config/ConfigUpdateUnittest.cpp @@ -81,7 +81,7 @@ class ConfigUpdateUnittest : public testing::Test { void SetUp() override { filesystem::create_directories(configDir); - ConfigWatcher::GetInstance()->AddPipelineSource(configDir.string()); + ConfigWatcher::GetInstance()->AddSource(configDir.string()); } void TearDown() override { @@ -198,11 +198,11 @@ class ConfigUpdateUnittest : public testing::Test { void ConfigUpdateUnittest::OnStartUp() const { PipelineConfigDiff diff; - diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_TRUE(diff.IsEmpty()); GenerateInitialConfigs(); - diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_FALSE(diff.IsEmpty()); APSARA_TEST_EQUAL(2U, diff.mAdded.size()); APSARA_TEST_TRUE(diff.mModified.empty()); @@ -217,7 +217,7 @@ void ConfigUpdateUnittest::OnConfigDelete() const { APSARA_TEST_EQUAL(1U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); filesystem::remove_all(configDir); - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_FALSE(diff.IsEmpty()); APSARA_TEST_TRUE(diff.mAdded.empty()); APSARA_TEST_TRUE(diff.mModified.empty()); @@ -235,7 +235,7 @@ void ConfigUpdateUnittest::OnConfigToInvalidFormat() const { ofstream fout(path, ios::trunc); fout << newInvalidConfigWithInvalidFormat; } - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_TRUE(diff.IsEmpty()); } @@ -247,7 +247,7 @@ void ConfigUpdateUnittest::OnConfigToInvalidDetail() const { ofstream fout(path, ios::trunc); fout << newInvalidConfigWithInvalidDetail; } - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_FALSE(diff.IsEmpty()); APSARA_TEST_EQUAL(3U, diff.mAdded.size()); APSARA_TEST_EQUAL(1U, diff.mModified.size()); @@ -265,7 +265,7 @@ void ConfigUpdateUnittest::OnConfigToEnabledValid() const { ofstream fout(path, ios::trunc); fout << newEnabledValidConfig; } - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_FALSE(diff.IsEmpty()); APSARA_TEST_EQUAL(3U, diff.mAdded.size()); APSARA_TEST_EQUAL(1U, diff.mModified.size()); @@ -283,7 +283,7 @@ void ConfigUpdateUnittest::OnConfigToDisabledValid() const { ofstream fout(path, ios::trunc); fout << newDisabledValidConfig; } - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_FALSE(diff.IsEmpty()); APSARA_TEST_TRUE(diff.mAdded.empty()); APSARA_TEST_TRUE(diff.mModified.empty()); @@ -297,7 +297,7 @@ void ConfigUpdateUnittest::OnConfigUnchanged() const { PrepareInitialSettings(); APSARA_TEST_EQUAL(1U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_TRUE(diff.IsEmpty()); GenerateInitialConfigs(); @@ -306,7 +306,7 @@ void ConfigUpdateUnittest::OnConfigUnchanged() const { filesystem::file_time_type fTime = filesystem::last_write_time(path); filesystem::last_write_time(path, fTime + 1s); } - diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_FALSE(diff.IsEmpty()); APSARA_TEST_EQUAL(1U, diff.mAdded.size()); APSARA_TEST_TRUE(diff.mModified.empty()); @@ -336,7 +336,7 @@ void ConfigUpdateUnittest::OnConfigAdded() const { ofstream fout(configDir / "add_disabled_valid.json", ios::trunc); fout << disabledValidConfig; } - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_FALSE(diff.IsEmpty()); APSARA_TEST_EQUAL(2U, diff.mAdded.size()); APSARA_TEST_TRUE(diff.mModified.empty()); @@ -348,7 +348,7 @@ void ConfigUpdateUnittest::OnConfigAdded() const { void ConfigUpdateUnittest::PrepareInitialSettings() const { GenerateInitialConfigs(); - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); PipelineManagerMock::GetInstance()->UpdatePipelines(diff); } diff --git a/core/unittest/config/ConfigUpdatorUnittest.cpp b/core/unittest/config/ConfigUpdatorUnittest.cpp index 9d54bb2b83..32f9ab1909 100644 --- a/core/unittest/config/ConfigUpdatorUnittest.cpp +++ b/core/unittest/config/ConfigUpdatorUnittest.cpp @@ -55,7 +55,6 @@ DECLARE_FLAG_INT32(mem_check_point_time_out); DECLARE_FLAG_INT32(file_check_point_time_out); DECLARE_FLAG_INT32(check_point_check_interval); DECLARE_FLAG_STRING(default_global_topic); -DECLARE_FLAG_INT32(default_max_send_byte_per_sec); DECLARE_FLAG_INT32(default_send_byte_per_sec); DECLARE_FLAG_INT32(default_buffer_file_num); DECLARE_FLAG_INT32(default_local_file_size); @@ -2157,7 +2156,7 @@ void ConfigUpdatorUnittest::TestLoadIlogtailConfig() { AppConfig::GetInstance()->LoadAppConfig(ilogtailConfig); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mCpuUsageUpLimit, DOUBLE_FLAG(cpu_usage_up_limit)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mMemUsageUpLimit, INT64_FLAG(memory_usage_up_limit)); - APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetMaxBytePerSec(), INT32_FLAG(default_max_send_byte_per_sec)); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetMaxBytePerSec(), kDefaultMaxSendBytePerSec); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetBytePerSec(), INT32_FLAG(default_send_byte_per_sec)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetNumOfBufferFile(), INT32_FLAG(default_buffer_file_num)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetLocalFileSize(), INT32_FLAG(default_local_file_size)); diff --git a/core/unittest/config/ConfigWatcherUnittest.cpp b/core/unittest/config/ConfigWatcherUnittest.cpp index 19cee7d545..20df15856f 100644 --- a/core/unittest/config/ConfigWatcherUnittest.cpp +++ b/core/unittest/config/ConfigWatcherUnittest.cpp @@ -17,6 +17,7 @@ #include "config/ConfigDiff.h" #include "config/watcher/ConfigWatcher.h" +#include "config/watcher/InstanceConfigWatcher.h" #include "pipeline/plugin/PluginRegistry.h" #include "unittest/Unittest.h" @@ -32,8 +33,8 @@ class ConfigWatcherUnittest : public testing::Test { protected: void SetUp() override { - ConfigWatcher::GetInstance()->AddPipelineSource(configDir.string()); - ConfigWatcher::GetInstance()->AddInstanceSource(instanceConfigDir.string()); + ConfigWatcher::GetInstance()->AddSource(configDir.string()); + InstanceConfigWatcher::GetInstance()->AddSource(instanceConfigDir.string()); } void TearDown() override { ConfigWatcher::GetInstance()->ClearEnvironment(); } @@ -48,39 +49,39 @@ const filesystem::path ConfigWatcherUnittest::instanceConfigDir = "./instance_co void ConfigWatcherUnittest::InvalidConfigDirFound() const { { - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); - APSARA_TEST_TRUE(diff.IsEmpty()); + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_TRUE(diff.IsEmpty()); - { ofstream fout("config"); } - diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); - APSARA_TEST_TRUE(diff.IsEmpty()); - filesystem::remove("config"); + { ofstream fout("config"); } + diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_TRUE(diff.IsEmpty()); + filesystem::remove_all("config"); } { - InstanceConfigDiff diff = ConfigWatcher::GetInstance()->CheckInstanceConfigDiff(); + InstanceConfigDiff diff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_TRUE(diff.IsEmpty()); { ofstream fout("instance_config"); } - diff = ConfigWatcher::GetInstance()->CheckInstanceConfigDiff(); + diff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_TRUE(diff.IsEmpty()); - filesystem::remove("instance_config"); + filesystem::remove_all("instance_config"); } } void ConfigWatcherUnittest::InvalidConfigFileFound() const { { - filesystem::create_directories(configDir); + filesystem::create_directories(configDir); - filesystem::create_directories(configDir / "dir"); - { ofstream fout(configDir / "unsupported_extenstion.zip"); } - { ofstream fout(configDir / "empty_file.json"); } - { - ofstream fout(configDir / "invalid_format.json"); - fout << "[}"; - } - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); - APSARA_TEST_TRUE(diff.IsEmpty()); - filesystem::remove_all(configDir); + filesystem::create_directories(configDir / "dir"); + { ofstream fout(configDir / "unsupported_extenstion.zip"); } + { ofstream fout(configDir / "empty_file.json"); } + { + ofstream fout(configDir / "invalid_format.json"); + fout << "[}"; + } + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_TRUE(diff.IsEmpty()); + filesystem::remove_all(configDir); } { filesystem::create_directories(instanceConfigDir); @@ -92,7 +93,7 @@ void ConfigWatcherUnittest::InvalidConfigFileFound() const { ofstream fout(instanceConfigDir / "invalid_format.json"); fout << "[}"; } - InstanceConfigDiff diff = ConfigWatcher::GetInstance()->CheckInstanceConfigDiff(); + InstanceConfigDiff diff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_TRUE(diff.IsEmpty()); filesystem::remove_all(instanceConfigDir); } @@ -100,17 +101,17 @@ void ConfigWatcherUnittest::InvalidConfigFileFound() const { void ConfigWatcherUnittest::DuplicateConfigs() const { { - PluginRegistry::GetInstance()->LoadPlugins(); - ConfigWatcher::GetInstance()->AddPipelineSource("dir1"); - ConfigWatcher::GetInstance()->AddPipelineSource("dir2"); + PluginRegistry::GetInstance()->LoadPlugins(); + ConfigWatcher::GetInstance()->AddSource("dir1"); + ConfigWatcher::GetInstance()->AddSource("dir2"); - filesystem::create_directories("config"); - filesystem::create_directories("dir1"); - filesystem::create_directories("dir2"); + filesystem::create_directories("config"); + filesystem::create_directories("dir1"); + filesystem::create_directories("dir2"); - { - ofstream fout("dir1/config.json"); - fout << R"( + { + ofstream fout("dir1/config.json"); + fout << R"( { "inputs": [ { @@ -124,21 +125,21 @@ void ConfigWatcherUnittest::DuplicateConfigs() const { ] } )"; - } - { ofstream fout("dir2/config.json"); } - PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff(); - APSARA_TEST_FALSE(diff.IsEmpty()); - APSARA_TEST_EQUAL(1U, diff.mAdded.size()); - - filesystem::remove_all("dir1"); - filesystem::remove_all("dir2"); - filesystem::remove_all("config"); - PluginRegistry::GetInstance()->UnloadPlugins(); + } + { ofstream fout("dir2/config.json"); } + PipelineConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_FALSE(diff.IsEmpty()); + APSARA_TEST_EQUAL(1U, diff.mAdded.size()); + + filesystem::remove_all("dir1"); + filesystem::remove_all("dir2"); + filesystem::remove_all("config"); + PluginRegistry::GetInstance()->UnloadPlugins(); } { PluginRegistry::GetInstance()->LoadPlugins(); - ConfigWatcher::GetInstance()->AddInstanceSource("dir1"); - ConfigWatcher::GetInstance()->AddInstanceSource("dir2"); + InstanceConfigWatcher::GetInstance()->AddSource("dir1"); + InstanceConfigWatcher::GetInstance()->AddSource("dir2"); filesystem::create_directories("instance_config"); filesystem::create_directories("dir1"); @@ -156,7 +157,7 @@ void ConfigWatcherUnittest::DuplicateConfigs() const { )"; } { ofstream fout("dir2/config.json"); } - InstanceConfigDiff diff = ConfigWatcher::GetInstance()->CheckInstanceConfigDiff(); + InstanceConfigDiff diff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_FALSE(diff.IsEmpty()); APSARA_TEST_EQUAL(1U, diff.mAdded.size()); diff --git a/core/unittest/ebpf/eBPFServerUnittest.cpp b/core/unittest/ebpf/eBPFServerUnittest.cpp index 1b10b69329..870f24e78f 100644 --- a/core/unittest/ebpf/eBPFServerUnittest.cpp +++ b/core/unittest/ebpf/eBPFServerUnittest.cpp @@ -16,6 +16,8 @@ #include "common/JsonUtil.h" #include "ebpf/config.h" +DECLARE_FLAG_BOOL(logtail_mode); + namespace logtail { namespace ebpf { class eBPFServerUnittest : public testing::Test { @@ -80,7 +82,14 @@ class eBPFServerUnittest : public testing::Test { void GenerateBatchAppEvent(nami::NamiHandleBatchEventFunc cb); void writeLogtailConfigJSON(const Json::Value& v) { LOG_INFO(sLogger, ("writeLogtailConfigJSON", v.toStyledString())); - OverwriteFile(STRING_FLAG(ilogtail_config), v.toStyledString()); + if (BOOL_FLAG(logtail_mode)) { + OverwriteFile(STRING_FLAG(ilogtail_config), v.toStyledString()); + } else { + CreateAgentDir(); + std::string conf = GetAgentConfDir() + "/instance_config/local/loongcollector_config.json"; + AppConfig::GetInstance()->LoadAppConfig(conf); + OverwriteFile(conf, v.toStyledString()); + } } eBPFAdminConfig* config_; Pipeline p; diff --git a/core/unittest/instance_config/CMakeLists.txt b/core/unittest/instance_config/CMakeLists.txt new file mode 100644 index 0000000000..7d7d5fc089 --- /dev/null +++ b/core/unittest/instance_config/CMakeLists.txt @@ -0,0 +1,22 @@ +# Copyright 2023 iLogtail Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.22) +project(instance_config_unittest) + +add_executable(instance_config_manager_unittest InstanceConfigManagerUnittest.cpp) +target_link_libraries(instance_config_manager_unittest ${UT_BASE_TARGET}) + +include(GoogleTest) +gtest_discover_tests(instance_config_manager_unittest) diff --git a/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp b/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp new file mode 100644 index 0000000000..df4e479674 --- /dev/null +++ b/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp @@ -0,0 +1,232 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "Flags.h" +#include "app_config/AppConfig.h" +#include "common/JsonUtil.h" +#include "config/InstanceConfig.h" +#include "instance_config/InstanceConfigManager.h" +#include "runner/FlusherRunner.h" +#include "unittest/Unittest.h" + +using namespace std; + +DECLARE_FLAG_BOOL(enable_send_tps_smoothing); +DECLARE_FLAG_BOOL(enable_flow_control); + +namespace logtail { + +class InstanceConfigManagerUnittest : public testing::Test { +public: + void TestUpdateInstanceConfigs(); + bool LoadModuleConfig(bool isInit); + int32_t max_bytes_per_sec; + int status = 0; +}; + +bool InstanceConfigManagerUnittest::LoadModuleConfig(bool isInit) { + auto ValidateFn = [](const std::string key, const auto value) -> bool { return true; }; + + if (status == 0) { + // Added + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeBool(false, false, "bool_true", ValidateFn), true); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt32(0, 0, "int32_true", ValidateFn), 1234); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt64(0, 0, "int64_true", ValidateFn), 1234567890); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeDouble(0, 0, "double_true", ValidateFn), 1234.56789); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeString("", "", "string_true", ValidateFn), "string"); + + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeBool(false, false, "bool_false", ValidateFn), false); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt32(0, 0, "int32_false", ValidateFn), 0); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt64(0, 0, "int64_false", ValidateFn), 0); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeDouble(0, 0, "double_false", ValidateFn), 0); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeString("", "", "string_false", ValidateFn), ""); + } else if (status == 1) { + // Modified + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeBool(true, true, "bool_true", ValidateFn), false); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt32(0, 0, "int32_true", ValidateFn), 12340); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt64(0, 0, "int64_true", ValidateFn), 12345678900); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeDouble(0, 0, "double_true", ValidateFn), 12340.56789); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeString("", "", "string_true", ValidateFn), "string0"); + + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeBool(true, true, "bool_false", ValidateFn), true); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt32(10, 10, "int32_false", ValidateFn), 10); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt64(10, 10, "int64_false", ValidateFn), 10); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeDouble(10.1, 10.1, "double_false", ValidateFn), 10.1); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeString("10.1", "10.1", "string_false", ValidateFn), "10.1"); + } else if (status == 2) { + status = 3; + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeBool(false, false, "bool_true", ValidateFn), false); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt32(0, 0, "int32_true", ValidateFn), 0); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt64(0, 0, "int64_true", ValidateFn), 0); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeDouble(0, 0, "double_true", ValidateFn), 0); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeString("", "", "string_true", ValidateFn), ""); + + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeBool(false, false, "bool_false", ValidateFn), false); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt32(0, 0, "int32_false", ValidateFn), 0); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeInt64(0, 0, "int64_false", ValidateFn), 0); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeDouble(0, 0, "double_false", ValidateFn), 0); + APSARA_TEST_EQUAL(AppConfig::GetInstance()->MergeString("", "", "string_false", ValidateFn), ""); + } + + return true; +} + +void InstanceConfigManagerUnittest::TestUpdateInstanceConfigs() { + std::function mCallback = [this]() { return LoadModuleConfig(false); }; + { + AppConfig::GetInstance()->GetInstance()->LoadAppConfig(STRING_FLAG(ilogtail_config)); + AppConfig::GetInstance()->GetInstance()->RegisterCallback("bool_true", &mCallback); + AppConfig::GetInstance()->GetInstance()->RegisterCallback("int32_true", &mCallback); + AppConfig::GetInstance()->GetInstance()->RegisterCallback("int64_true", &mCallback); + AppConfig::GetInstance()->GetInstance()->RegisterCallback("double_true", &mCallback); + AppConfig::GetInstance()->GetInstance()->RegisterCallback("string_true", &mCallback); + + AppConfig::GetInstance()->GetInstance()->RegisterCallback("bool_false", &mCallback); + AppConfig::GetInstance()->GetInstance()->RegisterCallback("int32_false", &mCallback); + AppConfig::GetInstance()->GetInstance()->RegisterCallback("int64_false", &mCallback); + AppConfig::GetInstance()->GetInstance()->RegisterCallback("double_false", &mCallback); + AppConfig::GetInstance()->GetInstance()->RegisterCallback("string_false", &mCallback); + } + FlusherRunner::GetInstance()->Init(); + // Added + { + InstanceConfigDiff configDiff; + { + std::string content = R"({ + "max_bytes_per_sec": 1234 + })"; + std::string errorMsg; + Json::Value detail; + APSARA_TEST_TRUE(ParseJsonTable(content, detail, errorMsg)); + APSARA_TEST_TRUE(errorMsg.empty()); + InstanceConfig config("test0", detail, "dir"); + configDiff.mAdded.emplace_back(config); + } + { + std::string content = R"({ + "bool_true": true, + "int32_true": 1234, + "int64_true": 1234567890, + "double_true": 1234.56789, + "string_true": "string" + })"; + std::string errorMsg; + Json::Value detail; + APSARA_TEST_TRUE(ParseJsonTable(content, detail, errorMsg)); + APSARA_TEST_TRUE(errorMsg.empty()); + InstanceConfig config("test1", detail, "dir"); + configDiff.mAdded.emplace_back(config); + } + { + std::string content = R"({ + "bool_false": 1, + "int32_false": false, + "int64_false": false, + "double_false": false, + "string_false": false + })"; + std::string errorMsg; + Json::Value detail; + APSARA_TEST_TRUE(ParseJsonTable(content, detail, errorMsg)); + APSARA_TEST_TRUE(errorMsg.empty()); + InstanceConfig config("test2", detail, "dir"); + configDiff.mAdded.emplace_back(config); + } + InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(configDiff); + + APSARA_TEST_EQUAL(3U, InstanceConfigManager::GetInstance()->GetAllConfigNames().size()); + APSARA_TEST_NOT_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test1")); + APSARA_TEST_NOT_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test2")); + APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test3")); + } + APSARA_TEST_EQUAL(kDefaultMaxSendBytePerSec, AppConfig::GetInstance()->GetMaxBytePerSec()); + APSARA_TEST_EQUAL(true, FlusherRunner::GetInstance()->mSendRandomSleep); + APSARA_TEST_EQUAL(true, FlusherRunner::GetInstance()->mSendFlowControl); + // Modified + status = 1; + { + InstanceConfigDiff configDiff; + { + std::string content = R"({ + "max_bytes_per_sec": 31457280 + })"; + std::string errorMsg; + Json::Value detail; + APSARA_TEST_TRUE(ParseJsonTable(content, detail, errorMsg)); + APSARA_TEST_TRUE(errorMsg.empty()); + InstanceConfig config("test0", detail, "dir"); + configDiff.mAdded.emplace_back(config); + } + { + std::string content = R"({ + "bool_true": false, + "int32_true": 12340, + "int64_true": 12345678900, + "double_true": 12340.56789, + "string_true": "string0" + })"; + std::string errorMsg; + Json::Value detail; + APSARA_TEST_TRUE(ParseJsonTable(content, detail, errorMsg)); + APSARA_TEST_TRUE(errorMsg.empty()); + InstanceConfig config("test1", detail, "dir"); + configDiff.mModified.emplace_back(config); + } + { + std::string content = R"({ + "bool_false": 1, + "int32_false": false, + "int64_false": false, + "double_false": false, + "string_false": false + })"; + std::string errorMsg; + Json::Value detail; + APSARA_TEST_TRUE(ParseJsonTable(content, detail, errorMsg)); + APSARA_TEST_TRUE(errorMsg.empty()); + InstanceConfig config("test2", detail, "dir"); + configDiff.mModified.emplace_back(config); + } + InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(configDiff); + + APSARA_TEST_EQUAL(3U, InstanceConfigManager::GetInstance()->GetAllConfigNames().size()); + APSARA_TEST_NOT_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test1")); + APSARA_TEST_NOT_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test2")); + APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test3")); + } + APSARA_TEST_EQUAL(31457280, AppConfig::GetInstance()->GetMaxBytePerSec()); + APSARA_TEST_EQUAL(false, FlusherRunner::GetInstance()->mSendRandomSleep); + APSARA_TEST_EQUAL(false, FlusherRunner::GetInstance()->mSendFlowControl); + // Removed + status = 2; + { + InstanceConfigDiff configDiff; + configDiff.mRemoved.emplace_back("test1"); + configDiff.mRemoved.emplace_back("test2"); + InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(configDiff); + + APSARA_TEST_EQUAL(1U, InstanceConfigManager::GetInstance()->GetAllConfigNames().size()); + APSARA_TEST_NOT_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test0")); + APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test1")); + APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test2")); + APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test3")); + } + FlusherRunner::GetInstance()->Stop(); +} + +UNIT_TEST_CASE(InstanceConfigManagerUnittest, TestUpdateInstanceConfigs) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/pipeline/CMakeLists.txt b/core/unittest/pipeline/CMakeLists.txt index e9b3969dfa..000294a4e5 100644 --- a/core/unittest/pipeline/CMakeLists.txt +++ b/core/unittest/pipeline/CMakeLists.txt @@ -24,9 +24,6 @@ target_link_libraries(pipeline_unittest ${UT_BASE_TARGET}) add_executable(pipeline_manager_unittest PipelineManagerUnittest.cpp) target_link_libraries(pipeline_manager_unittest ${UT_BASE_TARGET}) -add_executable(instance_config_manager_unittest InstanceConfigManagerUnittest.cpp) -target_link_libraries(instance_config_manager_unittest ${UT_BASE_TARGET}) - add_executable(concurrency_limiter_unittest ConcurrencyLimiterUnittest.cpp) target_link_libraries(concurrency_limiter_unittest ${UT_BASE_TARGET}) @@ -34,6 +31,5 @@ include(GoogleTest) gtest_discover_tests(global_config_unittest) gtest_discover_tests(pipeline_unittest) gtest_discover_tests(pipeline_manager_unittest) -gtest_discover_tests(instance_config_manager_unittest) gtest_discover_tests(concurrency_limiter_unittest) diff --git a/core/unittest/pipeline/InstanceConfigManagerUnittest.cpp b/core/unittest/pipeline/InstanceConfigManagerUnittest.cpp deleted file mode 100644 index 85ba8aaa6b..0000000000 --- a/core/unittest/pipeline/InstanceConfigManagerUnittest.cpp +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2023 iLogtail Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "app_config/AppConfig.h" -#include "common/JsonUtil.h" -#include "config/InstanceConfig.h" -#include "pipeline/InstanceConfigManager.h" -#include "unittest/Unittest.h" - -using namespace std; - -DECLARE_FLAG_BOOL(enable_send_tps_smoothing); -DECLARE_FLAG_BOOL(enable_flow_control); - -namespace logtail { - -class InstanceConfigManagerUnittest : public testing::Test { -public: - void TestUpdateInstanceConfigs(); -}; - -void InstanceConfigManagerUnittest::TestUpdateInstanceConfigs() { - AppConfig::GetInstance(); - // Added - { - InstanceConfigDiff configDiff; - std::string content - = R"({"enable":true,"max_bytes_per_sec":1234,"mem_usage_limit":456,"cpu_usage_limit":2,"bool":false,"int":-1,"int64":-1000000,"uint":10000,"uint64":100000000000,"double":123123.1,"string":"string","array":[1,2,3],"object":{"a":1}})"; - std::string errorMsg; - unique_ptr detail = unique_ptr(new Json::Value()); - APSARA_TEST_TRUE(ParseJsonTable(content, *detail, errorMsg)); - APSARA_TEST_TRUE(errorMsg.empty()); - InstanceConfig config("test1", std::move(detail)); - configDiff.mAdded.emplace_back(config); - InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(configDiff); - - APSARA_TEST_EQUAL(1U, InstanceConfigManager::GetInstance()->GetAllConfigNames().size()); - APSARA_TEST_NOT_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test1")); - APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test3")); - } - - // Modified - { - InstanceConfigDiff configDiff; - std::string content - = R"({"enable": true,"max_bytes_per_sec": 209715200, "mem_usage_limit":123, "cpu_usage_limit":4,"bool":false,"int":-1,"int64":-1000000,"uint":10000,"uint64":100000000000,"double":123123.1,"string":"string","array":[1,2,3],"object":{"a":1}})"; - std::string errorMsg; - unique_ptr detail = unique_ptr(new Json::Value()); - APSARA_TEST_TRUE(ParseJsonTable(content, *detail, errorMsg)); - APSARA_TEST_TRUE(errorMsg.empty()); - InstanceConfig config("test1", std::move(detail)); - configDiff.mModified.emplace_back(config); - InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(configDiff); - - APSARA_TEST_EQUAL(1U, InstanceConfigManager::GetInstance()->GetAllConfigNames().size()); - APSARA_TEST_NOT_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test1")); - APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test3")); - APSARA_TEST_NOT_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test1")); - } - - // mRemoved - { - InstanceConfigDiff configDiff; - configDiff.mRemoved.emplace_back("test1"); - InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(configDiff); - - APSARA_TEST_EQUAL(0U, InstanceConfigManager::GetInstance()->GetAllConfigNames().size()); - APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test1")); - APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test3")); - } -} - -UNIT_TEST_CASE(InstanceConfigManagerUnittest, TestUpdateInstanceConfigs) - -} // namespace logtail - -UNIT_TEST_MAIN diff --git a/docker/Dockerfile_development_part b/docker/Dockerfile_development_part index b5282bb50a..fde564cd56 100644 --- a/docker/Dockerfile_development_part +++ b/docker/Dockerfile_development_part @@ -20,7 +20,7 @@ ARG VERSION=0.0.1 USER root WORKDIR /loongcollector -RUN mkdir -p /loongcollector/conf +RUN mkdir -p /loongcollector/conf/instance_config/local RUN mkdir -p /loongcollector/log RUN mkdir -p /loongcollector/data RUN mkdir -p /loongcollector/run @@ -35,7 +35,7 @@ RUN chown -R $(whoami) /loongcollector && \ rm /tmp/download_ebpflib.sh COPY --from=build /src/output/libGoPluginBase.so /loongcollector/ -COPY --from=build /src/example_config/quick_start/loongcollector_config.json /loongcollector/ +COPY --from=build /src/example_config/quick_start/loongcollector_config.json /loongcollector/conf/instance_config/local/loongcollector_config.json COPY --from=build /src/core/build/go_pipeline/libGoPluginAdapter.so /loongcollector/ ENV HOST_OS=$HOST_OS diff --git a/docker/Dockerfile_e2e b/docker/Dockerfile_e2e index 1321a35ec7..98229a0a18 100644 --- a/docker/Dockerfile_e2e +++ b/docker/Dockerfile_e2e @@ -20,7 +20,7 @@ ARG VERSION=0.0.1 USER root WORKDIR /loongcollector -RUN mkdir -p /loongcollector/conf +RUN mkdir -p /loongcollector/conf/instance_config/local RUN mkdir -p /loongcollector/log RUN mkdir -p /loongcollector/data RUN mkdir -p /loongcollector/run @@ -35,7 +35,7 @@ RUN chown -R $(whoami) /loongcollector && \ rm /tmp/download_ebpflib.sh COPY ./output/libGoPluginBase.so /loongcollector/ -COPY ./example_config/quick_start/loongcollector_config.json /loongcollector/ +COPY ./example_config/quick_start/loongcollector_config.json /loongcollector/conf/instance_config/local/loongcollector_config.json COPY ./output/libGoPluginAdapter.so /loongcollector/ ENV HOST_OS=$HOST_OS diff --git a/docker/Dockerfile_production b/docker/Dockerfile_production index 95b618b840..602ccd4915 100644 --- a/docker/Dockerfile_production +++ b/docker/Dockerfile_production @@ -39,12 +39,12 @@ RUN chown -R $(whoami) /usr/local/loongcollector/ && \ mkdir -p /usr/local/loongcollector/data/checkpoint WORKDIR /usr/local/loongcollector -RUN mkdir -p /usr/local/loongcollector/conf +RUN mkdir -p /usr/local/loongcollector/conf/instance_config/local RUN mkdir -p /usr/local/loongcollector/log RUN mkdir -p /usr/local/loongcollector/data RUN mkdir -p /usr/local/loongcollector/run -COPY example_config/start_with_docker/loongcollector_config.json /usr/local/loongcollector/ +COPY example_config/start_with_docker/loongcollector_config.json /usr/local/loongcollector/conf/instance_config/local/loongcollector_config.json COPY scripts/loongcollector_control.sh /usr/local/loongcollector/ ENV HTTP_PROBE_PORT=7953 \ diff --git a/docker/Dockerfile_production_minimal b/docker/Dockerfile_production_minimal index 71071a4416..7b177c5873 100644 --- a/docker/Dockerfile_production_minimal +++ b/docker/Dockerfile_production_minimal @@ -21,7 +21,8 @@ COPY dist/loongcollector-${VERSION}.linux-*.tar.gz . RUN tar -xzf loongcollector-${VERSION}.linux-${TARGETPLATFORM##*/}.tar.gz RUN mv /usr/local/loongcollector-${VERSION} /usr/local/loongcollector && \ mkdir -p /usr/local/loongcollector/data/checkpoint && \ - cp example_config/start_with_docker/loongcollector_config.json /usr/local/loongcollector/ && \ + mkdir -p /usr/local/loongcollector/conf/instance_config/local && \ + cp example_config/start_with_docker/loongcollector_config.json /usr/local/loongcollector/conf/instance_config/local/ && \ chmod 755 /usr/local/loongcollector/loongcollector FROM gcr.lank8s.cn/distroless/cc-debian12:latest diff --git a/scripts/dist.sh b/scripts/dist.sh index 0b5f9ed554..74fbc777f3 100755 --- a/scripts/dist.sh +++ b/scripts/dist.sh @@ -40,8 +40,9 @@ cp LICENSE README.md "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}" cp "${ROOTDIR}/${OUT_DIR}/loongcollector" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}" cp "${ROOTDIR}/${OUT_DIR}/libGoPluginAdapter.so" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}" cp "${ROOTDIR}/${OUT_DIR}/libGoPluginBase.so" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}" -cp "${ROOTDIR}/${OUT_DIR}/loongcollector_config.json" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}" -cp -a "${ROOTDIR}/${OUT_DIR}/config/local" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/conf" +mkdir -p "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/conf/instance_config/local/" +cp "${ROOTDIR}/${OUT_DIR}/conf/instance_config/local/loongcollector_config.json" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/conf/instance_config/local/" +cp -a "${ROOTDIR}/${OUT_DIR}/conf/pipeline_config/local" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/conf" if file "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/loongcollector" | grep x86-64; then ./scripts/download_ebpflib.sh "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}"; fi # Splitting debug info at build time with -gsplit-dwarf does not work with current gcc version diff --git a/scripts/gen_build_scripts.sh b/scripts/gen_build_scripts.sh index bc65b2e303..633743538e 100755 --- a/scripts/gen_build_scripts.sh +++ b/scripts/gen_build_scripts.sh @@ -118,8 +118,9 @@ function generateCopyScript() { echo 'rm -rf core/protobuf/sls && docker cp "$id":'${PATH_IN_DOCKER}'/core/protobuf/sls core/protobuf/sls' >>$COPY_SCRIPT_FILE fi fi - echo 'echo -e "{\n}" > $BINDIR/loongcollector_config.json' >>$COPY_SCRIPT_FILE - echo 'mkdir -p $BINDIR/config/local' >>$COPY_SCRIPT_FILE + echo 'mkdir -p $BINDIR/conf/instance_config/local/' >>$COPY_SCRIPT_FILE + echo 'echo -e "{\n}" > $BINDIR/conf/instance_config/local/loongcollector_config.json' >>$COPY_SCRIPT_FILE + echo 'mkdir -p $BINDIR/conf/pipeline_config/local' >>$COPY_SCRIPT_FILE echo 'docker rm -v "$id"' >>$COPY_SCRIPT_FILE }