forked from fortable1999/pubsubdemo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.py
61 lines (54 loc) · 1.81 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from abstract import *
from lib.job import *
from lib.logging import *
from lib.queue import *
import json
import functools
class TaasWorker(
AMQPQueueMixin,
CouchDBJobStorageMixin,
SSHJobMixin,
CurlJobMixin,
BaseLoggingMixin,
AbstractTester
):
"""docstring for TaasWorker"""
async def process_output(self, job_id, data):
self.logger.info("job[%s] > %s" % (job_id, data))
job = await self.get_job(job_id)
if not "output" in job["tasks"]:
job["tasks"]["output"] = []
job["tasks"]["output"].append(data)
await self.put_job(job_id, job)
async def do_work(self, job_id):
"""
called For each job
"""
self.logger.info("> Job[%s] started" % job_id)
job = await self.get_job(job_id)
ssh_host = job['tasks']['ssh_host']
ssh_port = job['tasks']['ssh_port']
ssh_username = job['tasks']['ssh_username']
ssh_password = job['tasks']['ssh_password']
cmd_list = job['tasks']['script']
print(ssh_host, ssh_port, ssh_username, ssh_password)
await self.do_ssh_job(
ssh_host,
ssh_port,
ssh_username,
ssh_password,
cmd_list,
async_callback=functools.partial(self.process_output, job_id)
)
self.logger.info("> Job[%s] Finished" % job_id)
async def job_callback(self, body, envelope, properties, *args, **kwargs):
"""
Subscript to RabbmtMQ, create tasks and detach them.
"""
job_id = json.loads(body.decode('utf-8'))['id']
asyncio.get_event_loop().create_task(self.do_work(job_id))
def main():
worker = TaasWorker()
worker.run_forever()
if __name__ == '__main__':
main()