Skip to content

Commit

Permalink
Upgrade Celery from 4.x to 5.0.2
Browse files Browse the repository at this point in the history
 - Legacy-Id: 1062
  • Loading branch information
rpcross committed Nov 12, 2020
1 parent 822f6ad commit 10c7ca5
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 28 deletions.
4 changes: 1 addition & 3 deletions INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,8 @@ Celery is provided in the Python virtual environment. What remains is to copy
systemd files into place and enable/start the service.
INSTALL_DIR=/a/mailarch

cp INSTALL_DIR/current/backend/celery/celeryd.service /etc/systemd/system
cp INSTALL_DIR/current/backend/celery/celeryd.service /usr/lib/systemd/system
cp INSTALL_DIR/current/backend/celery/celeryd /etc/default/
mkdir /var/log/celery (wwwrun:www)
mkdir /var/run/celery (wwwrun:www)
systemctl enable celeryd
systemctl start celeryd

Expand Down
19 changes: 11 additions & 8 deletions backend/celery/celeryd.service
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ User=wwwrun
Group=www
EnvironmentFile=/etc/default/celeryd
WorkingDirectory=/a/mailarch/current
ExecStart=/bin/sh -c '${CELERY_BIN} multi start ${CELERYD_NODES} \
-A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \
--logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} \
--pidfile=${CELERYD_PID_FILE}'
ExecReload=/bin/sh -c '${CELERY_BIN} multi restart ${CELERYD_NODES} \
-A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \
--logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
RuntimeDirectory=celery
LogsDirectory=celery
ExecStart=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi start $CELERYD_NODES \
--pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \
--loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS'
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait $CELERYD_NODES \
--pidfile=${CELERYD_PID_FILE} --loglevel="${CELERYD_LOG_LEVEL}"'
ExecReload=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi restart $CELERYD_NODES \
--pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \
--loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS'
Restart=always

[Install]
WantedBy=multi-user.target
16 changes: 9 additions & 7 deletions backend/celery_haystack/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#import xapian

from celery.utils.log import get_task_logger
from django.apps import apps
from django.core.exceptions import ImproperlyConfigured
from django.core.management import call_command
Expand All @@ -26,8 +27,9 @@
if settings.CELERY_HAYSTACK_TRANSACTION_SAFE and not getattr(settings, 'CELERY_ALWAYS_EAGER', False):
from djcelery_transactions import PostTransactionTask as Task
else:
from celery.task import Task # noqa
from celery import Task # noqa

logger = get_task_logger(__name__)

class CeleryHaystackSignalHandler(Task):
using = settings.CELERY_HAYSTACK_DEFAULT_ALIAS
Expand All @@ -43,7 +45,6 @@ def split_identifier(self, identifier, **kwargs):
bits = identifier.split('.')

if len(bits) < 2:
logger = self.get_logger(**kwargs)
logger.error("Unable to parse object "
"identifer '%s'. Moving on..." % identifier)
return (None, None)
Expand Down Expand Up @@ -71,7 +72,6 @@ def get_instance(self, model_class, pk, **kwargs):
"""
Fetch the instance in a standarized way.
"""
logger = self.get_logger(**kwargs)
instance = None
try:
instance = model_class._default_manager.get(pk=int(pk))
Expand Down Expand Up @@ -111,8 +111,7 @@ def run(self, action, identifier, **kwargs):
Trigger the actual index handler depending on the
given action ('update' or 'delete').
"""
logger = self.get_logger(**kwargs)


# First get the object path and pk (e.g. ('notes.note', 23))
object_path, pk = self.split_identifier(identifier, **kwargs)
if object_path is None or pk is None:
Expand Down Expand Up @@ -172,7 +171,6 @@ class CeleryHaystackUpdateIndex(Task):
command from Celery.
"""
def run(self, apps=None, **kwargs):
logger = self.get_logger(**kwargs)
defaults = {
'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE,
'age': settings.CELERY_HAYSTACK_COMMAND_AGE,
Expand All @@ -198,7 +196,7 @@ class CeleryXapianBatchRemove(Task):
"""
def run(self, documents=None):
logger = self.get_logger()
database = xapian.WritableDatabase(settings.HAYSTACK_XAPIAN_PATH,xapian.DB_OPEN)
for document in documents:
database.delete_document('Q' + get_identifier(document))
Expand All @@ -207,6 +205,7 @@ def run(self, documents=None):
logger.info("Called BatchRemove ({} documents)".format(len(documents)))
'''


@app.task
def add(x, y):
return x + y
Expand All @@ -218,3 +217,6 @@ def update_mbox(files):
elist = EmailList.objects.get(pk=file[2])
create_mbox_file(file[0], file[1], elist)


CeleryHaystackSignalHandler = app.register_task(CeleryHaystackSignalHandler())
CeleryHaystackUpdateIndex = app.register_task(CeleryHaystackUpdateIndex())
32 changes: 31 additions & 1 deletion backend/celery_haystack/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from django.core.exceptions import ImproperlyConfigured
#from django.utils.importlib import import_module
from importlib import import_module
from django.db import connection, transaction

from haystack.utils import get_identifier

Expand All @@ -23,10 +23,40 @@ def get_update_task(task_path=None):
return Task


'''
def enqueue_task(action, instance):
"""
Common utility for enqueing a task for the given action and
model instance.
"""
identifier = get_identifier(instance)
get_update_task().delay(action, identifier)
'''

def enqueue_task(action, instance, **kwargs):
"""
Common utility for enqueing a task for the given action and
model instance.
"""
identifier = get_identifier(instance)
options = {}
if hasattr(settings, 'CELERY_HAYSTACK_QUEUE'):
options['queue'] = settings.CELERY_HAYSTACK_QUEUE
if hasattr(settings, 'CELERY_HAYSTACK_COUNTDOWN'):
options['countdown'] = settings.CELERY_HAYSTACK_COUNTDOWN

task = get_update_task()
task_func = lambda: task.apply_async((action, identifier), kwargs, **options)

if hasattr(transaction, 'on_commit'):
# Django 1.9 on_commit hook
transaction.on_commit(
task_func
)
elif hasattr(connection, 'on_commit'):
# Django-transaction-hooks
connection.on_commit(
task_func
)
else:
task_func()
2 changes: 1 addition & 1 deletion backend/mlarchive/archive/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,4 @@ def create_mbox_file(month, year, elist):
msg = email.message_from_binary_file(f)
mbox.add(msg)
mbox.close()

4 changes: 2 additions & 2 deletions backend/mlarchive/celeryapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mlarchive.settings')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mlarchive.settings.settings')

app = Celery('mlarchive')
app.config_from_object('django.conf:settings')
app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Expand Down
10 changes: 5 additions & 5 deletions backend/mlarchive/tests/archive/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for x in range(size))


class EmailListFactory(factory.DjangoModelFactory):
class EmailListFactory(factory.django.DjangoModelFactory):
class Meta:
model = EmailList

name = 'public'


class ThreadFactory(factory.DjangoModelFactory):
class ThreadFactory(factory.django.DjangoModelFactory):
class Meta:
model = Thread

date = datetime.datetime.now().replace(second=0, microsecond=0)


class MessageFactory(factory.DjangoModelFactory):
class MessageFactory(factory.django.DjangoModelFactory):
class Meta:
model = Message

Expand All @@ -41,7 +41,7 @@ class Meta:
# email_list = factory.SubFactory(EmailListFactory)


class AttachmentFactory(factory.DjangoModelFactory):
class AttachmentFactory(factory.django.DjangoModelFactory):
class Meta:
model = Attachment

Expand All @@ -51,7 +51,7 @@ class Meta:
sequence = factory.Sequence(lambda n: n + 1)


class UserFactory(factory.DjangoModelFactory):
class UserFactory(factory.django.DjangoModelFactory):
class Meta:
model = User

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#
#bs4 # 4.1.3 was installed
beautifulsoup4
celery>=4.0,<5.0
celery==5.0.2
cloudflare>=2.1.0
cryptography
Django>=2.0,<3.0
Expand Down

0 comments on commit 10c7ca5

Please sign in to comment.