Django Asynchronous Task using Celery and RabbitMQ | Part 1

Posted on Jan 27, 2018 #python #django #celery

Celery is a powerful asynchronous task/job queue written in Python and based on distributed message passing. It is used to run tasks in the background, both real-time operations and scheduled tasks. It’s easy to use and works great with Django, so users don’t have to wait unnecessarily for page loads or actions to complete while a long-running task blocks the request.

Download complete project from GitHub.

Celery Demo

Install RabbitMQ

Celery uses a message broker to pass messages between Django and the Celery workers. Celery supports several message brokers. Here, I’ll be using RabbitMQ, which is the most widely used open source message broker. It is feature-complete, stable, durable and easy to install.

Another great advantage is that RabbitMQ runs on all major operating systems and supports a large number of developer platforms like Java, .NET, Python, PHP, Erlang and many more. Open a terminal and install RabbitMQ by executing this command:

sudo apt-get install rabbitmq-server

When the installation completes, the broker will already be running in the background. Check rabbitmq-server status:

sudo service rabbitmq-server status

If rabbitmq-server is not running:

sudo service rabbitmq-server start

Create a RabbitMQ user and a virtual host, then grant that user access to the virtual host:

sudo rabbitmqctl add_user <myuser> <mypassword>
sudo rabbitmqctl add_vhost <myvhost>
sudo rabbitmqctl set_user_tags <myuser> <mytag>
sudo rabbitmqctl set_permissions -p <myvhost> <myuser> ".*" ".*" ".*"

Regarding other message brokers, more information can be found here.

Install Celery

Create a Django project mycelery inside a virtual environment:

mycelery
├── manage.py
└── mycelery
    ├── __init__.py
    ├── settings.py
    ├── urls.py
    └── wsgi.py

Install Celery:

pip install celery

RabbitMQ is the default broker, so it doesn’t require any additional dependencies or initial configuration other than the URL of the broker instance we want to use. Inside the mycelery/mycelery/settings.py file, include:

CELERY_BROKER_URL = 'amqp://<myuser>:<mypassword>@localhost:5672/<myvhost>'
CELERY_RESULT_BACKEND = 'amqp://<myuser>:<mypassword>@localhost:5672/<myvhost>'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
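
If the RabbitMQ password contains characters such as @ or :, pasting it straight into the URL above would produce an ambiguous address. Here is a minimal sketch of one way to assemble the broker URL from its parts. The helper build_broker_url and the RABBITMQ_* environment variable names are my own assumptions for illustration, not part of Celery or Django, and the sketch assumes the broker URL parser percent-decodes each component as standard URL handling does.

```python
import os
from urllib.parse import quote


def build_broker_url(user, password, vhost, host='localhost', port=5672):
    """Assemble an AMQP URL, percent-encoding user, password and vhost."""
    return 'amqp://{user}:{password}@{host}:{port}/{vhost}'.format(
        user=quote(user, safe=''),
        password=quote(password, safe=''),
        host=host,
        port=port,
        vhost=quote(vhost, safe=''),
    )


# Hypothetical environment variable names; the fallbacks are placeholders only.
CELERY_BROKER_URL = build_broker_url(
    user=os.environ.get('RABBITMQ_USER', 'myuser'),
    password=os.environ.get('RABBITMQ_PASSWORD', 'mypassword'),
    vhost=os.environ.get('RABBITMQ_VHOST', 'myvhost'),
)
```

Reading the credentials from the environment also keeps them out of version control, which is worth considering once the project leaves your own machine.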

To use Celery with our Django project, first define an instance of the Celery library, called an “app”. Create a new celery.py file inside mycelery/mycelery/ and add the following code:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mycelery.settings')

app = Celery('mycelery')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
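
To make the namespace='CELERY' comment above more concrete, here is a rough illustration of the naming convention: only settings that start with CELERY_ are picked up, and the rest of the name corresponds to a lowercase Celery setting. The function strip_namespace is a hypothetical helper written for this explanation; it is not how Celery implements the lookup internally.

```python
def strip_namespace(settings, namespace='CELERY'):
    """Return only the namespaced keys, with the prefix removed and lowercased."""
    prefix = namespace + '_'
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


django_settings = {
    'DEBUG': True,  # unrelated Django setting, ignored
    'CELERY_BROKER_URL': 'amqp://localhost',
    'CELERY_TASK_SERIALIZER': 'json',
}

celery_config = strip_namespace(django_settings)
print(celery_config)
# {'broker_url': 'amqp://localhost', 'task_serializer': 'json'}
```

So CELERY_BROKER_URL in settings.py corresponds to Celery's broker_url setting, and a setting without the prefix is left alone.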

Then import this app in our mycelery/mycelery/__init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

With this, Celery configuration is done. Now, we have to create a Celery task and run it asynchronously. I am going to create a new app where a user can create some random users depending on the input value. It’ll also display a simple progress bar as random users are being created in the background.

Create a Celery Task

Create a new Django app myapp, include it in INSTALLED_APPS and inside this app, create a tasks.py file:

from __future__ import absolute_import, unicode_literals
import string
from django.contrib.auth.models import User
from django.utils.crypto import get_random_string
from celery import shared_task, current_task

@shared_task
def create_random_user_accounts(total_user):
    for i in range(total_user):
        username = 'user_%s' % get_random_string(20, string.ascii_letters)
        email = '%s@example.com' % username
        password = get_random_string(50)
        User.objects.create_user(username=username, email=email, password=password)
        current_task.update_state(state='PROGRESS',
                                  meta={'current': i, 'total': total_user,
                                        'percent': int((float(i) / total_user) * 100)})
    return {'current': total_user, 'total': total_user, 'percent': 100}

The purpose of this task is to create random users based on a given number. The @shared_task decorator allows creating tasks without having any concrete app instance.
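
One detail of the progress reporting is easy to miss: the task above reports the zero-based loop index i, so the last PROGRESS update stays below 100% and only the final return value reaches it. The sketch below pulls the calculation into a small function to show the difference between reporting i and i + 1. The name progress_meta is hypothetical, introduced only for this illustration.

```python
def progress_meta(done, total):
    """Build the meta dict reported with the custom PROGRESS state."""
    # Guard against division by zero when there is nothing to do.
    percent = int(done * 100 / total) if total else 100
    return {'current': done, 'total': total, 'percent': percent}


total_user = 4

# Reporting the loop index, as the task above does.
by_index = [progress_meta(i, total_user)['percent'] for i in range(total_user)]

# Reporting the number of users created so far.
by_count = [progress_meta(i + 1, total_user)['percent'] for i in range(total_user)]

print(by_index)  # [0, 25, 50, 75]
print(by_count)  # [25, 50, 75, 100]
```

Either choice works with the progress bar in this tutorial; reporting i + 1 simply makes the displayed count match the number of users that actually exist at that moment.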

Create Input Form

Inside myapp, create a forms.py file that takes the total number of users that we want to create:

from django import forms
from django.core.validators import MinValueValidator

class GenerateRandomUserForm(forms.Form):
    total_user = forms.IntegerField(
        label='Number of users',
        required=True,
        validators=[
            MinValueValidator(10)
        ]
    )

Create View

In the views.py file, create two functions: generate_random_user and get_task_info. Here, generate_random_user renders the form, validates the submitted input and queues the task, while get_task_info returns the state and result of a Celery task.

import json
from django.shortcuts import render
from celery.result import AsyncResult
from django.http import HttpResponse
from myapp.forms import GenerateRandomUserForm
from myapp.tasks import create_random_user_accounts

def generate_random_user(request):
    if request.method == 'POST':
        form = GenerateRandomUserForm(request.POST)
        if form.is_valid():
            total_user = form.cleaned_data.get('total_user')
            task = create_random_user_accounts.delay(total_user)
            return HttpResponse(json.dumps({'task_id': task.id}), content_type='application/json')
        else:
            return HttpResponse(json.dumps({'task_id': None}), content_type='application/json')
    else:
        form = GenerateRandomUserForm()
    return render(request, 'myapp/index.html', {'form': form})

def get_task_info(request):
    task_id = request.GET.get('task_id', None)
    if task_id is not None:
        task = AsyncResult(task_id)
        data = {
            'state': task.state,
            'result': task.result,
        }
        return HttpResponse(json.dumps(data), content_type='application/json')
    else:
        return HttpResponse('No job id given.')

The most important piece of code is create_random_user_accounts.delay(), which sends the Celery task that we created earlier to the broker. Here, .delay() is what makes the task run asynchronously in a worker. Calling create_random_user_accounts() directly would run it synchronously, like a normal function.
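
One thing to watch in get_task_info: when a task fails, task.result holds the exception instance rather than a dict, and json.dumps() cannot serialize an exception. Below is a minimal sketch of a more defensive serializer. The function task_info_json and the 'error' key are assumptions made for this example, not something Celery provides.

```python
import json


def task_info_json(state, result):
    """Serialize task state and result, tolerating non-JSON values."""
    if isinstance(result, BaseException):
        # A failed task exposes the raised exception as its result.
        result = {'error': str(result)}
    # default=str is a last resort for any other non-serializable value.
    return json.dumps({'state': state, 'result': result}, default=str)


print(task_info_json('PROGRESS', {'current': 3, 'total': 10, 'percent': 30}))
print(task_info_json('FAILURE', ValueError('boom')))
```

In the view, this would be called as task_info_json(task.state, task.result) in place of the json.dumps(data) call.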

Create Template

To render the form from views and to take user input, create a simple HTML template index.html inside mycelery/myapp/templates/myapp/:

<html>
<head>
    <title>Celery Demo</title>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.1.1/jquery.min.js"></script>
</head>

<body style="text-align: center;">
<h1>Generate Random Users</h1>
<progress id="progress-bar" value="0" max="100" style="display:none; margin-bottom: 1em;"></progress>
<form id="generate-user-form" action="/generate-user/" method="post">
    {% csrf_token %}
    {{ form }}
    <input type="submit" value="Submit"/>
</form>

<script type="text/javascript">
    var frm = $('#generate-user-form');
    var pgrbar = $('#progress-bar');
    frm.submit(function () {
        $.ajax({
            type: frm.attr('method'),
            url: frm.attr('action'),
            data: frm.serialize(),
            success: function (data) {
                if (data.task_id != null) {
                    get_task_info(data.task_id);
                }
            },
            error: function (data) {
                console.log("Something went wrong!");
            }
        });
        return false;
    });

    function get_task_info(task_id) {
        $.ajax({
            type: 'get',
            url: '/get-task-info/',
            data: {'task_id': task_id},
            success: function (data) {
                frm.html('');
                if (data.state == 'PENDING') {
                    frm.html('Please wait...');
                }
                else if (data.state == 'PROGRESS' || data.state == 'SUCCESS') {
                    pgrbar.css('display', 'inline');
                    pgrbar.val(data.result.percent);
                    frm.html('User created ' + data.result.current + ' out of ' + data.result.total);
                }
                if (data.state != 'SUCCESS') {
                    setTimeout(function () {
                        get_task_info(task_id)
                    }, 1000);
                }
            },
            error: function (data) {
                frm.html("Something went wrong!");
            }
        });
    }
</script>

</body>
</html>

Update URLs

Update mycelery/mycelery/urls.py to add the following urls:

from django.urls import path
from myapp.views import generate_random_user, get_task_info

urlpatterns = [
    ...
    path('generate-user/', generate_random_user),
    path('get-task-info/', get_task_info),
]

Run Django Server

Run migrations:

python manage.py migrate

If the migrations apply cleanly, start the Django development server:

python manage.py runserver

Start Celery Worker Process

Open another terminal and go to the project root, the outer mycelery directory that contains manage.py. Don’t forget to activate the virtual environment. To start a Celery worker instance, run the following command:

celery -A mycelery worker -l info

Open a browser and go to http://127.0.0.1:8000/generate-user. Enter the number of users to create and submit the form; the task will be performed asynchronously.

For further learning, visit Celery documentation.
