-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtask.py
41 lines (38 loc) · 1.08 KB
/
task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# _*_ encoding=utf-8 _*_
import uuid
import threading
#基本任务对象
class Task:
def __init__(self,func,*args,**kwargs):
#任务的具体逻辑,通过函数引用传递进来
self.callable = func
self.id=uuid.uuid4()
self.args=args
self.kwargs=kwargs
def __str__(self):
return "Task id: " + str(self.id)
#异步任务对象
class AsyncTask(Task):
def __init__(self,func,*args,**kwargs):
self.result=None
self.condition=threading.Condition()
super().__init__(func,*args,**kwargs)
#设置运行结果
def set_result(self,result):
self.condition.acquire()
self.result=result
self.condition.notify()
self.condition.release()
#获取任务结果
def get_result(self):
self.condition.acquire()
if not self.result:
self.condition.wait()
result=self.result
self.condition.release()
return result
def my_function():
print("This is a task test")
if __name__=="__main__":
task=Task(func=my_function)
print(task)