Celerybeat Example

This article explains and gives you an example of celerybeat

Hello all. Today we will see about celerybeat .

Why celerybeat?

    Suppose we want to run some tasks periodically like every (hour, minute, second, once in a day/week/month). In those cases celerybeat will be useful. It will schedule those tasks and execute by worker.

Prerequisites:

  • RabbitMQ (See my previous article here)
  • Celery ( pip install celery)
  • Python2 

Hope you have rabbitmq and celery installed in your system.

What we gonna do?

   we are gonna create celeryapp and simple task which is going to executed every minute.

Step 1: Create Celery App

from __future__ import absolute_import
from celery import Celery
celery_url = "amqp://%s:%s@%s//" % ("guest", "guest", "0.0.0.0")
celery_app = Celery("celerybeatapp", broker=celery_url)

We imported celery package . Here  rabbbitmq as message broker. By default  guest is the username and also password for rabbitmq.

 celery_app = Celery("celerybeatapp", broker=celery_url)

celerybeatapp -> name of the celery app

Now we configured celery app.

Step 2: Create celery task

@celery_app.task()
def add(x,y):
    """
      This function is to be called every minute by celery beat
    """
   print "hi from task add", x, y
   return x+y

This function just adds x and y and returns it.

Step 3: Schedule Celery Beat to call add task every minute. 

we need to tell celery to schedule this task every minute to call . It's very simple by the below class config

    
class CeleryConfig:
      CELERYBEAT_SCHEDULE = {
                             'add-after-every-minute': {
                             'task': 'celerybeatapp.add', # task to be called
                             'schedule': timedelta(minutes=1), # minutes/second/hour
                             'args': (2,3) # arguments to calling function if any arguments to be passed
                             }
                            }
celery_app.config_from_object(CeleryConfig)
  • CELERYBEAT_SCHEDULE is a keyword necessary. 
  • add-after-every-minute -> name. Give any name here, make it reasonable for understanding
  • task -> path/to/task (for us inside same file so celerybeatapp.add)
  • (e.x)if task in tasks/worker.py import task and give like this tasks.worker.add
  • schedule: timedelta(minutes=1) (seconds/minute/hour) are other options
  • args -> if any arguments to be passed for calling task you can give in tuple
  • (i.e) args -> (2,3) for add(x, y)


Finally whole code will be and save this as  celerybeatapp.py

from __future__ import absolute_import
from celery import Celery
from datetime import timedelta

class CeleryConfig:
      CELERYBEAT_SCHEDULE = {
                             'add-after-every-minute': {
                             'task': 'celerybeatapp.add', # task to be called
                             'schedule': timedelta(minutes=1), # minutes/second/hour
                             'args': (2,3) # arguments to calling function if any arguments to be passed
                             }
                            }
celery_app.config_from_object(CeleryConfig)


celery_url = "amqp://%s:%s@%s//" % ("guest", "guest", "0.0.0.0")
celery_app = Celery("celerybeatapp", broker=celery_url)
celery_app.config_from_object(CeleryConfig)


@celery_app.task()
def add(x,y):
"""
This function is to be called every minute by celery beat
"""
print "hi from task add", x, y
return x+y

Conclusion: How to run

There are two ways to run

  1. One way (Separate schedule beat and Separate schedule worker for task)
  2. Second way (schedule beat and worker at the same time)

Schedule beat

    celery -A celerybeatapp beat # starts separate beat alone

start worker

  celery -A celerybeatapp worker -l info   # starts worker alone

  celery -A celerybeatapp worker -B -l info # starts worker and beats at the same


wait for one minute . you will see something like this

output
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery


[tasks]
. celerybeatapp.add

[2017-04-03 14:18:58,383: INFO/Beat] beat: Starting...
[2017-04-03 14:18:58,399: INFO/MainProcess] Connected to amqp://guest:**@0.0.0.0:5672//
[2017-04-03 14:18:58,409: INFO/MainProcess] mingle: searching for neighbors
[2017-04-03 14:18:58,414: INFO/Beat] Scheduler: Sending due task add-after-every-minute (celerybeatapp.add)
[2017-04-03 14:18:59,423: INFO/MainProcess] mingle: all alone
[2017-04-03 14:18:59,442: WARNING/MainProcess] celery@bala ready.
[2017-04-03 14:19:01,435: INFO/MainProcess] Received task: celerybeatapp.add[1d282e1a-db1d-4690-859c-d79f7867b0cd]
[2017-04-03 14:19:01,437: WARNING/Worker-3] hi from task add
[2017-04-03 14:19:01,438: WARNING/Worker-3] 2
[2017-04-03 14:19:01,438: WARNING/Worker-3] 3
[2017-04-03 14:19:01,439: INFO/MainProcess] Task celerybeatapp.add[1d282e1a-db1d-4690-859c-d79f7867b0cd] succeeded in 0.00221847400098s: 5

As you see, it schedules task every minute and executes it.

That's it. Thanks for reading. See you in my next post. Happy coding !!!

Leave a comment

(Note: Comments are moderated)