Contenido

Python distribuido: Celery

Ahora están de modas las arquitecturas SOA . Estas arquitecturas consisten en pequeños servicios muy específicos, de manera que interactúan unos con otros.

En esta ocasión voy a contar cómo utilizar Celery para crear una arquitectura SOA.

Python

Qué es Celery

No puede decirse que sea un sistema de comunicaciones, ya que el sistema de comunicaciones es RabbitMQ, Redis, etc. Como usa uno de ellos, no puede ser un sistema de colas de mensajes. Tampoco es un protocolo, ya que utiliza AMQP. Tampoco es una abstracción sobre éstos, ya que eso es Kombu, la librería de comunicaciones que utiliza.

Celery es un conjunto de herramientas que nos permite trabajar fácilmente con múltiples servicios, con algo de azúcar sintáctico. Es una forma de lanzar servicios viéndolos como tareas.

Cómo instalarlo

Instalarlo es sencillo; con pip:

1
pip install celery

o con apt:

1
apt-get install python-celery

El problema es que necesitamos un Broker. El Broker es el medio utilizado para transportar los mensajes de un servicio a otro. En este caso, necesitamos una cola de mensajes.

Para los ejemplos usaremos dos: RabbitMQ y Redis. La primera es algo… complicadilla, pero muy visual si ya lo conocéis e instaláis el plugin de gestión. La segunda es más sencilla de instalar, pero más difícil de ver las tripas. Para Redis necesitaréis también la librería; con pip:

1
pip install redis

o con apt:

1
apt-get install python-redis

Ejemplo

Creando servicios

Vamos a comenzar con la creación de un pequeño servicio que multiplica dos números:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Celery service example: task to multiply two numbers
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

Como veis, el overhead de añadir Celery es mínimo: importar la librería, crear la conexión (app) y añadir un decorador a nuestro servicio.

Ya es ejecutable:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ celery worker --loglevel=info -A tasks

 -------------- celery@nightcrawler v3.1.17 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.16.0-4-amd64-x86_64-with-debian-8.0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f3476eb9b90
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.multiply

[2015-03-20 21:56:16,526: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-03-20 21:56:16,541: INFO/MainProcess] mingle: searching for neighbors
[2015-03-20 21:56:17,780: INFO/MainProcess] mingle: all alone
[2015-03-20 21:56:17,791: WARNING/MainProcess] celery@nightcrawler ready.

Aquí hay mucha información:

  • versiones utilizadas.
  • colas creadas.
  • brokers usados para transporte y para resultados.
  • Número de workers, es decir, procesos dispuestos a responder una petición de forma concurrente.

Y alguna cosa más que podemos ignorar por el momento.

Dejaremos eso ahí corriendo de momento.

Lanzando servicios

Ése es el subscriptor. Vamos a crear ahora el publicador.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Celery client example: request for two numbers multiplication
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

promise = app.send_task('tasks.multiply', args=[2, 2])

print(promise.get())

Un poco complicado… Pero vamos a verlo rápidamente:

  • La primera parte es igual que en el primer ejemplo, ya que necesitamos conectarnos al mismo sitio.
  • La segunda parte ejecuta la tarea por su nombre, pasándole los argumentos. Eso devuelve una promesa (promise). En este punto hemos dejado un mensaje en la cola.
  • Finalmente, materializamos la promesa mediante promise.get(). En algún momento el servicio habrá leído el mensaje, lo habrá procesado, habrá dejado el resultado en otra cola y, mediante este método, se lee.

Así que parece complicado… pero podría serlo aún más.

De todas maneras, he grabado un vídeo para que veáis cómo se usa todo lo anterior:

Todo junto es más fácil

Existe la opción de ponerlo todo junto en el mismo archivo, y veréis cómo las cosas resultan aún más sencillas. Esto no siempre es posible y puede tener algún problema (como que cada cambio nos obliga a reiniciar también el servicio), pero es muy didáctico:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# Celery full example: publisher/subscriber to request a multiplication
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

def main():
    promise = multiply.delay(4, 5)
    print(promise.get())

if __name__ == '__main__':
    main()

En este caso invocamos a un método de la tarea directamente, y así obtenemos la tarea.

Ya tenemos una forma muy complicada de multiplicar dos números :D

Herramientas

Como dije, Celery proporciona herramientas y azúcar sintáctico.

Lo primero que vamos a ver son las funciones parciales o partials. Simplemente son funciones con algunos parámetros ya resueltos.

En nuestro caso es fácil crear el partial duplicate forzando el primer parámetro de multiply:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# Celery full example: multiply two numbers with partials
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

def main():
    duplicate = multiply.si(2)
    promise = duplicate.delay(5)
    print(promise.get())

if __name__ == '__main__':
    main()

Sí, los parciales se realizan con el método .si, aunque también podemos utilizar .s. La diferencia entre ambas es que la primera es inmutable (por eso la i). Resulta difícil explicar qué es eso sin contar antes otra herramienta de Celery: las cadenas (Chain).

En Celery se pueden concatenar tareas, de manera que el resultado de una de ellas sea el primer parámetro de la siguiente. ¿Por qué el primero? Pues porque sí. Es una putada, pero es así.

En ocasiones querremos funciones encadenadas tan sólo para que exista un orden, en cuyo caso queremos ignorar el resultado de la función anterior. En estos casos usaremos funciones inmutables, que no usan el parámetro anterior.

Espero que se haya notado que uso “funciones” o “servicios” indistintamente.

Pero basta ya de palabrería y concatenemos algo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Celery example: multiply with chains
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

from celery import chain

def main():
    duplicate = multiply.s(2)

    task = chain(multiply.s(4, 5), multiply.s(2))
    promise = task.delay()
    print(promise.get())

if __name__ == '__main__':
    main()

Como dije antes, Celery proporciona azúcar sintáctico, por lo que lo anterior también puede escribirse así:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# Celery example: multiply with chains using pipes (syntactic sugar)
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

def main():
    duplicate = multiply.s(2)

    task = multiply.s(4, 5) | multiply.s(2)
    promise = task.delay()
    print(promise.get())

if __name__ == '__main__':
    main()

Y aquí es donde comienza la diversión: hemos creado un canvas, es decir, un flujo de tareas. Aunque parezca mentira, si lanzáis distintos workers en distintas máquinas y todos se conectan al mismo broker, cada operación podría ejecutarse en uno diferente.

Pero claro… ¿para qué tanto canvas si las operaciones son secuenciales? Pues aquí viene la gracia: procesarlas de forma concurrente, los Grupos o group:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Celery example: several multiplications with chains and groups (canvas)
from celery import Celery

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def multiply(a, b):
    return a * b

from celery import group

def main():
    duplicate = multiply.s(2)

    task = group(
        multiply.s(4, 5) | multiply.s(2),
        multiply.s(9, 2) | multiply.s(4),
        multiply.s(9, 7) | duplicate,
    )
    promise = task.delay()
    print(promise.get())

if __name__ == '__main__':
    main()

Existe aún más azúcar sintáctico, como los chords, maps, starmaps y chunks, pero no los vamos a ver aquí. Este artículo es meramente introductorio. Podéis leer más información sobre los Canvas.

El Beat

Otro aspecto importante de Celery es el Beat. Consiste en un latido o una señal producida cada cierto tiempo. Podemos activarla en el worker mediante la opción --beat. Mientras no metan exclusividad en el Beat, es importante lanzar sólo uno o recibiríamos más de un latido.

Es este beat quien nos permite lanzar tareas periódicas.

Veamos un ejemplo pequeñito: Cada 10 segundos vamos a multiplicar la hora por dos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Celery beat example: periodic date operation
import datetime
from celery import Celery
from celery import group

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add_days(days):
    return datetime.datetime.now() + datetime.timedelta(days=days)

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'multiply-each-10-seconds': {
            'task': 'tasks.add_days',
            'schedule': datetime.timedelta(seconds=10),
            'args': (2, )
        },
    },
)

Y aquí os dejo un ejemplo de ejecución:

Nuevamente, no voy a entrar en más detalles aquí; tan sólo pretendo darlo a conocer. Podéis leer más sobre las tareas periódicas si os interesa.

Warning: pickle

Es posible que veamos un warning del estilo warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED)). Esto se debe a que estamos usando el serializador pickle, que está obsoleto. Podéis evitarlo añadiendo la línea siguiente:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Celery beat example: periodic date operation
import datetime
from celery import Celery
from celery import group

# RabbitMQ
#app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
# Redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add_days(days):
    return datetime.datetime.now() + datetime.timedelta(days=days)

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'multiply-each-10-seconds': {
            'task': 'tasks.add_days',
            'schedule': datetime.timedelta(seconds=10),
            'args': (2, )
        },
    },
    CELERY_ACCEPT_CONTENT = ['json'],
)

Entornos complejos

Evidentemente, todo lo que he contado aquí es lo más básico de Celery. Hemos utilizado una única cola para nuestras comunicaciones. Dado que RabbitMQ permite operaciones entre colas, Celery también.

Entre otras cosas, se puede:

  • asignar colas diferentes para diferentes tareas, de manera que no todos los workers atiendan todas las peticiones o que haya un número de workers diferente para cada cola.
  • enrutar mensajes entre colas.
  • enviar mensajes a todas las colas (topic).

No me voy a meter aquí en profundidad, pero sí voy a describir cómo funciona la entrega de mensajes. Esta información es necesaria para poder entender los enrutados (obtenido de Exchanges, queues and routing keys):

  1. Se envía un mensaje. Éste se deposita en un EXCHANGE. Si no existía, Celery lo crea.
  2. El EXCHANGE enruta el mensaje a una o más colas, dependiendo de su configuración. Si las colas no existen, Celery las crea.
  3. El mensaje queda esperando en la cola hasta que un consumidor lo recoge y procesa. En ese momento el mensaje se bloquea para que ningún otro consumidor lo lea.
  4. Tras su procesado, el consumidor envía un ACK y el mensaje se borra de la cola.

Por defecto, Celery crea colas con el nombre del exchange y las une (bind) en modo directo, es decir, que todo lo que llega al exchange se deja en la cola.

Se puede jugar con los enrutados y crear arquitecturas realmente complejas, de manera que cada mensaje llege a su destino dependiendo de la operación a realizar.

Más información

Entre los artículos de nivel básico, me gusta mucho el artículo AMQP, RabbitMQ and Celery - A Visual Guide For Dummies, ya que lo explica todo con imágenes. No está mal un artículo publicado en Digitalocean, How To Use Celery with RabbitMQ to Queue Tasks on an Ubuntu VPS o una presentación de la PyCon 2013, titulada Celery for Internal API in SOA infrastructure.

Getting Started Scheduling Tasks with Celery es un artículo que cuenta más cómo configurar tareas periódicas de forma dinámica con DJCelery, bastante interesante aunque no queráis usar Django.

Para entender mejor lo que ocurre a bajo nivel, recomiendo meterse de lleno en AMQP 0-9-1 Model Explained, donde se cuenta cómo funciona el protocolo AMQP. Muy interesante antes de entrar en enrutados complejos. En este mismo sentido recomiendo AMQP in 10 mins : Part3 – Flexible Routing Model, que es más sencillo y sólo explica los conceptos más básicos.

Si queréis participar en un proyecto que utiliza Celery, os puedo animar a que me ayudéis con DJCron, que es un wrapper sobre DJCelery que permite configurar tareas distribuidas de forma dinámica, añadiendo algunas características extra sobre DJCelery.