From 4282d65406b79e16b06d5be4655e8d2205386e40 Mon Sep 17 00:00:00 2001 From: tsm Date: Mon, 8 Aug 2022 21:25:42 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=8D=B3=E5=B0=86=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E9=9B=86=E7=BE=A4=20feat:=20=E4=BA=8B=E5=8A=A1=E6=97=A5?= =?UTF-8?q?=E7=A8=8B=E8=B0=83=E6=95=B4=20feat:=20=E8=A1=A5=E5=85=85?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=E5=8A=A0=E8=BD=BD=E9=94=99=E8=AF=AF=E6=97=A5?= =?UTF-8?q?=E5=BF=97=20feat:=20=E8=A1=A5=E5=85=85=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E6=A8=A1=E6=9D=BF=E6=95=B0=E6=8D=AE=E6=96=87=E4=BB=B6=20fix=20?= =?UTF-8?q?:=20=E4=BF=AE=E5=A4=8D=E6=8F=92=E4=BB=B6=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=90=8D=E7=A7=B0=E9=94=99=E8=AF=AF=20fix=20:=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E6=8F=92=E4=BB=B6=E5=8A=A0=E8=BD=BD=E5=8F=98=E9=87=8F?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/feature.md | 6 +- plugins/tl_ops_plugin.lua | 6 +- plugins/tl_ops_ssl/get_ssl.lua | 2 +- plugins/tl_ops_ssl/set_ssl.lua | 2 +- plugins/tl_ops_ssl/ssl.lua | 2 +- plugins/tl_ops_ssl/sync.lua | 2 +- ...ant_ssl.lua => tl_ops_plugin_constant.lua} | 0 plugins/tl_ops_sync/sync.lua | 1 - plugins/tl_ops_sync/sync_constant_data.lua | 587 -------- plugins/tl_ops_sync/sync_constant_fields.lua | 1333 ----------------- ...nt_sync.lua => tl_ops_plugin_constant.lua} | 0 plugins/tl_ops_sync_cluster/sync_cluster.lua | 59 + .../tl_ops_sync_cluster/sync_cluster_data.lua | 0 .../sync_cluster_timer.lua | 78 + .../tl_ops_plugin_constant.lua | 28 + .../tl_ops_plugin_core.lua | 36 + .../tl_ops_plugin_constant.lua | 3 + 17 files changed, 214 insertions(+), 1931 deletions(-) rename plugins/tl_ops_ssl/{tl_ops_constant_ssl.lua => tl_ops_plugin_constant.lua} (100%) delete mode 100644 plugins/tl_ops_sync/sync_constant_data.lua delete mode 100644 plugins/tl_ops_sync/sync_constant_fields.lua rename plugins/tl_ops_sync/{tl_ops_constant_sync.lua => tl_ops_plugin_constant.lua} (100%) create mode 100644 plugins/tl_ops_sync_cluster/sync_cluster.lua create mode 100644 plugins/tl_ops_sync_cluster/sync_cluster_data.lua create mode 100644 plugins/tl_ops_sync_cluster/sync_cluster_timer.lua create mode 100644 plugins/tl_ops_sync_cluster/tl_ops_plugin_constant.lua create mode 100644 plugins/tl_ops_sync_cluster/tl_ops_plugin_core.lua create mode 100644 plugins/tl_ops_template/tl_ops_plugin_constant.lua diff --git a/doc/feature.md b/doc/feature.md index 548ebac..ba39a3e 100644 --- a/doc/feature.md +++ b/doc/feature.md @@ -86,9 +86,9 @@ -- [ ] 支持多协议转发 +- [ ] 支持多协议转换转发 -- [ ] 提供模块测试用例 +- [ ] 支持各模块测试用例 @@ -100,7 +100,7 @@ - [ ] 【插件】支持请求耗时统计 -- [ ] 【插件】支持请求告警配置 +- [ ] 【插件】支持请求告警通知 - [ ] 【插件】支持权限身份控制 diff --git a/plugins/tl_ops_plugin.lua b/plugins/tl_ops_plugin.lua index c1a0e1c..f0102e9 100644 --- a/plugins/tl_ops_plugin.lua +++ b/plugins/tl_ops_plugin.lua @@ -34,13 +34,13 @@ local tl_ops_process_load_plugins_constant = function(name) local status, constant = pcall(require, "plugins.tl_ops_" .. name .. ".tl_ops_plugin_constant") if status then - if plugin and type(constant) == 'table' then + if constant and type(constant) == 'table' then return constant else tlog:dbg("tl_ops_process_load_plugins_constant constant err, name=",name,",constant=",constant) end else - tlog:dbg("tl_ops_process_load_plugins_constant status err, name=",name,",status=",status) + tlog:dbg("tl_ops_process_load_plugins_constant status err, name=",name,",status=",status,",err=",constant) end return nil @@ -62,7 +62,7 @@ local tl_ops_process_load_plugins_func = function(name) tlog:dbg("tl_ops_process_load_plugins_func func err, name=",name,",func=",func) end else - tlog:dbg("tl_ops_process_load_plugins_func status err, name=",name,",status=",status) + tlog:dbg("tl_ops_process_load_plugins_func status err, name=",name,",status=",status,",err=",func) end return nil diff --git a/plugins/tl_ops_ssl/get_ssl.lua b/plugins/tl_ops_ssl/get_ssl.lua index f5a8423..5777911 100644 --- a/plugins/tl_ops_ssl/get_ssl.lua +++ b/plugins/tl_ops_ssl/get_ssl.lua @@ -5,7 +5,7 @@ -- @email 1905333456@qq.com local cache = require("cache.tl_ops_cache_core"):new("tl-ops-ssl"); -local tl_ops_constant_ssl = require("plugins.tl_ops_ssl.tl_ops_constant_ssl"); +local tl_ops_constant_ssl = require("plugins.tl_ops_ssl.tl_ops_plugin_constant"); local tl_ops_rt = tlops.constant.comm.tl_ops_rt; local tl_ops_utils_func = tlops.utils local cjson = require("cjson.safe"); diff --git a/plugins/tl_ops_ssl/set_ssl.lua b/plugins/tl_ops_ssl/set_ssl.lua index a05c681..2de4572 100644 --- a/plugins/tl_ops_ssl/set_ssl.lua +++ b/plugins/tl_ops_ssl/set_ssl.lua @@ -6,7 +6,7 @@ local snowflake = require("lib.snowflake"); local cache = require("cache.tl_ops_cache_core"):new("tl-ops-ssl"); -local tl_ops_constant_ssl = require("plugins.tl_ops_ssl.tl_ops_constant_ssl"); +local tl_ops_constant_ssl = require("plugins.tl_ops_ssl.tl_ops_plugin_constant"); local tl_ops_rt = tlops.constant.comm.tl_ops_rt; local tl_ops_utils_func = tlops.utils local cjson = require("cjson.safe"); diff --git a/plugins/tl_ops_ssl/ssl.lua b/plugins/tl_ops_ssl/ssl.lua index da2e023..17e7d12 100644 --- a/plugins/tl_ops_ssl/ssl.lua +++ b/plugins/tl_ops_ssl/ssl.lua @@ -4,7 +4,7 @@ -- @author iamtsm -- @email 1905333456@qq.com -local constant_ssl = require("plugins.tl_ops_ssl.tl_ops_constant_ssl"); +local constant_ssl = require("plugins.tl_ops_ssl.tl_ops_plugin_constant"); local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_plugin_ssl"); local cache_ssl = require("cache.tl_ops_cache_core"):new("tl-ops-ssl"); local shared = tlops.plugin_shared diff --git a/plugins/tl_ops_ssl/sync.lua b/plugins/tl_ops_ssl/sync.lua index ceec32d..f93a9e8 100644 --- a/plugins/tl_ops_ssl/sync.lua +++ b/plugins/tl_ops_ssl/sync.lua @@ -6,7 +6,7 @@ local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_plugin_ssl") local cache = require("cache.tl_ops_cache_core"):new("tl-ops-ssl") -local tl_ops_constant_ssl = require("plugins.tl_ops_ssl.tl_ops_constant_ssl") +local tl_ops_constant_ssl = require("plugins.tl_ops_ssl.tl_ops_plugin_constant") local tl_ops_rt = tlops.constant.comm.tl_ops_rt local cjson = require("cjson.safe") cjson.encode_empty_table_as_object(false) diff --git a/plugins/tl_ops_ssl/tl_ops_constant_ssl.lua b/plugins/tl_ops_ssl/tl_ops_plugin_constant.lua similarity index 100% rename from plugins/tl_ops_ssl/tl_ops_constant_ssl.lua rename to plugins/tl_ops_ssl/tl_ops_plugin_constant.lua diff --git a/plugins/tl_ops_sync/sync.lua b/plugins/tl_ops_sync/sync.lua index 005040d..28d235f 100644 --- a/plugins/tl_ops_sync/sync.lua +++ b/plugins/tl_ops_sync/sync.lua @@ -9,7 +9,6 @@ local sync_fields = require("plugins.tl_ops_sync.sync_fields") local sync_data = require("plugins.tl_ops_sync.sync_data") local sync_env = tlops.env.sync local utils = tlops.utils -local shared = tlops.balance_shared local _M = { diff --git a/plugins/tl_ops_sync/sync_constant_data.lua b/plugins/tl_ops_sync/sync_constant_data.lua deleted file mode 100644 index 99f296a..0000000 --- a/plugins/tl_ops_sync/sync_constant_data.lua +++ /dev/null @@ -1,587 +0,0 @@ --- sync_constant_data --- en : sync constant data to shared dict --- zn : 同步在文件中的静态配置到共享内存中,和store的数据进行合并 --- @author iamtsm --- @email 1905333456@qq.com - --- constant -local constant_service = tlops.constant.service -local constant_health = tlops.constant.health -local constant_limit = tlops.constant.limit -local constant_balance = tlops.constant.balance -local constant_balance_api = tlops.constant.balance_api -local constant_balance_param = tlops.constant.balance_param -local constant_balance_header = tlops.constant.balance_header -local constant_balance_cookie = tlops.constant.balance_cookie -local constant_waf = tlops.constant.waf -local constant_waf_ip = tlops.constant.waf_ip -local constant_waf_api = tlops.constant.waf_api -local constant_waf_cc = tlops.constant.waf_cc -local constant_waf_header = tlops.constant.waf_header -local constant_waf_cookie = tlops.constant.waf_cookie -local constant_waf_param = tlops.constant.waf_param -local tl_ops_rt = tlops.constant.comm.tl_ops_rt; --- cache -local cache_service = tlops.cache.service -local cache_limit = tlops.cache.limit -local cache_health = tlops.cache.health -local cache_balance_api = tlops.cache.balance_api -local cache_balance_param = tlops.cache.balance_param -local cache_balance_header = tlops.cache.balance_header -local cache_balance_cookie = tlops.cache.balance_cookie -local cache_balance = tlops.cache.balance -local cache_waf_api = tlops.cache.waf_api -local cache_waf_ip = tlops.cache.waf_ip -local cache_waf_cookie = tlops.cache.waf_cookie -local cache_waf_header = tlops.cache.waf_header -local cache_waf_cc = tlops.cache.waf_cc -local cache_waf_param = tlops.cache.waf_param -local cache_waf = tlops.cache.waf --- utils -local utils = tlops.utils -local cjson = require("cjson.safe"); -cjson.encode_empty_table_as_object(false) -local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_plugin_sync_constant_data"); - - -local _M = { - _VERSION = '0.01' -} - - --- 静态文件中的未同步到store的配置数据 -local sync_constant_data_need_sync = function (constant_data, store_data) - local add = {} - for i = 1, #constant_data do - local synced = false - for j = 1, #store_data do - if constant_data[i]['id'] == store_data[j]['id'] then - synced = true - break - end - end - if not synced then - table.insert(add, constant_data[i]) - end - end - return add -end - - ---+++++++++++++++路由策略数据同步合并,预热+++++++++++++++-- - - --- api策略静态配置数据 -local sync_constant_data_balance_api = function () - local cache_key_list = constant_balance_api.cache_key.list - - local data_str, _ = cache_balance_api:get(cache_key_list); - if not data_str then - local res, _ = cache_balance_api:set(cache_key_list, cjson.encode(constant_balance.api.list)) - if not res then - tlog:err("sync_constant_data_balance_api new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_api new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_balance_api err, old=",data) - return tl_ops_rt.error - end - - -- 静态配置 - local constant_data = constant_balance.api.list - - -- 获取需要同步的配置 - local add_point = sync_constant_data_need_sync(constant_data.point, data.point) - for i = 1, #add_point do - table.insert(data.point, add_point[i]) - end - - -- 获取需要同步的配置 - local add_random = sync_constant_data_need_sync(constant_data.random, data.random) - for i = 1, #add_random do - table.insert(data.random, add_random[i]) - end - - local res = cache_balance_api:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_balance_api err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_api done, new=",data) - - return tl_ops_rt.ok -end - --- cookie策略静态配置数据 -local sync_constant_data_balance_cookie = function () - local cache_key_list = constant_balance_cookie.cache_key.list - - local data_str, _ = cache_balance_cookie:get(cache_key_list); - if not data_str then - local res, _ = cache_balance_cookie:set(cache_key_list, cjson.encode(constant_balance.cookie.list)) - if not res then - tlog:err("sync_constant_data_balance_cookie new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_cookie new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_balance_cookie err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_cookie start, old=",data) - - -- 静态配置 - local constant_data = constant_balance.cookie.list - - -- 获取需要同步的配置 - local add_point = sync_constant_data_need_sync(constant_data.point, data.point) - for i = 1, #add_point do - table.insert(data.point, add_point[i]) - end - - -- 获取需要同步的配置 - local add_random = sync_constant_data_need_sync(constant_data.random, data.random) - for i = 1, #add_random do - table.insert(data.random, add_random[i]) - end - - local res = cache_balance_cookie:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_balance_cookie err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_cookie done, new=",data) - - return tl_ops_rt.ok -end - --- header策略静态配置数据 -local sync_constant_data_balance_header = function () - local cache_key_list = constant_balance_header.cache_key.list - - local data_str, _ = cache_balance_header:get(cache_key_list); - if not data_str then - local res, _ = cache_balance_header:set(cache_key_list, cjson.encode(constant_balance.header.list)) - if not res then - tlog:err("sync_constant_data_balance_header new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_header new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_balance_header err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_header start, old=",data) - - -- 静态配置 - local constant_data = constant_balance.header.list - - -- 获取需要同步的配置 - local add_point = sync_constant_data_need_sync(constant_data.point, data.point) - for i = 1, #add_point do - table.insert(data.point, add_point[i]) - end - - -- 获取需要同步的配置 - local add_random = sync_constant_data_need_sync(constant_data.random, data.random) - for i = 1, #add_random do - table.insert(data.random, add_random[i]) - end - - local res = cache_balance_header:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_balance_header err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_header done, new=",data) - - return tl_ops_rt.ok -end - --- param策略静态配置数据 -local sync_constant_data_balance_param = function () - local cache_key_list = constant_balance_param.cache_key.list - - local data_str, _ = cache_balance_param:get(cache_key_list); - if not data_str then - local res, _ = cache_balance_param:set(cache_key_list, cjson.encode(constant_balance.param.list)) - if not res then - tlog:err("sync_constant_data_balance_param new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_param new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_balance_param err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_param start, old=",data) - - -- 静态配置 - local constant_data = constant_balance.param.list - - -- 获取需要同步的配置 - local add_point = sync_constant_data_need_sync(constant_data.point, data.point) - for i = 1, #add_point do - table.insert(data.point, add_point[i]) - end - - -- 获取需要同步的配置 - local add_random = sync_constant_data_need_sync(constant_data.random, data.random) - for i = 1, #add_random do - table.insert(data.random, add_random[i]) - end - - local res = cache_balance_param:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_balance_param err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_balance_param done, new=",data) - - return tl_ops_rt.ok -end - - - ---+++++++++++++++WAF策略数据同步合并,预热+++++++++++++++-- - --- waf ip策略静态配置数据 -local sync_constant_data_waf_ip = function () - local cache_key_list = constant_waf_ip.cache_key.list - - local data_str, _ = cache_waf_ip:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_ip:set(cache_key_list, cjson.encode(constant_waf.ip.list)) - if not res then - tlog:err("sync_constant_data_waf_ip new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_ip new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_waf_ip err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_ip start, old=",data) - - -- 静态配置 - local constant_data = constant_waf.ip.list - - -- 获取需要同步的配置 - local add = sync_constant_data_need_sync(constant_data, data) - for i = 1, #add do - table.insert(data, add[i]) - end - - local res = cache_waf_ip:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_waf_ip err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_ip done, new=",data) - - return tl_ops_rt.ok -end - --- waf api策略静态配置数据 -local sync_constant_data_waf_api = function () - local cache_key_list = constant_waf_api.cache_key.list - - local data_str, _ = cache_waf_api:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_api:set(cache_key_list, cjson.encode(constant_waf.api.list)) - if not res then - tlog:err("sync_constant_data_waf_api new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_api new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_waf_api err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_api start, old=",data) - - -- 静态配置 - local constant_data = constant_waf.api.list - - -- 获取需要同步的配置 - local add = sync_constant_data_need_sync(constant_data, data) - for i = 1, #add do - table.insert(data, add[i]) - end - - local res = cache_waf_api:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_waf_api err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_api done, new=",data) - - return tl_ops_rt.ok -end - --- waf cookie策略静态配置数据 -local sync_constant_data_waf_cookie = function () - local cache_key_list = constant_waf_cookie.cache_key.list - - local data_str, _ = cache_waf_cookie:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_cookie:set(cache_key_list, cjson.encode(constant_waf.cookie.list)) - if not res then - tlog:err("sync_constant_data_waf_cookie new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_cookie new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_waf_cookie err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_cookie start, old=",data) - - -- 静态配置 - local constant_data = constant_waf.cookie.list - - -- 获取需要同步的配置 - local add = sync_constant_data_need_sync(constant_data, data) - for i = 1, #add do - table.insert(data, add[i]) - end - - local res = cache_waf_cookie:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_waf_cookie err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_cookie done, new=",data) - - return tl_ops_rt.ok -end - --- waf header策略静态配置数据 -local sync_constant_data_waf_header = function () - local cache_key_list = constant_waf_header.cache_key.list - - local data_str, _ = cache_waf_header:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_header:set(cache_key_list, cjson.encode(constant_waf.header.list)) - if not res then - tlog:err("sync_constant_data_waf_header new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_header new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_waf_header err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_header start, old=",data) - - -- 静态配置 - local constant_data = constant_waf.header.list - - -- 获取需要同步的配置 - local add = sync_constant_data_need_sync(constant_data, data) - for i = 1, #add do - table.insert(data, add[i]) - end - - local res = cache_waf_header:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_waf_header err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_header done, new=",data) - - return tl_ops_rt.ok -end - --- waf param策略静态配置数据 -local sync_constant_data_waf_param = function () - local cache_key_list = constant_waf_param.cache_key.list - - local data_str, _ = cache_waf_param:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_param:set(cache_key_list, cjson.encode(constant_waf.param.list)) - if not res then - tlog:err("sync_constant_data_waf_param new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_param new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_waf_param err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_param start, old=",data) - - -- 静态配置 - local constant_data = constant_waf.param.list - - -- 获取需要同步的配置 - local add = sync_constant_data_need_sync(constant_data, data) - for i = 1, #add do - table.insert(data, add[i]) - end - - local res = cache_waf_param:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_waf_param err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_param done, new=",data) - - return tl_ops_rt.ok -end - --- waf cc策略静态配置数据 -local sync_constant_data_waf_cc = function () - local cache_key_list = constant_waf_cc.cache_key.list - - local data_str, _ = cache_waf_cc:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_cc:set(cache_key_list, cjson.encode(constant_waf.cc.list)) - if not res then - tlog:err("sync_constant_data_waf_cc new store data err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_cc new store data, res=",res) - return tl_ops_rt.ok - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_data_waf_cc err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_cc start, old=",data) - - -- 静态配置 - local constant_data = constant_waf.cc.list - - -- 获取需要同步的配置 - local add = sync_constant_data_need_sync(constant_data, data) - for i = 1, #add do - table.insert(data, add[i]) - end - - local res = cache_waf_cc:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_data_waf_cc err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_data_waf_cc done, new=",data) - - return tl_ops_rt.ok -end - - - - -function _M:sync_constant_data_module( module ) - - if module == 'balance-api' then - return sync_constant_data_balance_api() - end - - if module == 'balance-cookie' then - return sync_constant_data_balance_cookie() - end - - if module == 'balance-header' then - return sync_constant_data_balance_header() - end - - if module == 'balance-param' then - return sync_constant_data_balance_param() - end - - if module == 'waf-api' then - return sync_constant_data_waf_api() - end - - if module == 'waf-ip' then - return sync_constant_data_waf_ip() - end - - if module == 'waf-header' then - return sync_constant_data_waf_header() - end - - if module == 'waf-cookie' then - return sync_constant_data_waf_cookie() - end - - if module == 'waf-param' then - return sync_constant_data_waf_param() - end - - if module == 'waf-cc' then - return sync_constant_data_waf_cc() - end -end - - -return _M \ No newline at end of file diff --git a/plugins/tl_ops_sync/sync_constant_fields.lua b/plugins/tl_ops_sync/sync_constant_fields.lua deleted file mode 100644 index 34c4253..0000000 --- a/plugins/tl_ops_sync/sync_constant_fields.lua +++ /dev/null @@ -1,1333 +0,0 @@ --- sync_constant_fields --- en : sync new fileds --- zn : 同步由于功能迭代引起的模块字段变更。 --- @author iamtsm --- @email 1905333456@qq.com - --- constant -local constant_service = tlops.constant.service -local constant_health = tlops.constant.health -local constant_limit = tlops.constant.limit -local constant_balance = tlops.constant.balance -local constant_balance_api = tlops.constant.balance_api -local constant_balance_param = tlops.constant.balance_param -local constant_balance_header = tlops.constant.balance_header -local constant_balance_cookie = tlops.constant.balance_cookie -local constant_waf = tlops.constant.waf -local constant_waf_ip = tlops.constant.waf_ip -local constant_waf_api = tlops.constant.waf_api -local constant_waf_cc = tlops.constant.waf_cc -local constant_waf_header = tlops.constant.waf_header -local constant_waf_cookie = tlops.constant.waf_cookie -local constant_waf_param = tlops.constant.waf_param -local tl_ops_rt = tlops.constant.comm.tl_ops_rt; --- cache -local cache_service = tlops.cache.service -local cache_limit = tlops.cache.limit -local cache_health = tlops.cache.health -local cache_balance_api = tlops.cache.balance_api -local cache_balance_param = tlops.cache.balance_param -local cache_balance_header = tlops.cache.balance_header -local cache_balance_cookie = tlops.cache.balance_cookie -local cache_balance = tlops.cache.balance -local cache_waf_api = tlops.cache.waf_api -local cache_waf_ip = tlops.cache.waf_ip -local cache_waf_cookie = tlops.cache.waf_cookie -local cache_waf_header = tlops.cache.waf_header -local cache_waf_cc = tlops.cache.waf_cc -local cache_waf_param = tlops.cache.waf_param -local cache_waf = tlops.cache.waf --- utils -local utils = tlops.utils -local tl_ops_limit_fuse_check_version = require("limit.fuse.tl_ops_limit_fuse_check_version") -local tl_ops_health_check_version = require("health.tl_ops_health_check_version") -local cjson = require("cjson.safe"); -cjson.encode_empty_table_as_object(false) -local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_plugin_sync_constant_fields"); - - -local _M = { - _VERSION = '0.01' -} - - ---+++++++++++++++服务节点数据同步+++++++++++++++-- - --- 服务节点数据同步 -local sync_constant_fields_service = function () - - local cache_key = constant_service.cache_key.service_list - local cache_rule_key = constant_service.cache_key.service_rule - local demo = constant_service.demo - - local data_str, _ = cache_service:get(cache_key); - if not data_str then - local res, _ = cache_service:set(cache_key, cjson.encode(constant_service.list)) - if not res then - tlog:err("sync_constant_fields_service new store list err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_service:get(cache_key); - - tlog:dbg("sync_constant_fields_service new store data, res=",res) - end - - local data_rule_str, _ = cache_service:get(cache_rule_key); - if not data_rule_str then - local res, _ = cache_service:set(cache_rule_key, constant_service.rule.auto_load) - if not res then - tlog:err("sync_constant_fields_service new store rule err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_service new store rule, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_service err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_service start, old=",data) - - local add_keys = {} - - for service , _ in pairs(data) do - local nodes = data[service] - if nodes then - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #nodes do - -- add keys - if nodes[i][key] == nil then - nodes[i][key] = demo[key] - table.insert(add_keys , key) - end - end - end - end - end - - local res = cache_service:set(cache_key, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_service err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_service done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - ---+++++++++++++++健康检查数据同步+++++++++++++++-- - --- 健康检查数据同步 -local sync_constant_fields_health = function () - - local cache_key = constant_health.cache_key.options_list - local demo = constant_health.demo - - local data_str, _ = cache_health:get(cache_key); - if not data_str then - local res, _ = cache_health:set(cache_key, cjson.encode(constant_health.options)) - if not res then - tlog:err("sync_constant_fields_health new store err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_health:get(cache_key); - - tlog:dbg("sync_constant_fields_health new store, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_health err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_health start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , key) - end - end - end - - local res = cache_health:set(cache_key, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_health err, res=",res,",new=",data) - return tl_ops_rt.error - end - - for i = 1, #constant_health.options do - local option = constant_health.options[i] - local service_name = option.check_service_name - if service_name then - tl_ops_health_check_version.incr_service_version(service_name) - end - end - - tlog:dbg("sync_constant_fields_health done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - ---+++++++++++++++限流熔断数据同步+++++++++++++++-- - --- 熔断数据同步 -local sync_constant_fields_limit = function () - local cache_key = constant_limit.fuse.cache_key.options_list - local demo = constant_limit.fuse.demo - - local data_str, _ = cache_limit:get(cache_key); - if not data_str then - local res, _ = cache_limit:set(cache_key, cjson.encode(constant_limit.fuse.options)) - if not res then - tlog:err("sync_constant_fields_limit new store err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_limit:get(cache_key); - - tlog:dbg("sync_constant_fields_limit new store, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_limit err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_limit start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , key) - end - end - end - - local res = cache_limit:set(cache_key, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_limit err, res=",res,",new=",data) - return tl_ops_rt.error - end - - for i = 1, #constant_limit.fuse.options do - local option = constant_limit.fuse.options[i] - local service_name = option.service_name - if service_name then - tl_ops_limit_fuse_check_version.incr_service_version(service_name) - end - end - - tlog:dbg("sync_constant_fields_limit done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- 限流数据同步 token -local sync_constant_fields_limit_token = function () - local cache_key = constant_limit.token.cache_key.options_list - local demo = constant_limit.token.demo - - local data_str, _ = cache_limit:get(cache_key); - if not data_str then - local res, _ = cache_limit:set(cache_key, cjson.encode(constant_limit.token.options)) - if not res then - tlog:err("sync_constant_fields_limit_token new store err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_limit:get(cache_key); - - tlog:dbg("sync_constant_fields_limit_token new store, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_limit_token err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_limit_token start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , key) - end - end - end - - local res = cache_limit:set(cache_key, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_limit_token err, res=",res,",new=",data) - return tl_ops_rt.error - end - - for i = 1, #constant_limit.fuse.options do - local option = constant_limit.fuse.options[i] - local service_name = option.service_name - if service_name then - tl_ops_limit_fuse_check_version.incr_service_version(service_name) - end - end - - tlog:dbg("sync_constant_fields_limit_token done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- 限流数据同步 leak -local sync_constant_fields_limit_leak = function () - local cache_key = constant_limit.leak.cache_key.options_list - local demo = constant_limit.leak.demo - - local data_str, _ = cache_limit:get(cache_key); - if not data_str then - local res, _ = cache_limit:set(cache_key, cjson.encode(constant_limit.leak.options)) - if not res then - tlog:err("sync_constant_fields_limit_leak new store err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_limit:get(cache_key); - - tlog:dbg("sync_constant_fields_limit_leak new store, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_limit_leak err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_limit_leak start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , key) - end - end - end - - for i = 1, #constant_limit.fuse.options do - local option = constant_limit.fuse.options[i] - local service_name = option.service_name - if service_name then - tl_ops_limit_fuse_check_version.incr_service_version(service_name) - end - end - - local res = cache_limit:set(cache_key, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_limit_leak err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_limit_leak done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - ---+++++++++++++++路由数据同步+++++++++++++++-- - --- 路由配置数据同步 -local sync_constant_fields_balance = function () - - local cache_key = constant_balance.cache_key.options - local demo = constant_balance.demo - - local data_str, _ = cache_balance:get(cache_key); - if not data_str then - local res, _ = cache_balance:set(cache_key, cjson.encode(constant_balance.options)) - if not res then - tlog:err("sync_constant_fields_balance new store err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_balance:get(cache_key); - - tlog:dbg("sync_constant_fields_balance new store, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_balance err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - -- add keys - if data[key] == nil then - data[key] = demo[key] - table.insert(add_keys , key) - end - end - - local res = cache_balance:set(cache_key, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_balance err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- api策略数据同步 -local sync_constant_fields_balance_api = function () - local cache_key_list = constant_balance_api.cache_key.list; - local cache_key_rule = constant_balance_api.cache_key.rule - - local demo = constant_balance_api.demo - - local data_str, _ = cache_balance_api:get(cache_key_list); - if not data_str then - local res, _ = cache_balance_api:set(cache_key_list, cjson.encode(constant_balance.api.list)) - if not res then - tlog:err("sync_constant_fields_balance_api new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_balance_api:get(cache_key_list) - - tlog:dbg("sync_constant_fields_balance_api new store data, res=",res) - end - - local data_rule_str, _ = cache_balance_api:get(cache_key_rule); - if not data_rule_str then - local res, _ = cache_balance_api:set(cache_key_rule, constant_balance.api.rule) - if not res then - tlog:err("sync_constant_fields_balance_api new store rule err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_api new store rule, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_balance_api err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_api start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo.point) do - -- data fileds check - for i = 1, #data.point do - -- add keys - if data.point[i][key] == nil then - data.point[i][key] = demo.point[key] - table.insert(add_keys , { - key = data.point[i][key] - }) - end - end - end - - -- demo fileds check - for key , _ in pairs(demo.random) do - -- data fileds check - for i = 1, #data.random do - -- add keys - if data.random[i][key] == nil then - data.random[i][key] = demo.random[key] - table.insert(add_keys , { - key = data.random[i][key] - }) - end - end - end - - local res = cache_balance_api:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_balance_api err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_api done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- cookie策略数据同步 -local sync_constant_fields_balance_cookie = function () - local cache_key_list = constant_balance_cookie.cache_key.list - local cache_key_rule = constant_balance_cookie.cache_key.rule - - local demo = constant_balance_cookie.demo - - local data_str, _ = cache_balance_cookie:get(cache_key_list); - if not data_str then - local res, _ = cache_balance_cookie:set(cache_key_list, cjson.encode(constant_balance.cookie.list)) - if not res then - tlog:err("sync_constant_fields_balance_cookie new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_balance_cookie:get(cache_key_list); - - tlog:dbg("sync_constant_fields_balance_cookie new store data, res=",res) - end - - local data_rule_str, _ = cache_balance_cookie:get(cache_key_rule); - if not data_rule_str then - local res, _ = cache_balance_cookie:set(cache_key_rule, constant_balance.cookie.rule) - if not res then - tlog:err("sync_constant_fields_balance_cookie new store rule err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_api new store rule, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_balance_cookie err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_cookie start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo.point) do - -- data fileds check - for i = 1, #data.point do - -- add keys - if data.point[i][key] == nil then - data.point[i][key] = demo.point[key] - table.insert(add_keys , { - key = data.point[i][key] - }) - end - end - end - - -- demo fileds check - for key , _ in pairs(demo.random) do - -- data fileds check - for i = 1, #data.random do - -- add keys - if data.random[i][key] == nil then - data.random[i][key] = demo.random[key] - table.insert(add_keys , { - key = data.random[i][key] - }) - end - end - end - - local res = cache_balance_cookie:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_balance_cookie err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_cookie done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- header策略数据同步 -local sync_constant_fields_balance_header = function () - local cache_key_list = constant_balance_header.cache_key.list - local cache_key_rule = constant_balance_header.cache_key.rule - - local demo = constant_balance_header.demo - - local data_str, _ = cache_balance_header:get(cache_key_list); - if not data_str then - local res, _ = cache_balance_header:set(cache_key_list, cjson.encode(constant_balance.header.list)) - if not res then - tlog:err("sync_constant_fields_balance_header new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_balance_header:get(cache_key_list); - - tlog:dbg("sync_constant_fields_balance_header new store data, res=",res) - end - - local data_rule_str, _ = cache_balance_header:get(cache_key_rule); - if not data_rule_str then - local res, _ = cache_balance_header:set(cache_key_rule, constant_balance.header.rule) - if not res then - tlog:err("sync_constant_fields_balance_header new store rule err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_header new store rule, res=",res) - end - - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_balance_header err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_header start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo.point) do - -- data fileds check - for i = 1, #data.point do - -- add keys - if data.point[i][key] == nil then - data.point[i][key] = demo.point[key] - table.insert(add_keys , { - key = data.point[i][key] - }) - end - end - end - - -- demo fileds check - for key , _ in pairs(demo.random) do - -- data fileds check - for i = 1, #data.random do - -- add keys - if data.random[i][key] == nil then - data.random[i][key] = demo.random[key] - table.insert(add_keys , { - key = data.random[i][key] - }) - end - end - end - - local res = cache_balance_header:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_balance_header err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_header done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- param策略数据同步 -local sync_constant_fields_balance_param = function () - local cache_key_list = constant_balance_param.cache_key.list - local cache_key_rule = constant_balance_param.cache_key.rule - - local demo = constant_balance_param.demo - - local data_str, _ = cache_balance_param:get(cache_key_list); - if not data_str then - local res, _ = cache_balance_param:set(cache_key_list, cjson.encode(constant_balance.param.list)) - if not res then - tlog:err("sync_constant_fields_balance_param new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_balance_param:get(cache_key_list); - - tlog:dbg("sync_constant_fields_balance_param new store data, res=",res) - end - - local data_rule_str, _ = cache_balance_param:get(cache_key_rule); - if not data_rule_str then - local res, _ = cache_balance_param:set(cache_key_rule, constant_balance.param.rule) - if not res then - tlog:err("sync_constant_fields_balance_param new store rule err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_param new store rule, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_balance_param err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_param start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo.point) do - -- data fileds check - for i = 1, #data.point do - -- add keys - if data.point[i][key] == nil then - data.point[i][key] = demo.point[key] - table.insert(add_keys ,{ - key = data.point[i][key] - }) - end - end - end - - -- demo fileds check - for key , _ in pairs(demo.random) do - -- data fileds check - for i = 1, #data.random do - -- add keys - if data.random[i][key] == nil then - data.random[i][key] = demo.random[key] - table.insert(add_keys , { - key = data.random[i][key] - }) - end - end - end - - local res = cache_balance_param:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_balance_param err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_balance_param done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - ---+++++++++++++++WAF数据同步+++++++++++++++-- - --- waf配置数据同步 -local sync_constant_fields_waf = function () - tlog:dbg("xxxx : ",constant_waf) - local cache_key = constant_waf.cache_key.options - local demo = constant_waf.demo - - local data_str, _ = cache_waf:get(cache_key); - if not data_str then - local res, _ = cache_waf:set(cache_key, cjson.encode(constant_waf.options)) - if not res then - tlog:err("sync_constant_fields_waf new store err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_waf:get(cache_key); - - tlog:dbg("sync_constant_fields_waf new store, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_waf err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - -- add keys - if data[key] == nil then - data[key] = demo[key] - table.insert(add_keys , key) - end - end - - local res = cache_waf:set(cache_key, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_waf err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- api waf规则数据同步 -local sync_constant_fields_waf_api = function () - local cache_key_list = constant_waf_api.cache_key.list; - local cache_key_scope = constant_waf_api.cache_key.scope - local cache_key_open = constant_waf_api.cache_key.open - - local demo = constant_waf_api.demo - - local data_str, _ = cache_waf_api:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_api:set(cache_key_list, cjson.encode(constant_waf.api.list)) - if not res then - tlog:err("sync_constant_fields_waf_api new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_waf_api:get(cache_key_list) - - tlog:dbg("sync_constant_fields_waf_api new store data, res=",res) - end - - local data_scope, _ = cache_waf_api:get(cache_key_scope); - if not data_scope then - local res, _ = cache_waf_api:set(cache_key_scope, constant_waf.api.scope) - if not res then - tlog:err("sync_constant_fields_waf_api new store scope err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_api new store scope, res=",res) - end - - local data_open, _ = cache_waf_api:get(cache_key_open); - if not data_open then - local res, _ = cache_waf_api:set(cache_key_open, constant_waf.api.open) - if not res then - tlog:err("sync_constant_fields_waf_api new store open err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_api new store open, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_waf_api err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_api start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , { - key = data[i][key] - }) - end - end - end - - local res = cache_waf_api:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_waf_api err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_api done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- ip waf规则数据同步 -local sync_constant_fields_waf_ip = function () - local cache_key_list = constant_waf_ip.cache_key.list; - local cache_key_scope = constant_waf_ip.cache_key.scope - local cache_key_open = constant_waf_ip.cache_key.open - - local demo = constant_waf_ip.demo - - local data_str, _ = cache_waf_ip:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_ip:set(cache_key_list, cjson.encode(constant_waf.ip.list)) - if not res then - tlog:err("sync_constant_fields_waf_ip new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_waf_ip:get(cache_key_list) - - tlog:dbg("sync_constant_fields_waf_ip new store data, res=",res) - end - - local data_scope, _ = cache_waf_ip:get(cache_key_scope); - if not data_scope then - local res, _ = cache_waf_ip:set(cache_key_scope, constant_waf.ip.scope) - if not res then - tlog:err("sync_constant_fields_waf_ip new store scope err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_ip new store scope, res=",res) - end - - local data_open, _ = cache_waf_ip:get(cache_key_open); - if not data_open then - local res, _ = cache_waf_ip:set(cache_key_open, constant_waf.ip.open) - if not res then - tlog:err("sync_constant_fields_waf_ip new store open err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_ip new store open, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_waf_ip err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_ip start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , { - key = data[i][key] - }) - end - end - end - - local res = cache_waf_ip:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_waf_ip err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_ip done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- cookie waf规则数据同步 -local sync_constant_fields_waf_cookie = function () - local cache_key_list = constant_waf_cookie.cache_key.list; - local cache_key_scope = constant_waf_cookie.cache_key.scope - local cache_key_open = constant_waf_cookie.cache_key.open - - local demo = constant_waf_cookie.demo - - local data_str, _ = cache_waf_cookie:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_cookie:set(cache_key_list, cjson.encode(constant_waf.cookie.list)) - if not res then - tlog:err("sync_constant_fields_waf_cookie new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_waf_cookie:get(cache_key_list) - - tlog:dbg("sync_constant_fields_waf_cookie new store data, res=",res) - end - - local data_scope, _ = cache_waf_cookie:get(cache_key_scope); - if not data_scope then - local res, _ = cache_waf_cookie:set(cache_key_scope, constant_waf.cookie.scope) - if not res then - tlog:err("sync_constant_fields_waf_cookie new store scope err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_cookie new store scope, res=",res) - end - - local data_open, _ = cache_waf_cookie:get(cache_key_open); - if not data_open then - local res, _ = cache_waf_cookie:set(cache_key_open, constant_waf.cookie.open) - if not res then - tlog:err("sync_constant_fields_waf_cookie new store open err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_cookie new store open, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_waf_cookie err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_cookie start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , { - key = data[i][key] - }) - end - end - end - - local res = cache_waf_cookie:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_waf_cookie err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_cookie done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- header waf规则数据同步 -local sync_constant_fields_waf_header = function () - local cache_key_list = constant_waf_header.cache_key.list; - local cache_key_scope = constant_waf_header.cache_key.scope - local cache_key_open = constant_waf_header.cache_key.open - - local demo = constant_waf_header.demo - - local data_str, _ = cache_waf_header:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_header:set(cache_key_list, cjson.encode(constant_waf.header.list)) - if not res then - tlog:err("sync_constant_fields_waf_header new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_waf_header:get(cache_key_list) - - tlog:dbg("sync_constant_fields_waf_header new store data, res=",res) - end - - local data_scope, _ = cache_waf_header:get(cache_key_scope); - if not data_scope then - local res, _ = cache_waf_header:set(cache_key_scope, constant_waf.header.scope) - if not res then - tlog:err("sync_constant_fields_waf_header new store scope err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_header new store scope, res=",res) - end - - local data_open, _ = cache_waf_header:get(cache_key_open); - if not data_open then - local res, _ = cache_waf_header:set(cache_key_open, constant_waf.header.open) - if not res then - tlog:err("sync_constant_fields_waf_header new store open err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_header new store open, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_waf_header err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_header start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , { - key = data[i][key] - }) - end - end - end - - local res = cache_waf_header:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_waf_header err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_header done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- param waf规则数据同步 -local sync_constant_fields_waf_param = function () - local cache_key_list = constant_waf_param.cache_key.list; - local cache_key_scope = constant_waf_param.cache_key.scope - local cache_key_open = constant_waf_param.cache_key.open - - local demo = constant_waf_param.demo - - local data_str, _ = cache_waf_param:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_param:set(cache_key_list, cjson.encode(constant_waf.param.list)) - if not res then - tlog:err("sync_constant_fields_waf_param new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_waf_param:get(cache_key_list) - - tlog:dbg("sync_constant_fields_waf_param new store data, res=",res) - end - - local data_scope, _ = cache_waf_param:get(cache_key_scope); - if not data_scope then - local res, _ = cache_waf_param:set(cache_key_scope, constant_waf.param.scope) - if not res then - tlog:err("sync_constant_fields_waf_param new store scope err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_param new store scope, res=",res) - end - - local data_open, _ = cache_waf_param:get(cache_key_open); - if not data_open then - local res, _ = cache_waf_param:set(cache_key_open, constant_waf.param.open) - if not res then - tlog:err("sync_constant_fields_waf_param new store open err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_param new store open, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_waf_param err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_param start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , { - key = data[i][key] - }) - end - end - end - - local res = cache_waf_param:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_waf_param err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_param done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - --- cc waf规则数据同步 -local sync_constant_fields_waf_cc = function () - local cache_key_list = constant_waf_cc.cache_key.list; - local cache_key_scope = constant_waf_cc.cache_key.scope - local cache_key_open = constant_waf_cc.cache_key.open - - local demo = constant_waf_cc.demo - - local data_str, _ = cache_waf_cc:get(cache_key_list); - if not data_str then - local res, _ = cache_waf_cc:set(cache_key_list, cjson.encode(constant_waf.cc.list)) - if not res then - tlog:err("sync_constant_fields_waf_cc new store data err, res=",res) - return tl_ops_rt.error - end - - data_str, _ = cache_waf_cc:get(cache_key_list) - - tlog:dbg("sync_constant_fields_waf_cc new store data, res=",res) - end - - local data_scope, _ = cache_waf_cc:get(cache_key_scope); - if not data_scope then - local res, _ = cache_waf_cc:set(cache_key_scope, constant_waf.cc.scope) - if not res then - tlog:err("sync_constant_fields_waf_cc new store scope err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_cc new store scope, res=",res) - end - - local data_open, _ = cache_waf_cc:get(cache_key_open); - if not data_open then - local res, _ = cache_waf_cc:set(cache_key_open, constant_waf.cc.open) - if not res then - tlog:err("sync_constant_fields_waf_cc new store open err, res=",res) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_cc new store open, res=",res) - end - - local data = cjson.decode(data_str); - if not data and type(data) ~= 'table' then - tlog:err("sync_constant_fields_waf_cc err, old=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_cc start, old=",data) - - local add_keys = {} - - -- demo fileds check - for key , _ in pairs(demo) do - -- data fileds check - for i = 1, #data do - -- add keys - if data[i][key] == nil then - data[i][key] = demo[key] - table.insert(add_keys , { - key = data[i][key] - }) - end - end - end - - local res = cache_waf_cc:set(cache_key_list, cjson.encode(data)) - if not res then - tlog:err("sync_constant_fields_waf_cc err, res=",res,",new=",data) - return tl_ops_rt.error - end - - tlog:dbg("sync_constant_fields_waf_cc done, new=",data,",add_keys=",add_keys) - - return tl_ops_rt.ok -end - - - -function _M:sync_constant_fields_module( module ) - - if module == 'service' then - return sync_constant_fields_service() - end - - if module == 'health' then - return sync_constant_fields_health() - end - - if module == 'limit-fuse' then - return sync_constant_fields_limit() - end - - if module == 'limit-token' then - return sync_constant_fields_limit_token() - end - - if module == 'limit-leak' then - return sync_constant_fields_limit_leak() - end - - if module == 'balance' then - return sync_constant_fields_balance() - end - - if module == 'balance-api' then - return sync_constant_fields_balance_api() - end - - if module == 'balance-cookie' then - return sync_constant_fields_balance_cookie() - end - - if module == 'balance-header' then - return sync_constant_fields_balance_header() - end - - if module == 'balance-param' then - return sync_constant_fields_balance_param() - end - - if module == 'waf' then - return sync_constant_fields_waf() - end - - if module == 'waf-api' then - return sync_constant_fields_waf_api() - end - - if module == 'waf-ip' then - return sync_constant_fields_waf_ip() - end - - if module == 'waf-header' then - return sync_constant_fields_waf_header() - end - - if module == 'waf-cookie' then - return sync_constant_fields_waf_cookie() - end - - if module == 'waf-param' then - return sync_constant_fields_waf_param() - end - - if module == 'waf-cc' then - return sync_constant_fields_waf_cc() - end -end - - -return _M \ No newline at end of file diff --git a/plugins/tl_ops_sync/tl_ops_constant_sync.lua b/plugins/tl_ops_sync/tl_ops_plugin_constant.lua similarity index 100% rename from plugins/tl_ops_sync/tl_ops_constant_sync.lua rename to plugins/tl_ops_sync/tl_ops_plugin_constant.lua diff --git a/plugins/tl_ops_sync_cluster/sync_cluster.lua b/plugins/tl_ops_sync_cluster/sync_cluster.lua new file mode 100644 index 0000000..bcfe4ca --- /dev/null +++ b/plugins/tl_ops_sync_cluster/sync_cluster.lua @@ -0,0 +1,59 @@ +-- tl_ops_sync_cluster +-- en : sync_cluster data , load data to memory +-- zn : 同步数据接口,预热数据 +-- @author iamtsm +-- @email 1905333456@qq.com + +local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_plugin_sync_cluster") +local sync_cluster_data = require("plugins.tl_ops_sync_cluster.sync_cluster_data") +local sync_env = tlops.env.sync +local utils = tlops.utils + + +local _M = { + _VERSION = '0.01', +} +local mt = { __index = _M } + + +-- 核心逻辑 +local tl_ops_sync_cluster_timer = function(premature, args) + if premature then + return + end + + local sync_cluster_data_env = sync_env.cluster_data + if sync_cluster_data_env.open then + local module = sync_cluster_data_env.module + if module then + for i = 1, #module do + + end + else + tlog:dbg("sync_cluster_data no module, module=",module) + end + end +end + + +-- 启动器 +function _M:tl_ops_sync_cluster_timer_start( ) + local lock_key = "tl_ops_plugin_sync_cluster_lock" + local lock_time = 5 + if not utils:tl_ops_worker_lock(lock_key, lock_time) then + return + end + + local ok, _ = ngx.timer.at(0, tl_ops_sync_cluster_timer, nil) + if not ok then + tlog:err("tl_ops_sync_cluster_timer start failed to run , create timer failed " ,_) + return nil + end +end + + +function _M:new() + return setmetatable({}, mt) +end + +return _M diff --git a/plugins/tl_ops_sync_cluster/sync_cluster_data.lua b/plugins/tl_ops_sync_cluster/sync_cluster_data.lua new file mode 100644 index 0000000..e69de29 diff --git a/plugins/tl_ops_sync_cluster/sync_cluster_timer.lua b/plugins/tl_ops_sync_cluster/sync_cluster_timer.lua new file mode 100644 index 0000000..ba0a9eb --- /dev/null +++ b/plugins/tl_ops_sync_cluster/sync_cluster_timer.lua @@ -0,0 +1,78 @@ +-- tl_ops_sync_cluster_timer +-- en : sync_cluster timer +-- zn : 周期性同步集群节点数据 +-- @author iamtsm +-- @email 1905333456@qq.com + +local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_plugin_sync_cluster_timer") +local sync_cluster_data = require("plugins.tl_ops_sync_cluster.sync_cluster_data") +local constant_sync_cluster = require("plugins.tl_ops_sync_cluster.tl_ops_plugin_constant") +local sync_env = tlops.env.sync +local utils = tlops.utils + + +local _M = { + _VERSION = '0.01', +} +local mt = { __index = _M } + + +-- 核心逻辑 +local tl_ops_sync_cluster_core = function(options, module) + + tlog:dbg("tl_ops_sync_cluster_core checking") + + local sync_cluster_data_env = sync_env.cluster_data + if sync_cluster_data_env.open then + local module = sync_cluster_data_env.module + if module then + -- 对每个节点执行周期性的检查,检查各个模块数据 + for i = 1, #module do + + + end + else + tlog:dbg("tl_ops_sync_cluster_timer no module, module=",module) + end + end + +end + + + +-- timer +local tl_ops_sync_cluster_timer = function(premature, options) + if premature then + return + end + + tlog:dbg("tl_ops_sync_cluster_timer options=",options) + + tl_ops_sync_cluster_core(options, module) +end + + +-- 启动器 +function _M:tl_ops_sync_cluster_timer_start( ) + local lock_key = "tl_ops_plugin_sync_cluster_timer_lock" + local lock_time = 1 + if not utils:tl_ops_worker_lock(lock_key, lock_time) then + return + end + + local interval = constant_sync_cluster.interval + local options = constant_sync_cluster.options + + local ok, _ = ngx.timer.at(interval, tl_ops_sync_cluster_timer, options) + if not ok then + tlog:err("tl_ops_sync_cluster_timer start failed to run , create timer failed " ,_) + return nil + end +end + + +function _M:new() + return setmetatable({}, mt) +end + +return _M diff --git a/plugins/tl_ops_sync_cluster/tl_ops_plugin_constant.lua b/plugins/tl_ops_sync_cluster/tl_ops_plugin_constant.lua new file mode 100644 index 0000000..e45e422 --- /dev/null +++ b/plugins/tl_ops_sync_cluster/tl_ops_plugin_constant.lua @@ -0,0 +1,28 @@ +-- 集群节点数据同步配置 + +local tl_ops_constant_sync_cluster = { + cache_key = { + options = "tl_ops_sync_cluster_options", -- 主从节点列表 (暂不支持动态配置,只能在文件配置) + interval = "tl_ops_sync_cluster_interval" -- 主从心跳周期/单位/s (暂不支持动态配置,只能在文件配置) + }, + options = { + { + id = 1, + ip = "127.0.0.1", + port = 80, + }, + { + id = 2, + ip = "127.0.0.1", + port = 81, + } + }, + demo = { + id = 1, -- 主节点默认放第一个,且只能有一个主节点,否则插件不执行 + ip = "127.0.0.1", -- 节点ip + port = 80, -- 节点端口 + }, + interval = 5 +} + +return tl_ops_constant_sync_cluster \ No newline at end of file diff --git a/plugins/tl_ops_sync_cluster/tl_ops_plugin_core.lua b/plugins/tl_ops_sync_cluster/tl_ops_plugin_core.lua new file mode 100644 index 0000000..d754af6 --- /dev/null +++ b/plugins/tl_ops_sync_cluster/tl_ops_plugin_core.lua @@ -0,0 +1,36 @@ +-- tl_ops_plugin_sync_cluster +-- en : sync_cluster +-- zn : 集群同步器插件 +-- @author iamtsm +-- @email 1905333456@qq.com + +local plugin_sync_cluster = require("plugins.tl_ops_sync_cluster.sync_cluster"); +local plugin_sync_cluster_timer = require("plugins.tl_ops_sync_cluster.sync_cluster_timer"); + +local _M = { + _VERSION = '0.01' +} + +local mt = { __index = _M } + + +function _M:new(options) + if not options then + options = {} + end + return setmetatable(options, mt) +end + + +function _M:tl_ops_process_after_init_worker() + + -- 启动同步器 + plugin_sync_cluster:tl_ops_sync_cluster_timer_start() + + -- 启动心跳检查同步 + plugin_sync_cluster_timer:tl_ops_sync_cluster_timer_start() + + return true, "ok" +end + +return _M diff --git a/plugins/tl_ops_template/tl_ops_plugin_constant.lua b/plugins/tl_ops_template/tl_ops_plugin_constant.lua new file mode 100644 index 0000000..a210b28 --- /dev/null +++ b/plugins/tl_ops_template/tl_ops_plugin_constant.lua @@ -0,0 +1,3 @@ +return { + +} \ No newline at end of file