Skip to content

Commit

Permalink
feat: 支持滑动窗口算法
Browse files Browse the repository at this point in the history
feat: 优化健康检查逻辑
feat: 控制台样式优化
fix : 修复限流block不生效
fix : 补充文档
feat: 事务日程调整
  • Loading branch information
iamtsm committed Dec 22, 2022
1 parent 21d46ba commit 402145a
Show file tree
Hide file tree
Showing 14 changed files with 460 additions and 60 deletions.
11 changes: 11 additions & 0 deletions api/router/tl_ops_api_get_state.lua
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ local Router = function()
local limit_rate
local limit_pre_time
local limit_bucket
local limit_block
if limit_depend == tl_ops_constant_limit.depend.token then
limit_capacity = shared:get(tl_ops_utils_func:gen_node_key(tl_ops_constant_limit.token.cache_key.capacity, node.service, node_id))
if not limit_capacity then
Expand All @@ -128,6 +129,10 @@ local Router = function()
if not limit_rate then
limit_rate = 'nil'
end
limit_block = shared:get(tl_ops_utils_func:gen_node_key(tl_ops_constant_limit.token.cache_key.block, node.service, node_id))
if not limit_block then
limit_block = 'nil'
end
limit_pre_time = shared:get(tl_ops_utils_func:gen_node_key(tl_ops_constant_limit.token.cache_key.pre_time, node.service, node_id))
if not limit_pre_time then
limit_pre_time = 'nil'
Expand All @@ -137,6 +142,7 @@ local Router = function()
limit_bucket = 'nil'
end
end

if limit_depend == tl_ops_constant_limit.depend.leak then
limit_capacity = shared:get(tl_ops_utils_func:gen_node_key(tl_ops_constant_limit.leak.cache_key.capacity, node.service, node_id))
if not limit_capacity then
Expand All @@ -146,6 +152,10 @@ local Router = function()
if not limit_rate then
limit_rate = 'nil'
end
limit_block = shared:get(tl_ops_utils_func:gen_node_key(tl_ops_constant_limit.leak.cache_key.block, node.service, node_id))
if not limit_block then
limit_block = 'nil'
end
limit_pre_time = shared:get(tl_ops_utils_func:gen_node_key(tl_ops_constant_limit.leak.cache_key.pre_time, node.service, node_id))
if not limit_pre_time then
limit_pre_time = 'nil'
Expand Down Expand Up @@ -173,6 +183,7 @@ local Router = function()
limit_depend = limit_depend,
limit_capacity = limit_capacity,
limit_rate = limit_rate,
limit_block = limit_block,
limit_pre_time = limit_pre_time,
limit_bucket = limit_bucket,
balance_success = cjson.decode(balance_success_cache),
Expand Down
36 changes: 35 additions & 1 deletion constant/tl_ops_constant_limit.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local token = { --服务令牌桶配置
cache_key = {
options_list = "tl_ops_limit_token_options_list",
capacity = "tl_ops_limit_token_capacity_service",
block = "tl_ops_limit_token_block_service",
rate = "tl_ops_limit_token_rate_service",
expand = "tl_ops_limit_leak_expand_service",
shrink = "tl_ops_limit_leak_shrink_service",
Expand Down Expand Up @@ -33,6 +34,7 @@ local leak = { --服务漏桶配置
cache_key = {
options_list = "tl_ops_limit_leak_options_list",
capacity = "tl_ops_limit_leak_capacity_service",
block = "tl_ops_limit_leak_block_service",
rate = "tl_ops_limit_leak_rate_service",
expand = "tl_ops_limit_leak_expand_service",
shrink = "tl_ops_limit_leak_shrink_service",
Expand All @@ -53,10 +55,41 @@ local leak = { --服务漏桶配置
}
}

-- 滑动窗口配置
local sliding = { --服务滑动窗口配置
cache_key = {
options_list = "tl_ops_limit_sliding_options_list",
window = "tl_ops_limit_sliding_window_service",
block = "tl_ops_limit_sliding_block_service",
cycle = "tl_ops_limit_sliding_cycle_service",
limit = "tl_ops_limit_sliding_limit_service",
rate = "tl_ops_limit_sliding_rate_service",
expand = "tl_ops_limit_sliding_expand_service",
shrink = "tl_ops_limit_sliding_shrink_service",
count = "tl_ops_limit_sliding_count_service",
pre_time = "tl_ops_limit_sliding_pre_time_service",
lock = "tl_ops_limit_sliding_lock_service"
},
options = {

},
demo = {
service_name = "tlops-demo", -- 滑动窗口配置所属服务
window = 60, -- 滑动窗口大小,理解为时间窗口(单位秒,默认一个窗口60s)
limit = 1024 * 100, -- 滑动窗口限制请求的数量 (默认时间窗口60s,限制请求100 * 单位个)
cycle = 10, -- 区间个数 (默认时间窗口60s,切分为10个,每个区间6s)
rate = 1, -- 窗口一次滑动多少个区间
block = 1024, -- 请求单位 (按字节为单位,可做字节整型流控)
expand = 0.5, -- 扩容比例
shrink = 0.5, -- 缩容比例
}
}

-- 依赖限流组件
local depend = {
token = "token",
leak = "leak"
leak = "leak",
sliding = "sliding"
}

-- 组件级别
Expand Down Expand Up @@ -104,6 +137,7 @@ local tl_ops_constant_limit = {
fuse = fuse,
token = token,
leak = leak,
sliding = sliding,
depend = depend,
level = level,
mode = mode
Expand Down
13 changes: 13 additions & 0 deletions doc/change.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@

## 事务更新日程

2022-12-21

1. 支持滑动窗口算法

2. 优化健康检查逻辑

3. 修复限流block不生效

4. 补充文档

5. 事务日程调整


2022-09-15

1. 支持登录插件配置实时修改
Expand Down
11 changes: 11 additions & 0 deletions gitbook/usage/intro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,14 @@ tl-ops-manage (tl-openresty-web-manage),基于openresty开发的一款基

做这个项目最开始的想法很简单,只是想在造轮子的过程中了解,学习一些新的知识面。到后面完成了大部分基础功能点后,发现可以整理为一个服务管理工具的项目。于是就有了tl-ops-manage。


## 优势

### 代码简洁,通俗易懂,没有特殊复杂逻辑交互,十分清晰明了的执行流程,和详细的模块说明文档。

### 项目许多模块和算法都是可独立出来作基础组件。

### 配置环境,操作界面,容易上手,无需依赖过多第三方环境。

### 容易扩展,适合个人,中小企业贴合业务扩展具体场景逻辑。

37 changes: 19 additions & 18 deletions health/tl_ops_health_check.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@
-- @author iamtsm
-- @email [email protected]

local cjson = require("cjson.safe")
local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_health")
local tl_ops_utils_func = require("utils.tl_ops_utils_func")
local tl_ops_constant_health = require("constant.tl_ops_constant_health")
local tl_ops_status = require("constant.tl_ops_constant_comm").tl_ops_status;
local tl_ops_health_check_dynamic_conf = require("health.tl_ops_health_check_dynamic_conf")
local tl_ops_health_check_version = require("health.tl_ops_health_check_version")
local nx_socket = ngx.socket.tcp
local shared = ngx.shared.tlopsbalance
local find = ngx.re.find


local cjson = require("cjson.safe")
local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_health")
local tl_ops_utils_func = require("utils.tl_ops_utils_func")
local tl_ops_constant_health = require("constant.tl_ops_constant_health")
local tl_ops_status = require("constant.tl_ops_constant_comm").tl_ops_status;
local tl_ops_health_check_dynamic_conf = require("health.tl_ops_health_check_dynamic_conf")
local tl_ops_health_check_version = require("health.tl_ops_health_check_version")
local nx_socket = ngx.socket.tcp
local shared = ngx.shared.tlopsbalance
local find = ngx.re.find

local _M = {
_VERSION = '0.02'
Expand Down Expand Up @@ -238,26 +236,29 @@ tl_ops_health_check_nodes = function (conf)
local bytes, _ = sock:send(check_content .. "\r\n\r\n\r\n")
if not bytes then
tlog:err("tl_ops_health_check_nodes failed to send socket: ", _)
socket:close()
tl_ops_health_check_node_failed(conf, node_id, node)
break
end

tlog:dbg("tl_ops_health_check_nodes send socket ok : byte=", bytes)

-- socket反馈
local receive_line, _ = sock:receive()
if not receive_line then
local receive_10k, _ = sock:receiveany(10240)
if not receive_10k then
if _ == "check_timeout" then
tlog:err("tl_ops_health_check_nodes socket check_timeout: ", _)
sock:close()
end

tlog:err("tl_ops_health_check_nodes socket receive failed: ", receive_10k)
sock:close()
tl_ops_health_check_node_failed(conf, node_id, node)
break
end

tlog:dbg("tl_ops_health_check_nodes receive socket ok : ", receive_line)
tlog:dbg("tl_ops_health_check_nodes receive socket ok : ", receive_10k)

local from, to, _ = find(receive_line, [[^HTTP/\d+\.\d+\s+(\d+)]], "joi", nil, 1)
local from, to, _ = find(receive_10k, [[^HTTP/\d+\.\d+\s+(\d+)]], "joi", nil, 1)
if not from then
tlog:err("tl_ops_health_check_nodes ngx.re.find receive err: ", from, to, _)
sock:close()
Expand All @@ -266,7 +267,7 @@ tl_ops_health_check_nodes = function (conf)
end

-- 心跳状态
local status = tonumber(string.sub(receive_line, from, to))
local status = tonumber(string.sub(receive_10k, from, to))

tlog:dbg("tl_ops_health_check_nodes get status ok ,name=" ,name, ", status=" , status)
local statusPass = false;
Expand Down
14 changes: 7 additions & 7 deletions health/tl_ops_health_check_dynamic_conf.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
-- @email [email protected]


local cjson = require("cjson.safe");
local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_health_check_dynamic_conf");
local tl_ops_utils_func = require("utils.tl_ops_utils_func");
local tl_ops_constant_health = require("constant.tl_ops_constant_health")
local cache_health = require("cache.tl_ops_cache_core"):new("tl-ops-health");
local tl_ops_constant_service = require("constant.tl_ops_constant_service");
local shared = ngx.shared.tlopsbalance
local cjson = require("cjson.safe");
local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_health_check_dynamic_conf");
local tl_ops_utils_func = require("utils.tl_ops_utils_func");
local tl_ops_constant_health = require("constant.tl_ops_constant_health")
local cache_health = require("cache.tl_ops_cache_core"):new("tl-ops-health");
local tl_ops_constant_service = require("constant.tl_ops_constant_service");
local shared = ngx.shared.tlopsbalance

-- 需要提前定义,定时器访问不了
local tl_ops_health_check_dynamic_conf_add_timer_check;
Expand Down
20 changes: 10 additions & 10 deletions limit/fuse/tl_ops_limit_fuse_check.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
-- @author iamtsm
-- @email [email protected]

local tl_ops_utils_func = require("utils.tl_ops_utils_func");
local tl_ops_utils_func = require("utils.tl_ops_utils_func");
local tl_ops_limit_fuse_check_dynamic_conf = require("limit.fuse.tl_ops_limit_fuse_check_dynamic_conf")
local tl_ops_limit_fuse_check_version = require("limit.fuse.tl_ops_limit_fuse_check_version")
local tl_ops_limit_token_bucket = require("limit.fuse.tl_ops_limit_fuse_token_bucket");
local tl_ops_limit_leak_bucket = require("limit.fuse.tl_ops_limit_fuse_leak_bucket");
local tl_ops_constant_limit = require("constant.tl_ops_constant_limit")
local tl_ops_constant_health = require("constant.tl_ops_constant_health")
local tl_ops_constant_service = require("constant.tl_ops_constant_service")
local shared = ngx.shared.tlopsbalance
local cjson = require("cjson.safe");
local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_limit_fuse");
local tl_ops_limit_fuse_check_version = require("limit.fuse.tl_ops_limit_fuse_check_version")
local tl_ops_limit_token_bucket = require("limit.fuse.tl_ops_limit_fuse_token_bucket");
local tl_ops_limit_leak_bucket = require("limit.fuse.tl_ops_limit_fuse_leak_bucket");
local tl_ops_constant_limit = require("constant.tl_ops_constant_limit")
local tl_ops_constant_health = require("constant.tl_ops_constant_health")
local tl_ops_constant_service = require("constant.tl_ops_constant_service")
local shared = ngx.shared.tlopsbalance
local cjson = require("cjson.safe");
local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_limit_fuse");


local _STATE = {
Expand Down
12 changes: 6 additions & 6 deletions limit/fuse/tl_ops_limit_fuse_check_dynamic_conf.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
-- @author iamtsm
-- @email [email protected]

local cjson = require("cjson.safe");
local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_limit_fuse_check_dynamic_conf");
local tl_ops_utils_func = require("utils.tl_ops_utils_func");
local tl_ops_constant_limit = require("constant.tl_ops_constant_limit")
local cache_limit = require("cache.tl_ops_cache_core"):new("tl-ops-limit");
local cjson = require("cjson.safe");
local tlog = require("utils.tl_ops_utils_log"):new("tl_ops_limit_fuse_check_dynamic_conf");
local tl_ops_utils_func = require("utils.tl_ops_utils_func");
local tl_ops_constant_limit = require("constant.tl_ops_constant_limit")
local cache_limit = require("cache.tl_ops_cache_core"):new("tl-ops-limit");
local tl_ops_constant_service = require("constant.tl_ops_constant_service")
local shared = ngx.shared.tlopsbalance
local shared = ngx.shared.tlopsbalance

local tl_ops_limit_fuse_check_dynamic_conf_add_timer_check;

Expand Down
20 changes: 18 additions & 2 deletions limit/fuse/tl_ops_limit_fuse_leak_bucket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,15 @@ end
local tl_ops_limit_leak = function( service_name, node_id )
local leak_mode = tl_ops_limit_leak_mode( service_name , node_id)

local block = leak_mode.options.block
local block_key = tl_ops_utils_func:gen_node_key(leak_mode.cache_key.block, service_name, node_id)
local block = shared:get(block_key)
if not block then
local res, _ = shared:set(block, leak_mode.options.block)
if not res then
return false
end
block = leak_mode.options.block
end

local capacity_key = tl_ops_utils_func:gen_node_key(leak_mode.cache_key.capacity, service_name, node_id)
local capacity = shared:get(capacity_key)
Expand Down Expand Up @@ -176,7 +184,15 @@ local tl_ops_limit_leak_shrink = function( service_name, node_id )

local leak_mode = tl_ops_limit_leak_mode( service_name, node_id)

local block = leak_mode.options.block
local block_key = tl_ops_utils_func:gen_node_key(leak_mode.cache_key.block, service_name, node_id)
local block = shared:get(block_key)
if not block then
local res, _ = shared:set(block, leak_mode.options.block)
if not res then
return false
end
block = leak_mode.options.block
end

local capacity_key = tl_ops_utils_func:gen_node_key(leak_mode.cache_key.capacity, service_name, node_id)
local capacity = shared:get(capacity_key)
Expand Down
24 changes: 20 additions & 4 deletions limit/fuse/tl_ops_limit_fuse_token_bucket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ end
local tl_ops_limit_token = function( service_name, node_id )
local token_mode = tl_ops_limit_token_mode( service_name , node_id)

local block = token_mode.options.block

local capacity_key = tl_ops_utils_func:gen_node_key(token_mode.cache_key.capacity, service_name, node_id)
local capacity = shared:get(capacity_key)
if not capacity then
Expand Down Expand Up @@ -104,6 +102,16 @@ local tl_ops_limit_token = function( service_name, node_id )
token_bucket = 0
end

local block_key = tl_ops_utils_func:gen_node_key(token_mode.cache_key.block, service_name, node_id)
local block = shared:get(block_key)
if not block then
local res, _ = shared:set(block_key, token_mode.options.block)
if not res then
return false
end
block = token_mode.options.block
end

-- 取出令牌
if token_bucket > block then
local ok, _ = shared:incr(token_bucket_key, -block)
Expand Down Expand Up @@ -198,8 +206,6 @@ end
local tl_ops_limit_token_shrink = function( service_name, node_id )

local token_mode = tl_ops_limit_token_mode( service_name, node_id)

local block = token_mode.options.block

local capacity_key = tl_ops_utils_func:gen_node_key(token_mode.cache_key.capacity, service_name, node_id)
local capacity = shared:get(capacity_key)
Expand All @@ -211,6 +217,16 @@ local tl_ops_limit_token_shrink = function( service_name, node_id )
capacity = token_mode.options.capacity
end

local block_key = tl_ops_utils_func:gen_node_key(token_mode.cache_key.block, service_name, node_id)
local block = shared:get(block_key)
if not block then
local res, _ = shared:set(block_key, token_mode.options.block)
if not res then
return false
end
block = token_mode.options.block
end

-- 无需缩容
if capacity <= block then
return true
Expand Down
Loading

0 comments on commit 402145a

Please sign in to comment.