Skip to content

Commit

Permalink
5.5.0 Improve performance.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-X-Net committed Aug 1, 2023
1 parent 9ab4af4 commit 9b7d31b
Show file tree
Hide file tree
Showing 31 changed files with 841 additions and 422 deletions.
2 changes: 1 addition & 1 deletion code/default/launcher/non_tray.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def on_quit(self, widget=None, data=None):

def serve_forever(self):
while global_var.running:
time.sleep(1)
time.sleep(10)


sys_tray = None_tray()
Expand Down
2 changes: 1 addition & 1 deletion code/default/launcher/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def main():
sys_platform.show_systray()
else:
while global_var.running:
time.sleep(1)
time.sleep(10)


try:
Expand Down
2 changes: 2 additions & 0 deletions code/default/launcher/web_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,8 @@ def req_debug_handler(self):
continue
dat += "%s => %d\n\n" % (path, n)

dat += "thread num:%d<br>" % threading.active_count()

self.send_response("text/plain", dat)
except Exception as e:
xlog.exception("debug:%r", e)
Expand Down
1 change: 1 addition & 0 deletions code/default/lib/noarch/front_base/boringssl_wrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def send(self, data, flags=0):
self.select2.select(timeout=self.timeout)
continue
else:
# self._context.logger.debug("send:%d ip:%s", ret, self.ip_str)
break

return ret
Expand Down
3 changes: 2 additions & 1 deletion code/default/lib/noarch/front_base/check_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def check_ip(self, ip, sni=None, host=None, wait_time=0, path=None, headers={}):
host = ssl_sock.host
self.logger.info("host:%s", host)

time.sleep(wait_time)
if wait_time:
time.sleep(wait_time)
start_time = time.time()

if not ssl_sock.h2:
Expand Down
10 changes: 6 additions & 4 deletions code/default/lib/noarch/front_base/connect_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def set_ssl_created_cb(self, cb):

def keep_alive_thread(self):
while self.running:
to_keep_live_list = self.new_conn_pool.get_need_keep_alive(maxtime=self.config.https_keep_alive-3)
to_keep_live_list = self.new_conn_pool.get_need_keep_alive(maxtime=self.config.https_keep_alive-6)

for ssl_sock in to_keep_live_list:
inactive_time = time.time() - ssl_sock.last_use_time
Expand All @@ -207,7 +207,7 @@ def keep_alive_thread(self):
# no appid avaiable
pass

time.sleep(1)
time.sleep(5)

def keep_connection_daemon(self):
while self.running:
Expand Down Expand Up @@ -243,7 +243,9 @@ def _create_more_connection_worker(self):
self.connecting_more_thread = None

def _connect_thread(self, sleep_time=0):
time.sleep(sleep_time)
if sleep_time > 0.1:
time.sleep(sleep_time)

try:
while self.running and self._need_more_ip():
if self.new_conn_pool.qsize() > self.config.https_connection_pool_max:
Expand Down Expand Up @@ -272,7 +274,7 @@ def _connect_process(self):

self.new_conn_pool.put((ssl_sock.handshake_time, ssl_sock))

if self.config.connect_create_interval > 0:
if self.config.connect_create_interval > 0.1:
sleep = random.uniform(self.config.connect_create_interval, self.config.connect_create_interval*2)
time.sleep(sleep)

Expand Down
11 changes: 6 additions & 5 deletions code/default/lib/noarch/front_base/http1.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ def keep_alive_thread(self):
if not self.keep_running:
self.close("exit ")
return
time.sleep(1)
time.sleep(3)

if self.config.http1_first_ping_wait and self.processed_tasks == 0:
self.task_queue.put("ping")

if self.config.http1_ping_interval:
while self.keep_running:
time_to_ping = max(self.config.http1_ping_interval - (time.time() - self.last_recv_time), 0.2)
time_to_ping = max(self.config.http1_ping_interval - (time.time() - self.last_recv_time), 3)
time.sleep(time_to_ping)

if not self.request_onway and \
time.time() - self.last_recv_time > self.config.http1_ping_interval - 1:
time.time() - self.last_recv_time > self.config.http1_ping_interval - 3:
self.task_queue.put("ping")
time.sleep(1)
time.sleep(3)

elif self.config.http1_idle_time:
while self.keep_running:
Expand Down Expand Up @@ -259,7 +259,8 @@ def head_request(self):

content = response.readall(timeout=5)
self.record_active("head end")
self.rtt = (time.time() - start_time) * 1000
rtt = (time.time() - start_time) * 1000
self.update_rtt(rtt)
return True
except Exception as e:
self.logger.warn("h1 %s HEAD keep alive request fail:%r", self.ssl_sock.ip_str, e)
Expand Down
17 changes: 9 additions & 8 deletions code/default/lib/noarch/front_base/http2_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def send_loop(self):
continue

# wait for payload frame
time.sleep(0.01)
# time.sleep(0.01)
# combine header and payload in one tcp package.
if not self.send_queue._qsize():
self._sock.flush()
Expand Down Expand Up @@ -429,7 +429,7 @@ def receive_frame(self, frame):
rtt = (time_now - ping_time) * 1000
if rtt < 0:
self.logger.error("rtt:%f ping_time:%f now:%f", rtt, ping_time, time_now)
self.rtt = rtt
self.update_rtt(rtt)
self.ping_on_way -= 1
#self.logger.debug("RTT:%d, on_way:%d", self.rtt, self.ping_on_way)
if self.keep_running and self.ping_on_way == 0:
Expand Down Expand Up @@ -525,10 +525,11 @@ def check_active(self, now):
if not self.keep_running:
return

if len(self.streams) == 0:
if self.is_life_end():
return self.close("life end")
if len(self.streams):
return

if self.is_life_end():
return self.close("life end")

if now - self.last_send_time > self.config.http2_idle_ping_min_interval:
self.send_ping()
return
if now - self.last_send_time > self.config.http2_idle_ping_min_interval:
self.send_ping()
37 changes: 23 additions & 14 deletions code/default/lib/noarch/front_base/http2_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,26 +319,35 @@ def receive_frame(self, frame):
whole_cost = time_now - self.start_time
receive_cost = time_now - self.get_head_time
bytes_received = self.connection._sock.bytes_received - self.start_connection_point
if receive_cost > 0 and bytes_received > 10000 and not self.task.finished and receive_cost > 0.001:
# speed = bytes_received / receive_cost
speed = (len(self.request_body) + bytes_received) / (whole_cost - xcost)
if receive_cost > 0 and bytes_received > 10000 and not self.task.finished and receive_cost > 0.001 \
and xcost >= 0:
rtt = whole_cost - xcost
t_road = rtt
if t_road <= self.connection.handshake:
# adjust handshake
self.connection.handshake = t_road
else:
t_road -= self.connection.handshake

speed = (len(self.request_body) + bytes_received) / t_road
self.connection.update_speed(speed)
self.task.set_state("h2_finish[SP:%d]" % speed)

send_cost = len(self.request_body) / speed
streams_cost = ((self.connection.max_payload /2) * self.get_head_stream_num) / speed

if xcost >= 0:
rtt = whole_cost - xcost - send_cost - receive_cost # - streams_cost
if self.config.http2_show_debug:
self.logger.debug("%s RTT:%f rtt:%f send_len:%d recv_len:%d "
"whole_Cost:%f xcost:%f rcost:%f send_cost:%f recv_cost:%f "
"streams_cost:%f Speed: %f",
self.connection.ssl_sock.ip_str,
self.connection.rtt * 1000, rtt * 1000,
len(self.request_body), bytes_received,
whole_cost, xcost, rcost, send_cost, receive_cost, streams_cost, speed)
self.connection.update_rtt(rtt)
if self.config.http2_show_debug:
self.logger.debug("%s RTT:%f rtt:%f send_len:%d recv_len:%d "
"whole_Cost:%f xcost:%f rcost:%f send_cost:%f recv_cost:%f "
"streams_cost:%f Speed: %f",
self.connection.ssl_sock.ip_str,
self.connection.rtt * 1000, rtt * 1000,
len(self.request_body), bytes_received,
whole_cost, xcost, rcost, send_cost, receive_cost, streams_cost, speed)
self.connection.update_rtt(rtt, self.task.predict_rtt)
self.logger.debug("%s handshake:%f stream:%d up:%d down:%d rtt:%f", self.connection.ip_str,
self.connection.handshake,
len(self.connection.streams), len(self.request_body), bytes_received, rtt)

self._close_remote()

Expand Down
66 changes: 47 additions & 19 deletions code/default/lib/noarch/front_base/http_common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time
import random

import simple_queue
from queue import Queue
import simple_http_client


Expand All @@ -20,7 +20,8 @@ def __init__(self, logger, config, method, host, path, headers, body, queue, url
self.start_time = time.time()
self.unique_id = "%s:%f" % (url, self.start_time)
self.trace_time = []
self.body_queue = simple_queue.Queue()
self.body_queue = Queue()
self.predict_rtt = 0.5
self.body_len = 0
self.body_readed = 0
self.content_length = None
Expand Down Expand Up @@ -56,7 +57,11 @@ def read(self, size=None):

if size:
while self.read_buffer_len < size:
data = self.body_queue.get(self.timeout)
try:
data = self.body_queue.get(timeout=self.timeout)
except:
data = None

if not data:
return b''

Expand Down Expand Up @@ -94,7 +99,11 @@ def read(self, size=None):
data = self.read_buffers.pop(0)
self.read_buffer_len -= len(data)
else:
data = self.body_queue.get(self.timeout)
try:
data = self.body_queue.get(timeout=self.timeout)
except:
data = None

if not data:
return b''

Expand Down Expand Up @@ -186,8 +195,9 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
self.ip_manager = ip_manager
self.config = config
self.ssl_sock = ssl_sock
self.handshake = ssl_sock.handshake_time * 0.001
self.rtt = ssl_sock.handshake_time * 0.001
self.speed = 5000000
self.speed = 15000000
self.ip_str = ssl_sock.ip_str
self.close_cb = close_cb
self.retry_task_cb = retry_task_cb
Expand All @@ -198,6 +208,7 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
self.processed_tasks = 0
self.continue_fail_tasks = 0
self.rtt_history = [self.rtt,]
self.adjust_history = []
self.speed_history = [self.speed, self.speed, self.speed]
self.last_recv_time = self.ssl_sock.create_time
self.last_send_time = self.ssl_sock.create_time
Expand All @@ -210,21 +221,29 @@ def __str__(self):
o += " running: %s\r\n" % (self.keep_running)
o += " processed_tasks: %d\r\n" % (self.processed_tasks)
o += " continue_fail_tasks: %s\r\n" % (self.continue_fail_tasks)
o += " handshake: %f \r\n" % self.handshake
o += " rtt_history: %s\r\n" % (self.rtt_history)
o += " speed_history: %s\r\n" % (self.speed_history)
o += " adjust_history: %s\r\n" % (self.adjust_history)
if self.version != "1.1":
o += "streams: %d\r\n" % len(self.streams)
o += " rtt: %f\r\n" % (self.rtt)
o += " speed: %f\r\n" % (self.speed)
o += " score: %f\r\n" % (self.get_score())
return o

def update_rtt(self, rtt):
def update_rtt(self, rtt, predict_rtt=None):
self.rtt_history.append(rtt)
if len(self.rtt_history) > 10:
self.rtt_history.pop(0)
# self.rtt = sum(self.rtt_history) / len(self.rtt_history)

if predict_rtt:
adjust = rtt - predict_rtt
self.adjust_history.append(adjust)
if len(self.adjust_history) > 10:
self.adjust_history.pop(0)

def update_speed(self, speed):
self.speed_history.append(speed)
if len(self.speed_history) > 10:
Expand All @@ -240,7 +259,8 @@ def update_debug_data(self, rtt, sent, received, speed):
# else:
# self.rtt = rtt

self.log_debug_data(rtt, sent, received)
# self.log_debug_data(rtt, sent, received)
return

def close(self, reason):
if not self.keep_running:
Expand All @@ -261,19 +281,27 @@ def close(self, reason):
def get_score(self):
# The smaller, the better

score = self.rtt
if self.version != "1.1":
response_body_len = self.max_payload
for _, stream in self.streams.items():
if stream.response_body_len == 0:
response_body_len += self.max_payload
else:
response_body_len += stream.response_body_len - stream.task.body_len
score += response_body_len / self.speed
score = self.handshake
if self.processed_tasks == 0 and len(self.streams) == 0:
score /= 3

if self.config.show_state_debug:
self.logger.debug("get_score %s, speed:%f rtt:%d stream_num:%d score:%f", self.ip_str,
self.speed * 0.000001, self.rtt * 1000, len(self.streams), score)
if self.version == "1.1":
score += self.max_payload / self.speed
return score

response_body_len = self.max_payload
for _, stream in self.streams.items():
if stream.response_body_len == 0:
response_body_len += self.max_payload
else:
response_body_len += stream.response_body_len - stream.task.body_len
score += response_body_len / self.speed

score += len(self.streams) * 0.06

if self.config.show_state_debug:
self.logger.debug("get_score %s, speed:%f rtt:%d stream_num:%d score:%f", self.ip_str,
self.speed * 0.000001, self.rtt * 1000, len(self.streams), score)

return score

Expand Down
Loading

0 comments on commit 9b7d31b

Please sign in to comment.