Skip to content

Commit

Permalink
5.5.4 name threads and add debug api.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-X-Net committed Aug 8, 2023
1 parent d382131 commit 31638bb
Show file tree
Hide file tree
Showing 32 changed files with 87 additions and 64 deletions.
2 changes: 1 addition & 1 deletion code/default/gae_proxy/local/appid_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def report_out_of_quota(self, appid):

def report_not_exist(self, appid, ip):
self.logger.debug("report_not_exist:%s %s", appid, ip)
th = threading.Thread(target=self.process_appid_not_exist, args=(appid, ip))
th = threading.Thread(target=self.process_appid_not_exist, args=(appid, ip), name="process_appid_not_exist")
th.start()

def process_appid_not_exist(self, appid, ip):
Expand Down
2 changes: 1 addition & 1 deletion code/default/gae_proxy/local/check_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def checker(self):

def run(self):
for i in range(0, 100):
threading.Thread(target=self.checker).start()
threading.Thread(target=self.checker, name="gae_ip_checker").start()


def check_all():
Expand Down
2 changes: 1 addition & 1 deletion code/default/gae_proxy/local/check_local_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def triger_check_network(self, fail=False, force=False):
return

self.last_check_time = time_now
threading.Thread(target=self._simple_check_worker).start()
threading.Thread(target=self._simple_check_worker, name="network_checker").start()


IPv4 = CheckNetwork("IPv4")
Expand Down
4 changes: 2 additions & 2 deletions code/default/gae_proxy/local/gae_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,10 +857,10 @@ def run(self):
(data_left_to_fetch + front.config.AUTORANGE_MAXSIZE - 1) / front.config.AUTORANGE_MAXSIZE)
thread_num = min(front.config.AUTORANGE_THREADS, fetch_times)
for i in range(0, thread_num):
threading.Thread(target=self.fetch_worker).start()
threading.Thread(target=self.fetch_worker, name="gae_fetch_work").start()

threading.Thread(target=self.fetch, args=(
res_begin, res_end, self.response)).start()
res_begin, res_end, self.response), name="gae_fetch").start()

ok = "ok"
while self.keep_running and \
Expand Down
4 changes: 2 additions & 2 deletions code/default/gae_proxy/local/ipv6_tunnel/pteredor.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def __init__(self, server_list=teredo_server_list, local_port=None,
elif len(self.server_ip_list) < 2:
print(('Need input more teredo servers, now is %d.'
% len(self.server_ip_list)))
threading.Thread(target=self.receive_loop).start()
threading.Thread(target=self.receive_loop, name="teredo_recieve_loop").start()
if probe_nat:
self.nat_type = self.nat_type_probe()

Expand Down Expand Up @@ -413,7 +413,7 @@ def eval_servers(self):
eval_list = []
queue_obj = Queue.Queue()
for server_ip in self.server_ip_list:
threading.Thread(target=self._eval_servers, args=(server_ip, queue_obj)).start()
threading.Thread(target=self._eval_servers, args=(server_ip, queue_obj), name="teredor_eval").start()
for _ in self.server_ip_list:
eval_list.append(queue_obj.get())
return eval_list
Expand Down
2 changes: 1 addition & 1 deletion code/default/gae_proxy/local/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def main(args):

log_info()

threading.Thread(target=CertUtil.init_ca, args=(no_mess_system,)).start()
threading.Thread(target=CertUtil.init_ca, args=(no_mess_system,), name="init_ca").start()

listen_ips = front.config.listen_ip
if isinstance(listen_ips, str):
Expand Down
2 changes: 1 addition & 1 deletion code/default/launcher/download_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,6 @@ def download_worker():


def start_download():
th = threading.Thread(target=download_worker)
th = threading.Thread(target=download_worker, name="file_downloader")
th.start()
return True
2 changes: 1 addition & 1 deletion code/default/launcher/module_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def start(module):
proc_handler[module]["imp"] = __import__(module, globals(), locals(), ['local'], 0)

_local = proc_handler[module]["imp"].local
p = threading.Thread(target=_local.start, args=([xargs]))
p = threading.Thread(target=_local.start, args=([xargs]), name="%s_start" % module)
p.daemon = True
p.start()
proc_handler[module]["proc"] = p
Expand Down
6 changes: 3 additions & 3 deletions code/default/launcher/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def check_new_machine():
create_desktop_shortcut()


def check_loop():
def update_check_loop():
check_new_machine()

# wait gae_proxy to start
Expand All @@ -388,8 +388,8 @@ def check_loop():


def start():
p = threading.Thread(target=check_loop)
p.setDaemon(True)
p = threading.Thread(target=update_check_loop, name="check_update")
p.daemon = True
p.start()


Expand Down
2 changes: 1 addition & 1 deletion code/default/launcher/update_from_github.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def start_update_version(version, checkhash=1):
return progress["update_status"]

progress["update_status"] = "Start update"
th = threading.Thread(target=update_version, args=(version, checkhash))
th = threading.Thread(target=update_version, args=(version, checkhash), name="update_version")
th.start()
return True

Expand Down
8 changes: 8 additions & 0 deletions code/default/launcher/web_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import threading
import json
import cgi
import traceback

try:
from urllib.parse import urlparse, urlencode, parse_qs
Expand Down Expand Up @@ -823,6 +824,13 @@ def req_debug_handler(self):
dat = ""
try:
dat += "thread num:%d<br>" % threading.active_count()
for thread in threading.enumerate():
dat += "\nThread: %s \r\n" % (thread.name)
# dat += traceback.format_exc(sys._current_frames()[thread.ident])
stack = sys._current_frames()[thread.ident]
st = traceback.extract_stack(stack)
stl = traceback.format_list(st)
dat += " \n".join(stl)

except Exception as e:
xlog.exception("debug:%r", e)
Expand Down
2 changes: 1 addition & 1 deletion code/default/lib/noarch/dnslib/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def start(self):
self.server.serve_forever()

def start_thread(self):
self.thread = threading.Thread(target=self.server.serve_forever)
self.thread = threading.Thread(target=self.server.serve_forever, name="dns_server")
self.thread.daemon = True
self.thread.start()

Expand Down
11 changes: 7 additions & 4 deletions code/default/lib/noarch/front_base/connect_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,14 @@ def __init__(self, logger, config, connect_creator, ip_manager, check_local_netw

self.connecting_more_thread = None

self.keep_alive_th = threading.Thread(target=self.keep_alive_thread)
self.keep_alive_th = threading.Thread(target=self.keep_alive_thread,
name="%s_conn_manager_keep_alive" % self.logger.name)
self.keep_alive_th.daemon = True
self.keep_alive_th.start()

if self.config.connection_pool_min:
self.keep_conn_th = threading.Thread(target=self.keep_connection_daemon)
self.keep_conn_th = threading.Thread(target=self.keep_connection_daemon,
name="%s_conn_manager_keep_conn" % self.logger.name)
self.keep_conn_th.daemon = True
self.keep_conn_th.start()
else:
Expand Down Expand Up @@ -226,7 +228,8 @@ def _need_more_ip(self):
def _create_more_connection(self):
if not self.connecting_more_thread:
with self.thread_num_lock:
self.connecting_more_thread = threading.Thread(target=self._create_more_connection_worker)
self.connecting_more_thread = threading.Thread(target=self._create_more_connection_worker,
name="%s_conn_manager__create_more_conn" % self.logger.name)
self.connecting_more_thread.start()

def _create_more_connection_worker(self):
Expand All @@ -235,7 +238,7 @@ def _create_more_connection_worker(self):
self.thread_num_lock.acquire()
self.thread_num += 1
self.thread_num_lock.release()
p = threading.Thread(target=self._connect_thread)
p = threading.Thread(target=self._connect_thread, name="%s_conn_manager__connect_th" % self.logger.name)
p.start()
time.sleep(self.config.connect_create_interval)

Expand Down
4 changes: 2 additions & 2 deletions code/default/lib/noarch/front_base/http1.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
self.record_active("init")

self.task_queue = Queue()
threading.Thread(target=self.work_loop).start()
threading.Thread(target=self.work_loop, name="%s_http1_work_loop" % self.logger.name).start()
self.idle_cb()

if self.config.http1_first_ping_wait or \
self.config.http1_ping_interval or \
self.config.http1_idle_time:
threading.Thread(target=self.keep_alive_thread).start()
threading.Thread(target=self.keep_alive_thread, name="%s_http1_keep_alive" % self.logger.name).start()

def record_active(self, active=""):
self.trace_time.append([time.time(), active])
Expand Down
12 changes: 6 additions & 6 deletions code/default/lib/noarch/front_base/http2_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
# send Setting frame before accept task.
self._send_preamble()

threading.Thread(target=self.send_loop).start()
threading.Thread(target=self.recv_loop).start()
threading.Thread(target=self.h2_send_loop, name="h2_send_%s" % self.ip_str).start()
threading.Thread(target=self.h2_recv_loop, name="h2_recv_%s" % self.ip_str).start()

# export api
def request(self, task):
Expand Down Expand Up @@ -174,7 +174,7 @@ def request_task(self, task):
self.streams[stream_id] = stream
stream.start_request()

def send_loop(self):
def h2_send_loop(self):
while self.keep_running:
frame = self.send_queue.get(True)
if not frame:
Expand Down Expand Up @@ -210,7 +210,7 @@ def send_loop(self):
self.logger.debug("http2 %s send error:%r", self.ip_str, e)
self.close("send fail:%r" % e)

def recv_loop(self):
def h2_recv_loop(self):
while self.keep_running:
try:
self._consume_single_frame()
Expand All @@ -234,8 +234,8 @@ def close(self, reason="conn close"):
self.send_queue.put(None)

for stream in list(self.streams.values()):
if stream.task.responsed:
# response have send to client
if stream.task.responsed or stream.task.start_time + stream.task.timeout < time.time():
# response have sent to client, or timeout
# can't retry
stream.close(reason=reason)
else:
Expand Down
5 changes: 4 additions & 1 deletion code/default/lib/noarch/front_base/http2_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@


import threading
import time

from hyper.common.headers import HTTPHeaderMap
from hyper.packages.hyperframe.frame import (
FRAME_MAX_LEN, FRAMES, HeadersFrame, DataFrame, PushPromiseFrame,
Expand Down Expand Up @@ -378,7 +380,8 @@ def send_response(self):
def close(self, reason="close"):
if not self.task.responsed:
# self.task.set_state("stream close: %s, call retry" % reason)
self.connection.retry_task_cb(self.task, reason)
if self.task.start_time + self.task.timeout > time.time():
self.connection.retry_task_cb(self.task, reason)
else:
# self.task.set_state("stream close: %s, finished" % reason)
self.task.finish()
Expand Down
6 changes: 3 additions & 3 deletions code/default/lib/noarch/front_base/http_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ def __init__(self, logger, config, ip_manager, connection_manager,
self.trigger_create_worker_cv = SimpleCondition()
self.wait_a_worker_cv = SimpleCondition()

threading.Thread(target=self.dispatcher).start()
threading.Thread(target=self.create_worker_thread).start()
threading.Thread(target=self.connection_checker).start()
threading.Thread(target=self.dispatcher, name="%s_dispatch" % self.logger.name).start()
threading.Thread(target=self.create_worker_thread, name="%s_worker_creator" % self.logger.name).start()
threading.Thread(target=self.connection_checker, name="%s_conn_checker" % self.logger.name).start()

def stop(self):
self.running = False
Expand Down
10 changes: 6 additions & 4 deletions code/default/lib/noarch/front_base/ip_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(self, logger, config, ip_source, host_manager, check_local_network,
self.ip_lock = threading.Lock()
self.reset()

self.check_ip_thread = threading.Thread(target=self.check_ip_process)
self.check_ip_thread = threading.Thread(target=self.check_ip_process, name="%s_ip_manager_check_ip" % self.logger.name)
self.check_ip_thread.daemon = True
self.check_ip_thread.start()

Expand Down Expand Up @@ -816,7 +816,7 @@ def search_more_ip(self):
self.scan_thread_count += 1
self.scan_thread_lock.release()

p = threading.Thread(target=self.scan_ip_worker)
p = threading.Thread(target=self.scan_ip_worker, name="%s_ip_manager_scan_ip" % self.logger.name)
p.start()

def scan_all_exist_ip(self):
Expand All @@ -830,7 +830,8 @@ def scan_all_exist_ip(self):
self.keep_scan_all_exist_ip = True
scan_threads = []
for i in range(0, 50):
th = threading.Thread(target=self.scan_exist_ip_worker, )
th = threading.Thread(target=self.scan_exist_ip_worker,
name="%s_ip_manager_scan_exist_ip" % self.logger.name)
th.start()
scan_threads.append(th)

Expand All @@ -849,7 +850,8 @@ def start_scan_all_exist_ip(self):
self.logger.warn("scan all exist ip is running")
return

self.scan_all_ip_thread = threading.Thread(target=self.scan_all_exist_ip)
self.scan_all_ip_thread = threading.Thread(target=self.scan_all_exist_ip,
name="%s_ip_manager_scan_all_exist_ip" % self.logger.name)
self.scan_all_ip_thread.start()

def stop_scan_all_exist_ip(self):
Expand Down
2 changes: 1 addition & 1 deletion code/default/lib/noarch/front_base/ip_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __init__(self, logger, source_txt_fn, dest_bin_fn):
self.dest_bin_fn = dest_bin_fn

self.bin_fd = None
threading.Thread(target=self.init).start()
threading.Thread(target=self.init, name="%s_ipv4PoolSource_init" % self.logger.name).start()

def init(self):
if not self.check_bin():
Expand Down
8 changes: 4 additions & 4 deletions code/default/lib/noarch/simple_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,8 @@ def __init__(self, address, handler, args=(), use_https=False, cert="", logger=x
# self.logger.info("server %s:%d started.", address[0], address[1])

def start(self):
self.http_thread = threading.Thread(target=self.serve_forever)
self.http_thread.setDaemon(True)
self.http_thread = threading.Thread(target=self.serve_forever, name="serve_%s" % self.server_address)
self.http_thread.daemon = True
self.http_thread.start()

def init_socket(self):
Expand Down Expand Up @@ -605,13 +605,13 @@ def serve_forever(self):

def process_connect(self, sock, address):
# self.logger.debug("connect from %s:%d", address[0], address[1])
if threading.activeCount() > self.max_thread:
if threading.active_count() > self.max_thread:
self.logger.warn("thread num exceed the limit. drop request from %s.", address)
sock.close()
return

client_obj = self.handler(sock, address, self.args)
client_thread = threading.Thread(target=client_obj.handle)
client_thread = threading.Thread(target=client_obj.handle, name="handle_%s:%d" % address)
client_thread.start()

def shutdown(self):
Expand Down
2 changes: 1 addition & 1 deletion code/default/lib/win32/systray/traybar.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def _message_loop_func(self):
def start(self):
if self._hwnd:
return # already started
self._message_loop_thread = threading.Thread(target=self._message_loop_func)
self._message_loop_thread = threading.Thread(target=self._message_loop_func, name="win32_systray")
self._message_loop_thread.daemon = True
self._message_loop_thread.start()

Expand Down
7 changes: 4 additions & 3 deletions code/default/smart_router/local/connect_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ def __init__(self, connection_timeout=15, connect_threads=3, connect_timeout=5):
self.connect_threads = connect_threads

self.running = True
threading.Thread(target=self.check_thread).start()
threading.Thread(target=self.connection_check_worker, name="smart_router_conn_checker").start()

def stop(self):
self.running = False

def check_thread(self):
def connection_check_worker(self):
while self.running:
time_now = time.time()
with self.lock:
Expand Down Expand Up @@ -153,7 +153,8 @@ def get_conn(self, host, ips, port, timeout=5):
wait_queue = Queue()
wait_t = 0.2
for ip in ordered_ips:
threading.Thread(target=self.create_connect, args=(wait_queue, host, ip, port)).start()
threading.Thread(target=self.create_connect, args=(wait_queue, host, ip, port),
name="smart_router_create_conn_%s" % ip).start()
try:
status = wait_queue.get(timeout=wait_t)
sock = self.get_sock_from_cache(host_port)
Expand Down
Loading

0 comments on commit 31638bb

Please sign in to comment.