Django Periodic Task using Celery and RabbitMQ | Part 2

Posted on Feb 11, 2018 #python #django #celery

Sometimes we need a task to run on a schedule, for example fetching data from an API at a regular interval. Celery can handle these periodic tasks too. It uses Celery beat, a scheduler that kicks off tasks at regular intervals; the tasks are then executed by available Celery workers.

This is a quick example of how I got periodic tasks to work using Celery in Django. Check Django Asynchronous Task using Celery and RabbitMQ - Part 1 to set up Celery with Django.

Download the complete project from GitHub.

Set Time Zone

Open mycelery/mycelery/settings.py and add the following line:

CELERY_TIMEZONE = 'Asia/Dhaka'

If USE_TZ = True in settings.py, then set the corresponding CELERY_TIMEZONE. The periodic task scheduler uses the UTC time zone by default.
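To see why the time zone matters for time-of-day schedules, here is a minimal sketch using only the standard library. It assumes Asia/Dhaka can be modelled as a fixed UTC+06:00 offset, and the 09:00 run time is an illustrative value, not something taken from the project.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Dhaka modelled as a fixed UTC+06:00 offset.
DHAKA = timezone(timedelta(hours=6), name="Asia/Dhaka")


def as_utc(local_dt):
    """Convert a timezone-aware datetime to UTC."""
    return local_dt.astimezone(timezone.utc)


# Hypothetical schedule: a task meant to run at 09:00 Dhaka time.
local_run = datetime(2018, 2, 11, 9, 0, tzinfo=DHAKA)
utc_run = as_utc(local_run)

print(local_run.isoformat())  # 2018-02-11T09:00:00+06:00
print(utc_run.isoformat())    # 2018-02-11T03:00:00+00:00
```

In other words, if the scheduler stayed on UTC, a schedule written for hour 9 would fire at 15:00 Dhaka time rather than 09:00.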

Create Periodic Tasks

Create the following tasks in mycelery/myapp/tasks.py:

from celery import shared_task


@shared_task
def add(a, b):
    """Print the sum of a and b."""
    print(a + b)


@shared_task
def mul(a, b):
    """Print the product of a and b."""
    print(a * b)

So far, these are ordinary Celery tasks. To run them periodically, we have to register them in the beat schedule. Open mycelery/mycelery/settings.py and add the following lines:

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'task-add': {
        'task': 'myapp.tasks.add',
        'schedule': 15,
        'args': (4, 5)
    },
    'task-mul': {
        'task': 'myapp.tasks.mul',
        'schedule': crontab(minute='*/2'),
        'args': (4, 5)
    },
}

Here, CELERY_BEAT_SCHEDULE defines our periodic tasks. The first task is scheduled to run every 15 seconds and the second task every 2 minutes.
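The two schedule types behave slightly differently: a number of seconds is an interval counted from when beat starts, while crontab(minute='*/2') matches wall-clock minutes divisible by 2. The sketch below illustrates the difference in plain Python. It does not use Celery itself, and the helper names and the start time are hypothetical.

```python
from datetime import datetime, timedelta


def interval_runs(start, seconds, count):
    """Run times for an interval schedule: every `seconds` after `start`."""
    return [start + timedelta(seconds=seconds * i) for i in range(1, count + 1)]


def every_n_minutes_runs(start, n, count):
    """Run times for a '*/n' minute pattern: wall-clock minutes divisible by n."""
    runs = []
    # Move to the next whole minute strictly after `start`.
    t = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while len(runs) < count:
        if t.minute % n == 0:
            runs.append(t)
        t += timedelta(minutes=1)
    return runs


# Hypothetical moment at which beat is started.
started = datetime(2018, 2, 11, 10, 1, 20)

print([t.strftime("%H:%M:%S") for t in interval_runs(started, 15, 3)])
# ['10:01:35', '10:01:50', '10:02:05']
print([t.strftime("%H:%M:%S") for t in every_n_minutes_runs(started, 2, 3)])
# ['10:02:00', '10:04:00', '10:06:00']
```

So the crontab task lines up with even minutes on the clock, regardless of when beat was started.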

The schedule can be a number of seconds as an integer, a timedelta, or a crontab. For more control over when the task is executed, for example at a particular time of day or on a particular day of the week, use the crontab schedule type. Examples of crontab schedules can be found here.
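As a rough sketch, here are the three schedule forms side by side, reusing the add and mul tasks from this post. The entry names and the chosen times are made up for illustration; only the task paths come from the project. Treat it as a variation on the settings above, not an addition to them, since it defines CELERY_BEAT_SCHEDULE again.

```python
from datetime import timedelta

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Plain number: interval in seconds.
    'add-every-15-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 15,
        'args': (4, 5),
    },
    # timedelta: the same kind of interval, written more explicitly.
    'add-every-10-minutes': {
        'task': 'myapp.tasks.add',
        'schedule': timedelta(minutes=10),
        'args': (4, 5),
    },
    # crontab: every Monday at 07:30, interpreted in CELERY_TIMEZONE.
    'mul-monday-morning': {
        'task': 'myapp.tasks.mul',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (4, 5),
    },
}
```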

Both of our tasks take two positional arguments, so args is used to pass the values. If a task takes no arguments, leave args out. For more information, read this on available fields.
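To make the role of args concrete, here is a small sketch of how a schedule entry's args (and the optional kwargs field) end up as call arguments. It uses plain functions as stand-ins for the Celery tasks, plus a hypothetical hello task, TASKS registry, and run_entry helper. Celery's real dispatch goes through the broker and is not shown.

```python
def add(a, b):
    return a + b


def hello():
    return 'Hello World!'


# Hypothetical registry mapping task names to plain callables.
TASKS = {'myapp.tasks.add': add, 'myapp.tasks.hello': hello}

SCHEDULE = {
    'task-add': {'task': 'myapp.tasks.add', 'schedule': 15, 'args': (4, 5)},
    # No 'args' key: this task takes no arguments.
    'task-hello': {'task': 'myapp.tasks.hello', 'schedule': 60},
}


def run_entry(entry):
    """Call the entry's task with its positional and keyword arguments."""
    func = TASKS[entry['task']]
    return func(*entry.get('args', ()), **entry.get('kwargs', {}))


print(run_entry(SCHEDULE['task-add']))    # 9
print(run_entry(SCHEDULE['task-hello']))  # Hello World!
```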

Another Way

There is also another way to create periodic tasks, without defining CELERY_BEAT_SCHEDULE in settings.py. Open mycelery/myapp/tasks.py and create the following periodic task:

from celery.schedules import crontab
from celery.task import periodic_task

@periodic_task(run_every=(crontab(minute='*/1')), name="task-hello")
def print_hello():
    print('Hello World!')

Here, I have used the periodic_task decorator instead of shared_task. This task is scheduled to run every minute. The drawback of the periodic_task decorator is that we cannot pass positional arguments to the task, so use it only if the task does not take any arguments. Note that the celery.task module, which provides this decorator, was removed in Celery 5.0, so this approach only works on older Celery versions.

Start the Worker and the Beat

Celery needs both the worker and the beat running for tasks to execute as planned. When developing, use this command:

celery -A mycelery worker -l info -B

It starts both at once, but this method is discouraged in production. In a production environment, start the worker and the beat service separately with the following commands:

celery -A mycelery worker -l info
celery -A mycelery beat -l info

Daemonization

Daemonization makes the services start automatically along with the system. For this, I will use Supervisor. Check Django + Gunicorn + Supervisor + Nginx to daemonize any Django application.

Create a Supervisor configuration file for Celery worker in /etc/supervisor/conf.d/ directory and name it celeryworker.conf:

[program:celeryworker]

; Set full path to celery program if using virtualenv
command=<project virtualenv directory>/bin/celery -A mycelery worker --loglevel=INFO

; The directory to your Django project
directory=<project directory>

; If supervisord is run as the root user, switch users to this UNIX user account
; before doing any processing.
user=<user>

; Supervisor will start as many instances of this program as named by numprocs
numprocs=1

; Put process stdout output in this file
stdout_logfile=<project directory>/log/celeryworker-supervisor.log

; Put process stderr output in this file
stderr_logfile=<project directory>/log/celeryworker-supervisor.log

; If true, this program will start automatically when supervisord is started
autostart=true

; May be one of false, unexpected, or true. If false, the process will never
; be autorestarted. If unexpected, the process will be restarted when the program
; exits with an exit code that is not one of the exit codes associated with this
; process’ configuration (see exitcodes). If true, the process will be
; unconditionally restarted when it exits, without regard to its exit code.
autorestart=true

; The total number of seconds which the program needs to stay running after
; a startup to consider the start successful.
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if your broker is supervised, set its priority higher
; so it starts first
priority=998

Create another Supervisor configuration file for Celery beat in /etc/supervisor/conf.d/ directory and name it celerybeat.conf:

[program:celerybeat]

; Set full path to celery program if using virtualenv
command=<project virtualenv directory>/bin/celery -A mycelery beat --loglevel=INFO

; The directory to your Django project
directory=<project directory>

; If supervisord is run as the root user, switch users to this UNIX user account
; before doing any processing.
user=<user>

; Supervisor will start as many instances of this program as named by numprocs
numprocs=1

; Put process stdout output in this file
stdout_logfile=<project directory>/log/celerybeat-supervisor.log

; Put process stderr output in this file
stderr_logfile=<project directory>/log/celerybeat-supervisor.log

; If true, this program will start automatically when supervisord is started
autostart=true

; May be one of false, unexpected, or true. If false, the process will never
; be autorestarted. If unexpected, the process will be restarted when the program
; exits with an exit code that is not one of the exit codes associated with this
; process’ configuration (see exitcodes). If true, the process will be
; unconditionally restarted when it exits, without regard to its exit code.
autorestart=true

; The total number of seconds which the program needs to stay running after
; a startup to consider the start successful.
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if your broker is supervised, set its priority higher
; so it starts first
priority=999

Finally, run the following commands in a terminal to make Supervisor aware of these two programs:

sudo supervisorctl reread
sudo supervisorctl update

Run the following commands to stop, start, and/or check the status of the celeryworker program:

sudo supervisorctl stop celeryworker
sudo supervisorctl start celeryworker
sudo supervisorctl status celeryworker

For further learning about periodic tasks, visit Celery periodic task documentation.
