-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.py
45 lines (34 loc) · 1.57 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import os
from celery import Celery
from workers.send_email import send_email
from workers.gee import task_index_pasture
from app.models.payload import ResultPayload
from celery.utils.log import get_task_logger
from workers.models.email import SendEmail
# Task logger obtained from Celery's logging helpers, named after this module.
logger = get_task_logger(__name__)

# Celery application for this worker. The broker and result-backend URLs are
# read from the environment; the Redis URLs below are used when the
# corresponding variable is not set.
celery = Celery(__name__)
celery.conf.update(
    broker_url=os.getenv("CELERY_BROKER_URL", "redis://quees:6379/0"),
    result_backend=os.getenv("CELERY_RESULT_BACKEND", "redis://queejobs:6379/0"),
)
@celery.task(name="Index Pasture GEE", bind=True)
def gee_get_index_pasture(self, payload: ResultPayload):
    """Run the pasture-index analysis and queue a completion e-mail.

    The analysis is delegated to ``task_index_pasture``, called with this
    task's own request id and the payload. After it returns, a "Send Email"
    task is enqueued for the requesting user, carrying a results link built
    from the same request id.

    Args:
        payload: Request data. It is subscripted like a mapping here and must
            provide ``payload['user']['email']`` and
            ``payload['user']['name']``.

    Returns:
        Whatever ``task_index_pasture`` returned.

    Raises:
        KeyError: When ``payload`` is a plain dict lacking the ``user``,
            ``email`` or ``name`` entries (raised after the analysis ran).
    """
    task_id = self.request.id
    result = task_index_pasture(task_id, payload)

    # The notification is assembled only after the analysis has returned.
    user = payload['user']
    recipient = user['email']
    # The same text is used as the e-mail subject and as the template title.
    subject = "CONCLUÍDA - Requisição de Análise de Geometria (Área)"
    content = {
        "title": subject,
        "hello": f"Olá, {user['name']}",
        "body": "Conforme solicitação, a análise de dados da área submetida está completa. Você consegue acessar os resultados no link abaixo:",
        "url": f"https://atlasdaspastagens.ufg.br/results/{task_id}",
        "regards": "Atenciosamente",
        "team": "Equipe do Atlas das Pastagens do Brasil",
    }
    notification = {
        "receiver_email": recipient,
        "subject": subject,
        "message": {
            "template": "base.html",
            "content": content,
        },
    }

    # Delivery is handed to the separate "Send Email" task via ``delay``
    # rather than being performed inside this task.
    task_send_email.delay(notification)
    return result
@celery.task(name="Send Email")
def task_send_email(payload: SendEmail):
    """Log that an e-mail is being sent and delegate delivery to ``send_email``.

    Args:
        payload: E-mail description, forwarded unchanged to ``send_email``.

    Returns:
        Whatever ``send_email`` returned.
    """
    logger.info("Sending email: ")
    outcome = send_email(payload)
    return outcome