Dramatiq workers hangs very often, no logs were seen in logs even if it is verbose mode. #578

Open
7 tasks done
Harinib-Kore opened this issue Sep 13, 2023 · 12 comments

Comments

@Harinib-Kore

Harinib-Kore commented Sep 13, 2023

Issues

GitHub issues are for bugs. If you have questions, please ask them on the mailing list.

Checklist

  • Does your title concisely summarize the problem?
  • Did you include a minimal, reproducible example?
  • What OS are you using?
  • What version of Dramatiq are you using?
  • What did you do?
  • What did you expect would happen?
  • What happened?

What OS are you using?

Centos

What version of Dramatiq are you using?

Dramatiq version 1.14.2

What did you do?

Processing parallel async requests using dramatiq

What did you expect would happen?

It should work properly: it should keep receiving messages and tasks, but instead it hangs.

What happened?

We are processing parallel async requests. It works fine for about 3 days, and then the same issue occurs again. If we restart the workers, it works again. How can we fix this?

dramatiq dramatiq_app.BatchOperationsDramatiqTasks --processes 1 --verbose

I can't see anything in the log files. Even the logging hangs.

The error I captured when the logs hung:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/data/py3.9.7/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/data/py3.9.7/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/data/py3.9.7/Findly/lib/python3.9/site-packages/dramatiq/cli.py", line 328, in watch_logs
    data = event.recv_bytes()
  File "/data/py3.9.7/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/data/py3.9.7/lib/python3.9/multiprocessing/connection.py", line 426, in _recv_bytes
    return self._recv(size)
  File "/data/py3.9.7/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
MemoryError

Harinib-Kore changed the title from "Dramatiq workers hangs after very often, no logs were seen in logs even if it is verbose mode." to "Dramatiq workers hangs very often, no logs were seen in logs even if it is verbose mode." on Sep 13, 2023
@spumer

spumer commented Sep 22, 2023

We saw something similar with --processes 1 --threads 2, when a worker consumes from one queue and produces messages to another.
We reproduced it yesterday and will investigate what actually happened.

A quick fix was --processes 1 --threads 1.
UPD: we produce messages to another queue through kombu, and kombu.Connection is not thread-safe. In that case you get an infinite wait on _read, because both threads call _read and each reads a different part of the incoming data. Neither thread can construct a complete frame, so both wait forever for more data.
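
One common way to keep a non-thread-safe connection from being shared between worker threads is to give each thread its own connection. The following is a minimal sketch of that pattern using only the standard library. `FakeConnection`, `get_connection`, and `collect_connections` are hypothetical names for illustration and are not part of kombu or Dramatiq; a real setup would create an actual connection object in place of `FakeConnection`.

```python
import threading


class FakeConnection:
    """Hypothetical stand-in for a connection object that is not thread-safe."""


# Attributes set on a threading.local object are visible only to the
# thread that set them.
_local = threading.local()


def get_connection():
    """Return the calling thread's own connection, creating it on first use."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = FakeConnection()
        _local.conn = conn
    return conn


def collect_connections(n_threads=2):
    """Run n_threads threads and return the connection each one obtained."""
    results = []
    lock = threading.Lock()

    def work():
        conn = get_connection()
        # A second call from the same thread returns the same object.
        assert conn is get_connection()
        with lock:
            results.append(conn)

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

With this layout no two threads ever read from the same connection, so the interleaved reads described above cannot occur. The alternative, guarding one shared connection with a lock, also works but serializes all publishing.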

@Alsheh

Alsheh commented Oct 16, 2023

I've observed similar behavior when the CPU reaches 100%.

@menezes-

menezes- commented Oct 27, 2023

I have the same issue with 1.15 (Python 3.11) using the AsyncIO middleware.
Using --processes 1 --threads 1 works.

@agamrp

agamrp commented Nov 10, 2023

I am seeing this as well, running with --processes 4 --threads 1 through django_dramatiq.
I'll try switching to 1 process and see how that goes. I am also using dramatiq 1.5 (Python 3.10).

@spumer

spumer commented Jun 23, 2024

I've released a thread-safe broker implementation: https://github.com/spumer/dramatiq-kombu-broker/
We no longer have hangs with it.

@Harinib-Kore
Author

I've released a thread-safe broker implementation: https://github.com/spumer/dramatiq-kombu-broker/ We no longer have hangs with it.

Hey, which combination works well: dramatiq-kombu-broker as the broker with Redis as the backend,
or should I use kombu for both?

@spumer

spumer commented Jul 3, 2024

It works well with kombu + RabbitMQ.
Right now dramatiq-kombu-broker supports only RabbitMQ.

@Harinib-Kore
Author

Is there a way to check the current status of a Dramatiq task by its jobId? Specifically, to determine if it is still running or has completed? Also, is it possible to assign a custom name to the job, similar to a functionality available in Celery?

In Celery, you can achieve this with the following code:

celery_task = celery.AsyncResult(id=job_id, app=celery)
if celery_task.state != 'REVOKED':
    ...  # the task has not been revoked yet, so it can still be acted on here

My primary intention is to manage long-running tasks by being able to terminate them if necessary. How can I remove such tasks from the queue in Dramatiq?

@spumer

spumer commented Jul 4, 2024

Is there a way to check the current status of a Dramatiq task by its jobId?

We solve it with https://github.com/Bogdanp/django_dramatiq, which has a Task model and a middleware that writes the message status.

Also, is it possible to assign a custom name to the job, similar to a functionality available in Celery?

You mean display_name in Celery? Where do you expect to see that in Dramatiq? :) I think the actor name is enough.
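
The idea of a middleware that records message status, as mentioned above, can be sketched roughly as follows. This is an illustrative sketch, not django_dramatiq's implementation. The hook names and signatures mirror Dramatiq's middleware hooks, but `StatusTracker`, the status strings, and the in-memory dict are assumptions; in a real worker the class would subclass `dramatiq.Middleware`, be registered with `broker.add_middleware(...)`, and write to a shared store such as a database.

```python
from types import SimpleNamespace


class StatusTracker:
    """Records the last known status of each message, keyed by message ID."""

    def __init__(self):
        # Assumption: an in-process dict. Workers and producers are separate
        # processes, so a real setup needs a shared store (database, Redis, ...).
        self.statuses = {}

    def after_enqueue(self, broker, message, delay):
        # Called in the process that sends the message.
        self.statuses[message.message_id] = "enqueued"

    def before_process_message(self, broker, message):
        # Called in the worker just before the actor runs.
        self.statuses[message.message_id] = "running"

    def after_process_message(self, broker, message, *, result=None, exception=None):
        # Called in the worker after the actor returned or raised.
        self.statuses[message.message_id] = "failed" if exception is not None else "done"

    def after_skip_message(self, broker, message):
        # Called in the worker when a message was skipped instead of processed.
        self.statuses[message.message_id] = "skipped"


def demo():
    """Walk one fake message through its lifecycle and return the statuses seen."""
    tracker = StatusTracker()
    message = SimpleNamespace(message_id="abc-123")  # stand-in for a Dramatiq message
    seen = []
    tracker.after_enqueue(None, message, None)
    seen.append(tracker.statuses[message.message_id])
    tracker.before_process_message(None, message)
    seen.append(tracker.statuses[message.message_id])
    tracker.after_process_message(None, message, result=True)
    seen.append(tracker.statuses[message.message_id])
    return seen
```

Looking up a status by ID is then a plain read from the store, keyed by the `message_id` of the message returned when the task was sent.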

@Harinib-Kore
Author

Harinib-Kore commented Jul 4, 2024

Is there a way to check the current status of a Dramatiq task by its jobId?

We solve it with https://github.com/Bogdanp/django_dramatiq, which has a Task model and a middleware that writes the message status.

Also, is it possible to assign a custom name to the job, similar to a functionality available in Celery?

You mean display_name in Celery? Where do you expect to see that in Dramatiq? :) I think the actor name is enough.

  1. How can I get the process ID of a task from its message ID or any other parameter in Dramatiq?

  2. I want to know how to revoke a task, as Celery allows with:

result.revoke()

AsyncResult(id).revoke()

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
terminate=True)

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
terminate=True, signal='SIGKILL')

@spumer

spumer commented Jul 8, 2024

You need to implement this manually. Dramatiq has no built-in functionality to revoke a task.

I think it can be done the way Celery does it: create a queue for events, have each worker dispatch an event when it completes an action, and have it listen for events from others. Then you can implement a "revoke" event and discard the message when a worker receives it.
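
The "discard the message when it has been revoked" half of that idea can be sketched like this. It is a rough sketch under stated assumptions, not an existing Dramatiq feature: `RevokeList` and `was_skipped` are hypothetical, the `SkipMessage` class below is a local stand-in for the exception Dramatiq provides as `dramatiq.middleware.SkipMessage`, and the in-memory set would have to be replaced by something all workers can see (for example the event queue described above, or a shared key-value store).

```python
from types import SimpleNamespace


class SkipMessage(Exception):
    """Local stand-in for dramatiq.middleware.SkipMessage."""


class RevokeList:
    """Tracks revoked message IDs and refuses to process revoked messages."""

    def __init__(self):
        # Assumption: in-memory set; real workers need a store shared
        # across processes and hosts.
        self.revoked = set()

    def revoke(self, message_id):
        """Mark a message as revoked so that it is discarded when picked up."""
        self.revoked.add(message_id)

    def before_process_message(self, broker, message):
        # Same name and signature as Dramatiq's middleware hook. Raising
        # SkipMessage here tells the worker not to run the actor.
        if message.message_id in self.revoked:
            raise SkipMessage(f"message {message.message_id} was revoked")


def was_skipped(revoke_list, message):
    """Return True if the hook would discard this message."""
    try:
        revoke_list.before_process_message(None, message)
    except SkipMessage:
        return True
    return False
```

Note that this only discards messages that have not started yet. It does not interrupt a task that is already running, which is what Celery's `terminate=True` does.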

@Harinib-Kore
Author

Harinib-Kore commented Jul 30, 2024

Hi, I am facing an issue. Can you tell me how to solve it? @agamrp @spumer @Bogdanp

I'm filing this report for the following Dramatiq configuration and error:

Actor Configuration

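import traceback

import dramatiq

# sa_gateway and debug_logger are defined elsewhere in the application.
@dramatiq.actor(queue_name="batch_index_operations_queue", store_results=True)
def batch_index(request_payload):
    try:
        sa_gateway.batch_index_documents(request_payload)
        return True
    except Exception:
        # Log the failure and report it through the stored result.
        debug_logger.error('Error in creating an async task for Batch Index')
        debug_logger.error(traceback.format_exc())
        return False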

Dramatiq Setup

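import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend

# get_redis_connection_url is defined elsewhere in the application.
def setup_dramatiq():
    redis_url = get_redis_connection_url()
    redis_broker = RedisBroker(url=redis_url)
    dramatiq.set_broker(redis_broker)
    backend = RedisBackend(url=redis_url)
    redis_broker.add_middleware(Results(backend=backend))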

CrawlerDramatiq Configuration

{
    "name": "CrawlerDramatiq",
    "script": "/bin/bash",
    "args": "-c 'cd /data/www/Findly/search && source /data/sa_py3.9.7/Findly/bin/activate && dramatiq dramatiq_app.CrawlDramatiqTasks --processes 4'",
    "autorestart": true,
    "log_file": "/data/logs/findly/crawler_dramatiq.log",
    "watch": false
}

Error

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/data/sa_py3.9.7/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/data/sa_py3.9.7/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/data/sa_py3.9.7/Findly/lib/python3.9/site-packages/dramatiq/cli.py", line 337, in watch_logs
    log_file.write(data + "\n")
MemoryError

Issue

I am encountering a MemoryError when running the Dramatiq task. How can this be resolved?

5 participants