Distributed Python: Celery

Nowadays, SOA architectures are in fashion. These architectures are built from small, very specific services that interact with each other.

In this post I’ll show how to use Celery in order to create a SOA architecture.

About Celery

Celery is not a communication system: it uses RabbitMQ, Redis, etc. as the communication layer. For the same reason, it is not a message queue system. Neither is it a protocol, because it uses AMQP. Nor is it an abstraction over all of these, because that work is done by Kombu, the communications library it uses under the hood.

Celery is a set of tools for working easily with several services, something like syntactic sugar. It lets you launch services as tasks.

How to install it

It is very easy to install; with pip:

pip install celery

or apt:

apt-get install python-celery

The problem here is that we need a broker. The broker is what transports messages from one service to another. In this case, we need a message queue.

We will use two of them as examples: RabbitMQ and Redis. The first one is somewhat… complicated, but very interesting if you already know it and install the management plugin. The second one is easier to install, but it is harder to see what is happening internally. To use Redis you will also need its Python library; with pip:

pip install redis

or with apt:

apt-get install python-redis

Example

Creating services

Let’s start by creating a little service that multiplies two numbers:

# Celery service example: task to multiply two numbers
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

The overhead added by Celery is quite small: importing the library, connecting (the app object) and adding a decorator to our service.

It is already executable:

$ celery worker --loglevel=info -A tasks

 -------------- celery@nightcrawler v3.1.17 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.16.0-4-amd64-x86_64-with-debian-8.0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f3476eb9b90
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.multiply

[2015-03-20 21:56:16,526: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-03-20 21:56:16,541: INFO/MainProcess] mingle: searching for neighbors
[2015-03-20 21:56:17,780: INFO/MainProcess] mingle: all alone
[2015-03-20 21:56:17,791: WARNING/MainProcess] celery@nightcrawler ready.

There is a lot of information here:

  • The versions used
  • The queues created
  • The brokers used to transport messages and to store results
  • The number of workers, that is, the processes available to handle requests concurrently (the --concurrency option).

And more, but it can be ignored right now.

We will leave it running.

Running services

That was the subscriber; let’s create the publisher now.

# Celery client example: request for two numbers multiplication
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

promise = app.send_task('tasks.multiply', args=[2, 2])

print(promise.get())

This is more complicated… but quick to explain:

  • The first part is the same as before, because we need to connect to the same server.
  • The second part runs the task by name, passing the arguments. That returns a promise. At this point we have left a message in the queue.
  • Finally, the promise is resolved with promise.get(). By then, the service has read and processed the message and left the result in another queue, where this method reads it. There are other ways to consume the promise, as the sketch below shows.
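
By the way, the promise is an AsyncResult, and the blocking promise.get() is not the only way to consume it. A minimal sketch, assuming the worker from the previous section is still running:

# Other ways to consume the promise (an AsyncResult)
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

promise = app.send_task('tasks.multiply', args=[2, 2])

print(promise.id)               # unique identifier of the task
print(promise.ready())          # True once the worker has stored a result
print(promise.get(timeout=10))  # block at most 10 seconds, then raise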

So it seems complicated… and it could be even more so.

Anyway, I’ve recorded a video showing it all.

Everything together is easier

You can put it all together in the same file, and everything becomes easier. This is not always possible and it can cause problems (for instance, every change means restarting both client and service), but it is very educational:

# Celery full example: publisher/subscriber to request a multiplication
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

def main():
    promise = multiply.delay(4, 5)
    print(promise.get())

if __name__ == '__main__':
    main()

Now we are calling the task method directly (with .delay()), so we don’t need to send the task by name.
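
In fact, .delay() is just a shortcut for .apply_async(), which accepts extra options, while send_task() sends by name, which is handy when the client doesn’t import the task code at all. A minimal sketch of the equivalent calls (the countdown value is only an illustration):

# Equivalent ways to request the same multiplication
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

def main():
    promise = multiply.delay(4, 5)               # shortcut
    promise = multiply.apply_async(args=[4, 5])  # same as above
    # apply_async() accepts extra options, e.g. wait 10 seconds before running:
    promise = multiply.apply_async(args=[4, 5], countdown=10)
    # send_task() only needs the task name:
    promise = app.send_task('tasks.multiply', args=[4, 5])
    print(promise.get())

if __name__ == '__main__':
    main()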

And now we have a very complicated way to multiply two numbers :)

Tools

As I said before, Celery gives you tools and syntactic sugar.

First of all, we’ll see partials. They are functions with some parameters already set.
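
For comparison, this is what a partial looks like in plain Python, using functools (nothing Celery-specific yet):

# A plain Python partial: duplicate() is multiply() with one argument already set
from functools import partial

def multiply(a, b):
    return a * b

duplicate = partial(multiply, 2)
print(duplicate(5))  # 10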

In Celery it is very easy to create the partial duplicate by fixing one of multiply’s parameters:

# Celery full example: multiply two numbers with partials
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

def main():
    # duplicate() is multiply() with one of its arguments fixed to 2
    duplicate = multiply.s(2)
    promise = duplicate.delay(5)
    print(promise.get())

if __name__ == '__main__':
    main()

Yeah, partials are built with the .s method, though there is also .si. The latter is immutable (that explains the i), something that is very difficult to explain before introducing another Celery tool: chains.

Celery allows you to chain tasks, so the result of one becomes the first parameter of the next one. Why the first one? That’s just how it works. It sucks, but that’s how it works.

Sometimes we will require chained tasks just to enforce an order, so we can ignore the previous result. This is when the immutable signatures come in, because they won’t use the previous task’s result.
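
A minimal sketch of the difference, ahead of the full chain example below (the numbers are arbitrary):

# Mutable (.s) vs immutable (.si) signatures inside a chain
from celery import Celery, chain

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

# .s: the first task's result (20) is injected as the first argument of the second
print(chain(multiply.s(4, 5), multiply.s(2)).delay().get())       # (4 * 5) * 2 = 40
# .si: the second task ignores the previous result entirely
print(chain(multiply.si(4, 5), multiply.si(3, 2)).delay().get())  # 3 * 2 = 6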

I suppose you have noticed that I use “functions” and “services” interchangeably.

But enough talk; let’s chain something:

# Celery example: multiply with chains
from celery import Celery
from celery import chain

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

def main():
    # (4 * 5) * 2: each result becomes the first argument of the next task
    task = chain(multiply.s(4, 5), multiply.s(2))
    promise = task.delay()
    print(promise.get())

if __name__ == '__main__':
    main()

As I said before, Celery gives syntactic sugar, so it can be rewritten to:

# Celery example: multiply with chains using pipes (syntactic sugar)
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

def main():
    task = multiply.s(4, 5) | multiply.s(2)
    promise = task.delay()
    print(promise.get())

if __name__ == '__main__':
    main()

And here is where the fun starts: we have a canvas, that is, a task workflow. Believe it or not, if you launch several workers on different hosts, all connected to the same broker, each operation could run on a different host.

But… why a canvas, if all the operations run sequentially? Because we can process them concurrently with groups:

# Celery example: several multiplications with chains and groups (canvas)
from celery import Celery
from celery import group

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

def main():
    duplicate = multiply.s(2)

    task = group(
        multiply.s(4, 5) | multiply.s(2),
        multiply.s(9, 2) | multiply.s(4),
        multiply.s(9, 7) | duplicate,
    )
    promise = task.delay()
    print(promise.get())

if __name__ == '__main__':
    main()

There is even more syntactic sugar, like chords, maps, starmaps and chunks, but we won’t cover them in depth; this is an introductory post. You can read more about Canvas.
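
Just to give a taste, a chord is a group whose results are fed into a callback task. A minimal sketch (the xsum task is made up for this example):

# Chord sketch: sum the results of several multiplications
from celery import Celery, chord

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

@app.task
def xsum(numbers):
    return sum(numbers)

def main():
    # The group runs concurrently; xsum() receives the list of results
    promise = chord([multiply.s(2, 2), multiply.s(3, 3)])(xsum.s())
    print(promise.get())  # 4 + 9 = 13

if __name__ == '__main__':
    main()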

The Beat

Another important tool in Celery is the Beat: a periodic signal or tick. We can start it in the worker with the --beat option. Since Beat exclusiveness has not been implemented yet, it is important to run just one instance, or we will receive more than one beat.

This beat allows us to run periodic tasks.

Let’s see a small example: we are going to add two days to the current date every 10 seconds:

# Celery beat example: periodic date operation
import datetime
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add_days(days):
    return datetime.datetime.now() + datetime.timedelta(days=days)

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'add-days-each-10-seconds': {
            'task': 'tasks.add_days',
            'schedule': datetime.timedelta(seconds=10),
            'args': (2, )
        },
    },
)

And here you have the video.

Again, I’m going to stop here; I can only show a little of it. You can read more about periodic tasks if you are interested.
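
For instance, cron-style schedules are available through celery.schedules.crontab. A minimal sketch, continuing the example above (the schedule itself is just an illustration):

# Beat with a cron-style schedule instead of a timedelta
from celery.schedules import crontab

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'add-days-every-monday-morning': {
            'task': 'tasks.add_days',
            # every Monday at 7:30
            'schedule': crontab(hour=7, minute=30, day_of_week=1),
            'args': (2, ),
        },
    },
)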

Warning: pickle

Perhaps you see a warning like warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED)). This is because we are using the pickle serializer, which is deprecated. You can avoid it by explicitly configuring the accepted content types; note that if you switch to JSON, the serializers must be configured too and the task results must be JSON-serializable:

# Celery beat example: periodic date operation
import datetime
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add_days(days):
    # Return an ISO string: a raw datetime is not JSON-serializable
    return (datetime.datetime.now() + datetime.timedelta(days=days)).isoformat()

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'add-days-each-10-seconds': {
            'task': 'tasks.add_days',
            'schedule': datetime.timedelta(seconds=10),
            'args': (2, )
        },
    },
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
)

Complex environments

Obviously, I’ve only covered basic Celery here, and we’ve used just one queue to communicate. But since RabbitMQ supports richer queue operations, Celery does too.

You can:

  • assign different queues to different tasks, so that not every worker listens to every request, or assign different workers to different queues.
  • route messages between queues.
  • send messages to all queues (topic).

I’m not going to explain it deeply, but I will describe the path a message follows. This information is required to understand how routing works (found at Exchanges, queues and routing keys):

  1. The message is sent and left in an EXCHANGE. If the exchange doesn’t exist, Celery will create it.
  2. The EXCHANGE routes the message to one or more queues, depending on its configuration. If the queues don’t exist, Celery will take care of creating them.
  3. The message waits in the queue until a consumer takes it. At this moment, the message is locked so other consumers cannot take it.
  4. After processing it, the consumer sends an ACK and the message is finally removed.

By default, Celery creates queues named after the exchange and binds them in a direct way, that is, every message that reaches the exchange is routed to that queue.

You can play with routing and create really complex architectures, so that each message arrives at its target depending on the operation; a minimal sketch follows.
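
For example (the 'math' queue name is made up for illustration), a task can be routed to its own queue, and a worker can be told to consume only from that queue:

# Route tasks.multiply to its own queue (illustrative name)
app.conf.update(
    CELERY_ROUTES={
        'tasks.multiply': {'queue': 'math'},
    },
)

# A worker consuming only that queue would then be started with:
#   celery worker -A tasks -Q math --loglevel=info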

More information

The post AMQP, RabbitMQ and Celery - A Visual Guide For Dummies is beginner level, but I like it a lot because it explains everything with images. How To Use Celery with RabbitMQ to Queue Tasks on an Ubuntu VPS is good too, and so are the PyCon slides Celery for Internal API in SOA infrastructure.

Getting Started Scheduling Tasks with Celery is about configuring periodic tasks dynamically by using DJCelery, quite interesting if you use it with Django.

In order to understand what happens under the hood, I recommend AMQP 0-9-1 Model Explained, where the AMQP protocol is explained. It is very interesting if you want to try complex routings. AMQP in 10 mins : Part3 – Flexible Routing Model talks about that too, but it is easier and just explains the basic concepts.

If you want to participate in a project using Celery, I encourage you to help me with DJCron, a wrapper over DJCelery that allows you to configure distributed tasks dynamically, adding some extra features over DJCelery.