From d2db6761f61b180b74295768867addfb9fbb1056 Mon Sep 17 00:00:00 2001 From: a76yyyy Date: Sun, 4 Feb 2024 02:13:54 +0800 Subject: [PATCH] =?UTF-8?q?Bugfix(worker&web):=20=F0=9F=90=9B=20incr=5Fsuc?= =?UTF-8?q?cess=20=E5=BA=94=E5=8F=91=E7=94=9F=E5=9C=A8=E4=BA=8B=E5=8A=A1?= =?UTF-8?q?=E5=8D=B3=E5=B0=86=E7=BB=93=E6=9D=9F=E5=89=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- web/handlers/task.py | 5 ++++- worker.py | 46 +++++++++++++++++++++++++++----------------- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/web/handlers/task.py b/web/handlers/task.py index 58196bf65f0..699191e8eed 100644 --- a/web/handlers/task.py +++ b/web/handlers/task.py @@ -165,6 +165,9 @@ async def post(self, taskid): self.evil(+2) start_ts = int(time.time()) user = self.current_user + pushsw = None + title = f"QD 任务ID: {taskid} 完成" + logtmp = "" async with self.db.transaction() as sql_session: task = self.check_permission(await self.db.task.get(taskid, fields=('id', 'tplid', 'userid', 'init_env', 'env', 'session', 'retry_count', 'retry_interval', 'last_success', 'last_failed', 'success_count', 'note', @@ -250,11 +253,11 @@ async def post(self, taskid): except StreamClosedError as e: logger_web_handler.error('stream closed error: %s', e, exc_info=config.traceback_print) - await pushertool.pusher(user['id'], pushsw, 0x8, title, logtmp) log_day = int((await self.db.site.get(1, fields=('logDay',), sql_session=sql_session))['logDay']) for log in await self.db.tasklog.list(taskid=taskid, fields=('id', 'ctime'), sql_session=sql_session): if (time.time() - log['ctime']) > (log_day * 24 * 60 * 60): await self.db.tasklog.delete(log['id'], sql_session=sql_session) + await pushertool.pusher(user['id'], pushsw, 0x8, title, logtmp) return diff --git a/worker.py b/worker.py index 00051369763..4e49c763de5 100644 --- a/worker.py +++ b/worker.py @@ -110,17 +110,17 @@ async def push_batch(self): tmp += ''.join(tmpval) logtemp += tmp push_batch["time"] = push_batch['time'] + delta - await self.db.user.mod(userid, push_batch=json.dumps(push_batch), sql_session=sql_session) if tmp and numlog: user_email = user.get('email', 'Unkown') logger_worker.debug( "Start push batch log for user %s, email:%s", userid, user_email) await pushtool.pusher(userid, {"pushen": bool(push_batch.get("sw", False))}, 4080, title, logtemp) logger_worker.info( - "Success push batch log for user %s, email:%s", userid, user_email) + "Complete push batch log for user %s, email:%s", userid, user_email) else: logger_worker.debug( 'User %s does not need to perform push_batch task, stop.', userid) + await self.db.user.mod(userid, push_batch=json.dumps(push_batch), sql_session=sql_session) except Exception as e: logger_worker.error('Push batch task failed: %s', e, exc_info=config.traceback_print) @@ -173,8 +173,15 @@ def fix_next_time(next: float, gmt_offset=time.timezone / 60) -> float: return next async def do(self, task): + is_success = False + should_push = 0 + userid = None + title = f"QD 定时任务ID {task['id']}-{task.get('note',None)} 完成" + content = "" + pushsw = json.loads(task['pushsw']) async with self.db.transaction() as sql_session: user = await self.db.user.get(task['userid'], fields=('id', 'email', 'email_verified', 'nickname', 'logtime'), sql_session=sql_session) + userid = user['id'] if not user: await self.db.tasklog.add(task['id'], False, msg='no such user, disabled.', sql_session=sql_session) await self.db.task.mod(task['id'], next=None, disabled=1, sql_session=sql_session) @@ -196,15 +203,6 @@ async def do(self, task): await self.db.task.mod(task['id'], next=None, disabled=1, sql_session=sql_session) return False - newontime = json.loads(task["newontime"]) - pushtool = Pusher(self.db, sql_session=sql_session) - caltool = Cal() - logtime = json.loads(user['logtime']) - pushsw = json.loads(task['pushsw']) - - if 'ErrTolerateCnt' not in logtime: - logtime['ErrTolerateCnt'] = 0 - start = time.perf_counter() try: fetch_tpl = await self.db.user.decrypt(0 if not tpl['userid'] else task['userid'], tpl['tpl'], sql_session=sql_session) @@ -230,6 +228,8 @@ async def do(self, task): session = await self.db.user.encrypt(task['userid'], new_env['session'].to_json() if hasattr(new_env['session'], 'to_json') else new_env['session'], sql_session=sql_session) + newontime = json.loads(task["newontime"]) + caltool = Cal() if newontime['sw']: if 'mode' not in newontime: newontime['mode'] = 'ontime' @@ -259,9 +259,9 @@ async def do(self, task): t = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') title = f"QD定时任务 {tpl['sitename']}-{task['note']} 成功" - logtemp = new_env['variables'].get('__log__') - logtemp = f"{t} \\r\\n日志:{logtemp}" - await pushtool.pusher(user['id'], pushsw, 0x2, title, logtemp) + content = new_env['variables'].get('__log__') + content = f"{t} \\r\\n日志:{content}" + should_push = 0x2 logger_worker.info('taskid:%d tplid:%d successed! %.5fs', task['id'], task['tplid'], time.perf_counter() - start) @@ -269,6 +269,7 @@ async def do(self, task): await self.clear_log(task['id'], sql_session=sql_session) logger_worker.info( 'taskid:%d tplid:%d clear log.', task['id'], task['tplid']) + is_success = True except Exception as e: # failed feedback if config.traceback_print: @@ -284,13 +285,16 @@ async def do(self, task): next = time.time() + next_time_delta content = content + \ f" \\r\\n下次运行时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next))}" + logtime = json.loads(user['logtime']) + if 'ErrTolerateCnt' not in logtime: + logtime['ErrTolerateCnt'] = 0 if logtime['ErrTolerateCnt'] <= task['last_failed_count']: - await pushtool.pusher(user['id'], pushsw, 0x1, title, content) + should_push = 0x1 else: disabled = True next = None content = " \\r\\n任务已禁用" - await pushtool.pusher(user['id'], pushsw, 0x1, title, content) + should_push = 0x1 await self.db.tasklog.add(task['id'], success=False, msg=str(e), sql_session=sql_session) await self.db.task.mod(task['id'], @@ -305,8 +309,14 @@ async def do(self, task): logger_worker.error('taskid:%d tplid:%d failed! %.4fs \r\n%s', task['id'], task['tplid'], time.perf_counter( ) - start, str(e).replace('\\r\\n', '\r\n')) - return False - return True + + if should_push: + try: + pushtool = Pusher(self.db, sql_session=sql_session) + await pushtool.pusher(userid, pushsw, should_push, title, content) + except Exception as e: + logger_worker.error('taskid:%d push failed! %s', task['id'], str(e), exc_info=config.traceback_print) + return is_success class QueueWorker(BaseWorker):