How to Use Celery with RabbitMQ

This article explains how to use Celery with RabbitMQ on Ubuntu.

Hello all! Today we will see how to set up Celery with RabbitMQ as the message broker.

Prerequisite:

  • RabbitMQ (see my previous article here)

Step 1: Create a virtualenv and install Celery

virtualenv envcelery
source envcelery/bin/activate
pip install celery

Step 2: Create Celery app


from __future__ import absolute_import
from celery import Celery

celery_url = "amqp://%s:%s@%s//" % ("guest", "guest", "0.0.0.0")

# create a celery app named testapp with rabbitmq as the message broker
celery_app = Celery("testapp", broker=celery_url, backend='rpc://')

Note: You can also use Redis or MongoDB as the message broker. The backend='rpc://' option tells Celery where to store task results; usually we don't need it, but here I use the RPC backend for testing so we can read the result back.
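For example, here is a minimal sketch of the same app pointed at Redis instead of RabbitMQ (this assumes a Redis server is running locally on the default port 6379, and that the redis client package is installed with pip install redis):

from __future__ import absolute_import
from celery import Celery

# assumption: Redis running locally on the default port
redis_url = "redis://localhost:6379/0"

# Redis can act as both the message broker and the result backend
celery_app = Celery("testapp", broker=redis_url, backend=redis_url)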

Step 3: Create a simple Celery task to add two numbers


@celery_app.task()
def add(x, y):
    # this runs inside the Celery worker process
    print("hi from task add", x, y)
    return x + y

Our simple task takes the values of x and y and returns their sum.
Note: the @celery_app.task() decorator is important here; it registers the function as a Celery task.
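Calling add(2, 3) directly just runs the function in the current process. To run it through Celery, you send it to the broker and let a worker execute it. A minimal sketch (assuming the task is importable from the testapp.py we build in the next step, and a worker from Step 5 is running):

from testapp import add

# delay() is shorthand for apply_async(args=[2, 3]); it returns an
# AsyncResult immediately while a worker runs the task in the background
result = add.delay(2, 3)

In the full script below we use apply_async(), which takes the same arguments plus extra options such as countdown and queue.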

Step 4: Combine the previous code into a simple Python script

Save this as testapp.py:


from __future__ import absolute_import, print_function
from celery import Celery
import sys
import time

celery_url = "amqp://%s:%s@%s//" % ("guest", "guest", "0.0.0.0")

celery_app = Celery("testapp", broker=celery_url, backend='rpc://')


@celery_app.task()
def add(x, y):
    print("hi from task add", x, y)
    return x + y


def main(x, y):
    """
    Main entry point when the script is run from the command line.
    """
    print("Let's add %s and %s" % (x, y))
    # send the task to the broker; a running worker picks it up
    res = add.apply_async(args=[x, y])
    # give the worker a moment to finish before checking for the result
    time.sleep(3)
    if res.ready():
        print("Result from task %s" % res.result)


if __name__ == '__main__':
    if len(sys.argv) < 3:
        print("Usage: python testapp.py 1 2")
        sys.exit(0)
    x = int(sys.argv[1])
    y = int(sys.argv[2])
    main(x, y)

Step 5: How to run our testapp

Open two tabs in your terminal and make sure the virtualenv we created in Step 1 is activated in both:

source envcelery/bin/activate

celery -A testapp worker -l info

  • celery is the command-line program that ships with the Celery package
  • -A points at the app module, testapp (where our celery_app resides)
  • worker tells Celery to start a worker process for our testapp
  • -l info sets the log level to info (use -l debug to turn on debug logging)

If there are no errors, you will see something like this:

Output:
-------------- celery@bala v4.0.2 (latentcall)
---- **** -----
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: testapp:0x7fde36532ad0
- ** ---------- .> transport: amqp://guest:**@0.0.0.0:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. testapp.add

[2017-03-31 11:51:03,189: INFO/MainProcess] Connected to amqp://guest:**@0.0.0.0:5672//
[2017-03-31 11:51:03,196: INFO/MainProcess] mingle: searching for neighbors
[2017-03-31 11:51:04,215: INFO/MainProcess] mingle: all alone
[2017-03-31 11:51:04,233: INFO/MainProcess] celery@bala ready.

In the second tab:

python testapp.py 2 3


Output:
Let's add 2 and 3
Result from task 5

If you look at the first tab, where the Celery worker is running, you will see:

[2017-03-31 11:52:52,834: INFO/MainProcess] Received task: testapp.add[f366e36c-2d08-4ecd-a143-d755128c214e]
[2017-03-31 11:52:54,247: WARNING/PoolWorker-2] hi from task add
[2017-03-31 11:52:54,247: WARNING/PoolWorker-2] 2
[2017-03-31 11:52:54,248: WARNING/PoolWorker-2] 3
[2017-03-31 11:52:54,268: INFO/PoolWorker-2] Task testapp.add[f366e36c-2d08-4ecd-a143-d755128c214e] succeeded in 0.021674826974s: 5

In general we don't need to wait for the Celery task's response; I used time.sleep here only so we can fetch the result from the task and show it.
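If you do need the task's result, a cleaner option than sleeping and polling is to block on the AsyncResult with a timeout. A minimal sketch (the 10-second timeout is just an example value):

res = add.apply_async(args=[x, y])

# get() blocks until the worker finishes, or raises celery.exceptions.TimeoutError
# after the given number of seconds; it relies on the result backend (rpc:// here)
print("Result from task %s" % res.get(timeout=10))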

That's it. Hope it helps.

Happy coding!!!
