Celery with different types of Queues in rabbitmq

This article explains , different types of queues in rabbitmq and example with celery .

Hello all . Today we will see how to create our own queues in celery with rabbitmq. We will see different type of queues in rabbitmq.

Installation

   sudo apt-get install python-celery

Prerequisites:

  • Rabbitmq (see my previous article here )
  • Celery ( pip install celery)

What we are gonna do ?

  • Create two queues one for addition, one for subtraction.
  • Handle routing of queues
  • Get results from queue

Things we should know

  • Producer
  • Consumer
  • Queue
  • Exchange

Producer -> who puts task into queue

Consumer -> who gets task from queue

Queue -> place or bucket where our tasks are stuffed into

Exchange -> who is responsible for which task into which queue

Note: Producer doesn't aware of queues, it just sends task to exchange, exchange knows which queue has to receive task

Let's do coding !!!

Here we are creating celery app named  testapp

Step 1: Create Celery app


from __future__ import absolute_import
from celery import Celery

celery_url = "amqp://%s:%s@%s//" % ("guest", "guest", "0.0.0.0")

celery_app = Celery("testapp", broker=celery_url, backend='rpc://')

Here i just imported celery package and created app with rabbitmq as message broker.

 guest -> default username and password for rabbitmq.

 backend='rpc://' -> rabbitmq as result backend too // optional

I need backend option for example purpose for this post

Step 2: Create Our Own Queues


from kombu import Queue, Exchange

class CeleryConfig:

CELERY_QUEUES = (
[Queue('default', Exchange('default'), routing_key='default'),
Queue('addqueue', Exchange('add'), routing_key='add'),
Queue('subqueue', Exchange('sub'), routing_key='sub'),]
)

CELERY_ROUTES = {
'testapp.add': {'queue': 'addqueue', 'routing_key': 'add', 'exchange': 'add'},
'testapp.sub': {'queue': 'subqueue', 'routing_key': 'sub', 'exchange': 'sub'}
}

celery_app.config_from_object(CeleryConfig)

  • CELERY_QUEUES -> list of queues
  • Queue('addqueue', Exchange('add'), routing_key='add'
  • addqueue -> queue name you can give any name
  • Exchange('add') -> creating an exchange with name add
  • routing_key='add' -> routing key is very important to route tasks into proper queue
  • CELERY_ROUTES -> we created our routes there
  • testapp.add, testapp.sub -> add, sub are functions which we are gonna write next


Step 3: Create  add and sub celery task

@celery_app.task()
def add(x,y):
print "hi from task add", x, y
return x+y

@celery_app.task()
def sub(x,y):
print "hi from task sub", x, y
return x-y
  • add(x,y) -> returns addition of x,y values.
  • sub(x,y) -> returns subraction of x,y values.
  • @celery_app.task() -> denotes its celery task

So our final code will be

Final code


from __future__ import absolute_import
from celery import Celery
import sys
import time
from kombu import Queue, Exchange


celery_url = "amqp://%s:%s@%s//" % ("guest", "guest", "0.0.0.0")

celery_app = Celery("testapp", broker=celery_url,
backend='rpc://')

class CeleryConfig:

CELERY_QUEUES = (
[
Queue('addqueue', Exchange('add'), routing_key='add'),
Queue('subqueue', Exchange('sub'), routing_key='sub'),]
)

CELERY_ROUTES = {
'testapp.add': {'queue': 'addqueue', 'routing_key': 'add', 'exchange': 'add'},
'testapp.sub': {'queue': 'subqueue', 'routing_key': 'sub', 'exchange': 'sub'}
}

celery_app.config_from_object(CeleryConfig)


@celery_app.task()
def add(x,y):
print "hi from task add", x, y
return x+y

@celery_app.task()
def sub(x,y):
print "hi from task sub", x, y
return x-y


def main(x,y):
"""
This is main function gets called initially while running
"""
print "Let's add %s and %s" % (x, y)
res = add.apply_async(args=[x, y],routing_key='add')
time.sleep(3)
if res.ready():
print "Result from task %s" % res.result

print "Let's sub %s and %s" % (x, y)
res = sub.apply_async(args=[x, y],routing_key='sub')
time.sleep(3)
if res.ready():
print "Result from task %s" % res.result


if __name__ == '__main__':
if len(sys.argv) < 3:
print "Usage: python testapp.py 1 2"
sys.exit(0)
x = int(sys.argv[1])
y = int(sys.argv[2])
main(x,y)

How to Run testapp

Open two tabs in your terminal

In Tab One in terminal, Run Celery by


celery -A testapp worker -l info -Q add,sub

  • celery is a keyword
  • -A defines app testapp(where our celery_app resides)
  • worker keryword to start worker for our testapp
  • -l info denotes Log mode to info (-l debug -> to turn debug mode for celery)
  • -Q add, sub  denotes listen for add and sub queues alone

if no error comes you will see something like this

output
-------------- celery@bala v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-83-generic-x86_64-with-Ubuntu-14.04-trusty 2017-07-19 21:46:56
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: testapp:0x7f556e10f9d0
- ** ---------- .> transport: amqp://guest:**@0.0.0.0:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> add exchange=add(direct) key=add
.> sub exchange=sub(direct) key=sub

[tasks]
. testapp.add
. testapp.sub

[2017-07-19 21:46:56,351: INFO/MainProcess] Connected to amqp://guest:**@0.0.0.0:5672//
[2017-07-19 21:46:56,359: INFO/MainProcess] mingle: searching for neighbors
[2017-07-19 21:46:57,378: INFO/MainProcess] mingle: all alone
[2017-07-19 21:46:57,395: INFO/MainProcess] celery@bala ready.

In Second tab, Run Python script by 

python testapp.py 3 2

output

Let's add 3 and 2
Result from task 5
Let's sub 3 and 2
Result from task 1

if you see first tab where celery is running

[2017-07-19 21:48:14,528: INFO/MainProcess] Received task: testapp.add[c964db8c-c81d-4a5d-b622-554615882091] 
[2017-07-19 21:48:15,454: WARNING/PoolWorker-1] hi from task add
[2017-07-19 21:48:15,455: WARNING/PoolWorker-1] 3
[2017-07-19 21:48:15,455: WARNING/PoolWorker-1] 2
[2017-07-19 21:48:15,472: INFO/PoolWorker-1] Task testapp.add[c964db8c-c81d-4a5d-b622-554615882091] succeeded in 0.0182367389789s: 5
[2017-07-19 21:48:17,536: INFO/MainProcess] Received task: testapp.sub[598dd4d8-3778-4296-b047-c51495c04b6a]
[2017-07-19 21:48:19,457: WARNING/PoolWorker-2] hi from task sub
[2017-07-19 21:48:19,457: WARNING/PoolWorker-2] 3
[2017-07-19 21:48:19,458: WARNING/PoolWorker-2] 2
[2017-07-19 21:48:19,475: INFO/PoolWorker-2] Task testapp.sub[598dd4d8-3778-4296-b047-c51495c04b6a] succeeded in 0.0179580140393s: 1

Note: In general we don't need to wait for celery task response. I used  time.sleep here to get result from task and show.

That's it . Hope it helps.

Happy coding!!!

Leave a comment

(Note: Comments are moderated)