Skip to content

Commit

Permalink
Bugfix(worker&web): 🐛 incr_success 应发生在事务即将结束前
Browse files Browse the repository at this point in the history
  • Loading branch information
a76yyyy committed Feb 3, 2024
1 parent 1d203a4 commit d2db676
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
5 changes: 4 additions & 1 deletion web/handlers/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ async def post(self, taskid):
self.evil(+2)
start_ts = int(time.time())
user = self.current_user
pushsw = None
title = f"QD 任务ID: {taskid} 完成"
logtmp = ""
async with self.db.transaction() as sql_session:
task = self.check_permission(await self.db.task.get(taskid, fields=('id', 'tplid', 'userid', 'init_env',
'env', 'session', 'retry_count', 'retry_interval', 'last_success', 'last_failed', 'success_count', 'note',
Expand Down Expand Up @@ -250,11 +253,11 @@ async def post(self, taskid):
except StreamClosedError as e:
logger_web_handler.error('stream closed error: %s', e, exc_info=config.traceback_print)

await pushertool.pusher(user['id'], pushsw, 0x8, title, logtmp)
log_day = int((await self.db.site.get(1, fields=('logDay',), sql_session=sql_session))['logDay'])
for log in await self.db.tasklog.list(taskid=taskid, fields=('id', 'ctime'), sql_session=sql_session):
if (time.time() - log['ctime']) > (log_day * 24 * 60 * 60):
await self.db.tasklog.delete(log['id'], sql_session=sql_session)
await pushertool.pusher(user['id'], pushsw, 0x8, title, logtmp)
return


Expand Down
46 changes: 28 additions & 18 deletions worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,17 @@ async def push_batch(self):
tmp += ''.join(tmpval)
logtemp += tmp
push_batch["time"] = push_batch['time'] + delta
await self.db.user.mod(userid, push_batch=json.dumps(push_batch), sql_session=sql_session)
if tmp and numlog:
user_email = user.get('email', 'Unkown')
logger_worker.debug(
"Start push batch log for user %s, email:%s", userid, user_email)
await pushtool.pusher(userid, {"pushen": bool(push_batch.get("sw", False))}, 4080, title, logtemp)
logger_worker.info(
"Success push batch log for user %s, email:%s", userid, user_email)
"Complete push batch log for user %s, email:%s", userid, user_email)
else:
logger_worker.debug(
'User %s does not need to perform push_batch task, stop.', userid)
await self.db.user.mod(userid, push_batch=json.dumps(push_batch), sql_session=sql_session)
except Exception as e:
logger_worker.error('Push batch task failed: %s', e, exc_info=config.traceback_print)

Expand Down Expand Up @@ -173,8 +173,15 @@ def fix_next_time(next: float, gmt_offset=time.timezone / 60) -> float:
return next

async def do(self, task):
is_success = False
should_push = 0
userid = None
title = f"QD 定时任务ID {task['id']}-{task.get('note',None)} 完成"
content = ""
pushsw = json.loads(task['pushsw'])
async with self.db.transaction() as sql_session:
user = await self.db.user.get(task['userid'], fields=('id', 'email', 'email_verified', 'nickname', 'logtime'), sql_session=sql_session)
userid = user['id']
if not user:
await self.db.tasklog.add(task['id'], False, msg='no such user, disabled.', sql_session=sql_session)
await self.db.task.mod(task['id'], next=None, disabled=1, sql_session=sql_session)
Expand All @@ -196,15 +203,6 @@ async def do(self, task):
await self.db.task.mod(task['id'], next=None, disabled=1, sql_session=sql_session)
return False

newontime = json.loads(task["newontime"])
pushtool = Pusher(self.db, sql_session=sql_session)
caltool = Cal()
logtime = json.loads(user['logtime'])
pushsw = json.loads(task['pushsw'])

if 'ErrTolerateCnt' not in logtime:
logtime['ErrTolerateCnt'] = 0

start = time.perf_counter()
try:
fetch_tpl = await self.db.user.decrypt(0 if not tpl['userid'] else task['userid'], tpl['tpl'], sql_session=sql_session)
Expand All @@ -230,6 +228,8 @@ async def do(self, task):
session = await self.db.user.encrypt(task['userid'],
new_env['session'].to_json() if hasattr(new_env['session'], 'to_json') else new_env['session'], sql_session=sql_session)

newontime = json.loads(task["newontime"])
caltool = Cal()
if newontime['sw']:
if 'mode' not in newontime:
newontime['mode'] = 'ontime'
Expand Down Expand Up @@ -259,16 +259,17 @@ async def do(self, task):

t = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
title = f"QD定时任务 {tpl['sitename']}-{task['note']} 成功"
logtemp = new_env['variables'].get('__log__')
logtemp = f"{t} \\r\\n日志:{logtemp}"
await pushtool.pusher(user['id'], pushsw, 0x2, title, logtemp)
content = new_env['variables'].get('__log__')
content = f"{t} \\r\\n日志:{content}"
should_push = 0x2

logger_worker.info('taskid:%d tplid:%d successed! %.5fs',
task['id'], task['tplid'], time.perf_counter() - start)
# delete log
await self.clear_log(task['id'], sql_session=sql_session)
logger_worker.info(
'taskid:%d tplid:%d clear log.', task['id'], task['tplid'])
is_success = True
except Exception as e:
# failed feedback
if config.traceback_print:
Expand All @@ -284,13 +285,16 @@ async def do(self, task):
next = time.time() + next_time_delta
content = content + \
f" \\r\\n下次运行时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next))}"
logtime = json.loads(user['logtime'])
if 'ErrTolerateCnt' not in logtime:
logtime['ErrTolerateCnt'] = 0
if logtime['ErrTolerateCnt'] <= task['last_failed_count']:
await pushtool.pusher(user['id'], pushsw, 0x1, title, content)
should_push = 0x1
else:
disabled = True
next = None
content = " \\r\\n任务已禁用"
await pushtool.pusher(user['id'], pushsw, 0x1, title, content)
should_push = 0x1

await self.db.tasklog.add(task['id'], success=False, msg=str(e), sql_session=sql_session)
await self.db.task.mod(task['id'],
Expand All @@ -305,8 +309,14 @@ async def do(self, task):

logger_worker.error('taskid:%d tplid:%d failed! %.4fs \r\n%s', task['id'], task['tplid'], time.perf_counter(
) - start, str(e).replace('\\r\\n', '\r\n'))
return False
return True

if should_push:
try:
pushtool = Pusher(self.db, sql_session=sql_session)
await pushtool.pusher(userid, pushsw, should_push, title, content)
except Exception as e:
logger_worker.error('taskid:%d push failed! %s', task['id'], str(e), exc_info=config.traceback_print)
return is_success


class QueueWorker(BaseWorker):
Expand Down

0 comments on commit d2db676

Please sign in to comment.