Skip to content

Commit

Permalink
Merge pull request #2764 from InfinityPacer/feature/event
Browse files Browse the repository at this point in the history
  • Loading branch information
jxxghp authored Sep 22, 2024
2 parents 8e8a10f + 5fc5838 commit d7c277a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
59 changes: 36 additions & 23 deletions app/core/event.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import importlib
import inspect
import random
import threading
import time
import traceback
Expand All @@ -12,11 +13,13 @@
from app.helper.thread import ThreadHelper
from app.log import logger
from app.schemas.types import EventType, ChainEventType
from app.utils.limit import ExponentialBackoffRateLimiter
from app.utils.singleton import Singleton

DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级
MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数
EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 30 # 事件队列空闲时的超时时间(秒)
INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 1 # 事件队列空闲时的初始超时时间(秒)
MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的最大超时时间(秒)


class Event:
Expand Down Expand Up @@ -78,7 +81,6 @@ def __init__(self):
self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__disabled_classes = set() # 禁用的事件处理器类集合
self.__lock = threading.Lock() # 线程锁
self.__condition = threading.Condition(self.__lock) # 条件变量

def start(self):
"""
Expand All @@ -87,7 +89,7 @@ def start(self):
# 启动消费者线程用于处理广播事件
self.__event.set()
for _ in range(MIN_EVENT_CONSUMER_THREADS):
thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True)
thread = threading.Thread(target=self.__broadcast_consumer_loop, daemon=True)
thread.start()
self.__consumer_threads.append(thread) # 将线程对象保存到列表中

Expand All @@ -105,15 +107,24 @@ def stop(self):
except Exception as e:
logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}")

def check(self, etype: EventType):
def check(self, etype: Union[EventType, ChainEventType]) -> bool:
"""
检查事件是否存在响应,去除掉被禁用的事件响应
检查是否有启用的事件处理器可以响应某个事件类型
:param etype: 事件类型 (EventType 或 ChainEventType)
:return: 返回是否存在可用的处理器
"""
if etype not in self.__broadcast_subscribers:
return False
handlers = self.__broadcast_subscribers[etype]
return any([handler for handler in handlers.values()
if handler.__qualname__.split(".")[0] not in self.__disabled_handlers])
if isinstance(etype, ChainEventType):
handlers = self.__chain_subscribers.get(etype, [])
return any(
self.__is_handler_enabled(handler)
for _, handler in handlers
)
else:
handlers = self.__broadcast_subscribers.get(etype, [])
return any(
self.__is_handler_enabled(handler)
for handler in handlers
)

def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
Expand All @@ -127,8 +138,6 @@ def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dic
event = Event(etype, data, priority)
if isinstance(etype, EventType):
self.__trigger_broadcast_event(event)
with self.__condition:
self.__condition.notify()
elif isinstance(etype, ChainEventType):
return self.__trigger_chain_event(event)
else:
Expand Down Expand Up @@ -390,20 +399,24 @@ def __get_class_instance(class_name: str):

return class_obj

def __fixed_broadcast_consumer(self):
def __broadcast_consumer_loop(self):
"""
固定的后台广播消费者线程,持续从队列中提取事件
持续从队列中提取事件的后台广播消费者线程
"""
jitter_factor = 0.1
rate_limiter = ExponentialBackoffRateLimiter(base_wait=INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS,
max_wait=MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS,
backoff_factor=2.0,
source="BroadcastConsumer",
enable_logging=False)
while self.__event.is_set():
# 使用 Condition 优化队列的等待机制,避免频繁触发超时
with self.__condition:
# 阻塞等待,直到有事件插入
self.__condition.wait()
try:
priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS)
self.__dispatch_broadcast_event(event)
except Empty:
logger.debug("Queue is empty, waiting for new events")
try:
priority, event = self.__event_queue.get(timeout=rate_limiter.current_wait)
rate_limiter.reset()
self.__dispatch_broadcast_event(event)
except Empty:
rate_limiter.current_wait = rate_limiter.current_wait * random.uniform(1, 1 + jitter_factor)
rate_limiter.trigger_limit()

@staticmethod
def __log_event_lifecycle(event: Event, stage: str):
Expand Down
14 changes: 7 additions & 7 deletions app/utils/limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
每次触发限流时,等待时间会成倍增加,直到达到最大等待时间
"""

def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0,
def __init__(self, base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0,
source: str = "", enable_logging: bool = True):
"""
初始化 ExponentialBackoffRateLimiter 实例
Expand All @@ -108,7 +108,7 @@ def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: flo
:param enable_logging: 是否启用日志记录,默认为 True
"""
super().__init__(source, enable_logging)
self.next_allowed_time = 0
self.next_allowed_time = 0.0
self.current_wait = base_wait
self.base_wait = base_wait
self.max_wait = max_wait
Expand Down Expand Up @@ -144,7 +144,7 @@ def reset(self):
with self.lock:
if self.next_allowed_time != 0 or self.current_wait > self.base_wait:
self.log_info(f"调用成功,重置限流等待时间为 {self.base_wait} 秒")
self.next_allowed_time = 0
self.next_allowed_time = 0.0
self.current_wait = self.base_wait

def trigger_limit(self):
Expand All @@ -155,8 +155,8 @@ def trigger_limit(self):
current_time = time.time()
with self.lock:
self.next_allowed_time = current_time + self.current_wait
self.log_warning(f"触发限流,将在 {self.current_wait} 秒后允许继续调用")
self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait)
self.log_warning(f"触发限流,将在 {self.current_wait} 秒后允许继续调用")


# 时间窗口限流器
Expand All @@ -166,7 +166,7 @@ class WindowRateLimiter(BaseRateLimiter):
如果超过允许的最大调用次数,则限流直到窗口期结束
"""

def __init__(self, max_calls: int, window_seconds: int,
def __init__(self, max_calls: int, window_seconds: float,
source: str = "", enable_logging: bool = True):
"""
初始化 WindowRateLimiter 实例
Expand Down Expand Up @@ -308,7 +308,7 @@ def wrapper(*args, **kwargs) -> Optional[Any]:


# 装饰器:指数退避限流
def rate_limit_exponential(base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0,
def rate_limit_exponential(base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0,
raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable:
"""
装饰器,用于应用指数退避限流策略
Expand All @@ -329,7 +329,7 @@ def rate_limit_exponential(base_wait: int = 60, max_wait: int = 600, backoff_fac


# 装饰器:时间窗口限流
def rate_limit_window(max_calls: int, window_seconds: int,
def rate_limit_window(max_calls: int, window_seconds: float,
raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable:
"""
装饰器,用于应用时间窗口限流策略
Expand Down

0 comments on commit d7c277a

Please sign in to comment.