Introducing Director – a tool to build your Celery workflows

As developers, we often need to execute tasks in the background. Fortunately, some tools already exist for this. In the Python ecosystem, for instance, the most well-known library is Celery. If you have already used it, you know how great it is! But you have probably also discovered how complicated it can be to follow the state of a complex workflow.

Celery Director is a tool we created at OVHcloud to fix this problem. The code is now open source and available on GitHub.

Following the talk we did during FOSDEM 2020, this post aims to present the tool. We’ll take a close look at what Celery is, why we created Director, and how to use it.

What is Celery?

Here is the official description of Celery:

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

The important words here are “task queue”. This is a mechanism used to distribute work across a pool of machines or threads.

The queue, sitting between producers and consumers, stores messages sent by the producers (APIs, for instance). On the other side, consumers constantly read the queue to detect new messages and execute the corresponding tasks.

In Celery, a message sent by the producer is the signature of a Python function: send_email("john.doe"), for example.

The queue (named broker in Celery) stores this signature until a worker reads it and actually executes the function with the given parameters.
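
To make this mechanism concrete, here is a purely conceptual sketch using only the Python standard library. This is not how Celery is implemented, just an illustration of the producer/queue/consumer pattern described above:

# A conceptual illustration of the task queue pattern (not Celery itself)
import queue
import threading

# The "broker": stores task signatures (a function plus its arguments)
broker = queue.Queue()

def send_email(user):
    print(f"Sending an email to {user}")

# Producer side: push a signature instead of calling the function directly
broker.put((send_email, ("john.doe",)))

# Consumer (worker) side: read the queue and actually execute the function
def worker():
    func, args = broker.get()
    func(*args)

threading.Thread(target=worker).start()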

But why execute a Python function somewhere else? The main reason is to quickly return a response in cases of long-running functions. Indeed, it’s not an option to keep users waiting for a response for several seconds or minutes.

We can also imagine producers that lack the resources to handle a CPU-bound task themselves: a more robust worker can take care of its execution instead.

How to use Celery

So Celery is a library used to execute Python code somewhere else, but how does it do that? In fact, it’s really simple! To illustrate this, we’ll use some of the available methods to send tasks to the broker, then we’ll start a worker to consume them.

Here is the code to create a Celery task:

# tasks.py
from celery import Celery

app = Celery("tasks", broker="redis://127.0.0.1:6379/0")

@app.task
def add(x, y):
    return x + y

As you can see, a Celery task is just a Python function transformed to be sent to a broker. Note that we passed the Redis connection URL to the Celery application (named app), so it knows which broker will store the messages.

This means it’s now possible to send a task to the broker:

>>> from tasks import add
>>> add.delay(2, 3)

That’s all! We used the .delay() method, so our producer didn’t execute the Python code but instead sent the task signature to the broker.

Now it’s time to consume it with a Celery worker:

$ celery worker -A tasks --loglevel=INFO
[...]
[2020-02-14 17:13:38,947: INFO/MainProcess] Received task: tasks.add[0e9b6ff2-7aec-46c3-b810-b62a32188000]
[2020-02-14 17:13:38,954: INFO/ForkPoolWorker-2] Task tasks.add[0e9b6ff2-7aec-46c3-b810-b62a32188000] succeeded in 0.0024250600254163146s: 5
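
The worker log shows the result (5), but the producer never saw it. If you also want to fetch results from the producer side, Celery needs a result backend, which is not configured in the example above. Here is a minimal sketch, assuming Redis is reused as the backend:

>>> # Assuming the app was created with a result backend, e.g.
>>> # Celery("tasks", broker="redis://127.0.0.1:6379/0",
>>> #        backend="redis://127.0.0.1:6379/1")
>>> result = add.delay(2, 3)
>>> result.get(timeout=5)  # blocks until a worker has stored the result
5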

It’s even possible to combine the Celery tasks with some primitives (the full list is here):

  • Chain: will execute tasks one after the other.
  • Group: will execute tasks in parallel by routing them to multiple workers.

For example, the following code will make two additions in parallel, then sum the results:

from celery import chain, group
from tasks import add, sum_numbers  # both tasks are assumed to live in tasks.py (sum_numbers is shown below)

# Create the canvas
canvas = chain(
    group(
        add.si(1, 2),
        add.si(3, 4)
    ),
    sum_numbers.s()
)

# Execute it
canvas.delay()

You probably noticed that we didn’t call .delay() on the individual tasks here. Instead we created a canvas, used to combine a selection of tasks, and executed the whole canvas at once.

The .si() method is used to create an immutable signature (i.e. one that does not receive data from a previous task), while .s() relies on the data returned by the two previous tasks.
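
The sum_numbers task is not defined in this post; here is a minimal sketch of what it could look like. When a group is chained into a single task like this, Celery upgrades the chain to a chord, and the final task receives the list of results from the group as its first argument:

# tasks.py (continued)
@app.task
def sum_numbers(numbers):
    # Receives the results of the group, here [3, 7]
    return sum(numbers)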

This introduction to Celery has just covered its very basic usage. If you’re keen to find out more, I invite you to read the documentation, where you’ll discover all the powerful features, including rate limits, task retries, and even periodic tasks.
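
As a quick taste of those features, here is a hedged sketch of a hypothetical fetch_page task using a rate limit and automatic retries; the options below are standard Celery task options, while the task itself and the URL are just illustrations:

# tasks.py (continued) — assumes the requests library is installed
import requests

@app.task(
    rate_limit="10/m",                           # at most 10 runs per minute per worker
    autoretry_for=(requests.RequestException,),  # retry automatically on these errors
    retry_backoff=True,                          # wait exponentially longer between retries
    retry_kwargs={"max_retries": 3},
)
def fetch_page(url):
    return requests.get(url, timeout=10).text

# Periodic execution is handled by Celery beat, e.g.
# app.conf.beat_schedule = {
#     "fetch-example-hourly": {
#         "task": "tasks.fetch_page",
#         "schedule": 3600.0,
#         "args": ("https://example.com",),
#     },
# }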

As a developer, I want…

I’m part of a team whose goal is to deploy and monitor internal infrastructures. As part of this, we needed to launch some background tasks, and as Python developers our natural choice was to use Celery. But, out of the box, Celery didn’t support certain requirements specific to our projects:

  • Tracking the tasks’ evolution and their dependencies in a WebUI.
  • Executing the workflows using API calls, or simply with a CLI.
  • Combining tasks to create workflows in YAML format.
  • Periodically executing a whole workflow.

Other cool tools, like Flower, exist for this, but Flower only lets us track each task individually, not a whole workflow and its component tasks.

And as we really needed these features, we decided to create Celery Director.

How to use Director

The installation can be done using the pip command:

$ pip install celery-director

Director provides a simple command to create a new workspace folder:

$ director init workflows
[*] Project created in /home/ncrocfer/workflows
[*] Do not forget to initialize the database
You can now export the DIRECTOR_HOME environment variable

A new tasks folder and an example workflow have been created for you:

$ tree -a workflows/
├── .env
├── tasks
│   └── etl.py
└── workflows.yml

The tasks/*.py files will contain your Celery tasks, while the workflows.yml file will combine them:

$ cat workflows.yml
---
ovh.SIMPLE_ETL:
  tasks:
    - EXTRACT
    - TRANSFORM
    - LOAD

This example, named ovh.SIMPLE_ETL, will execute three tasks, one after the other. You can find more examples in the documentation.
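
For reference, the generated tasks/etl.py is a regular Python module where each task is declared with Director’s task decorator and bound to the name used in the YAML. It looks roughly like this (check the generated file for the exact content):

# tasks/etl.py — roughly what "director init" generates
from director import task

@task(name="EXTRACT")
def extract(*args, **kwargs):
    print("Extracting data")

@task(name="TRANSFORM")
def transform(*args, **kwargs):
    print("Transforming data")

@task(name="LOAD")
def load(*args, **kwargs):
    print("Loading data")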

After exporting the DIRECTOR_HOME variable and initialising the database with director db upgrade, you can execute this workflow:

$ director workflow list
+----------------+----------+-----------+
| Workflows (1)  | Periodic | Tasks     |
+----------------+----------+-----------+
| ovh.SIMPLE_ETL |    --    | EXTRACT   |
|                |          | TRANSFORM |
|                |          | LOAD      |
+----------------+----------+-----------+
$ director workflow run ovh.SIMPLE_ETL

The broker has received the tasks, so now you can launch the Celery worker to execute them:

$ director celery worker --loglevel=INFO

You can then display the results in the WebUI using the webserver command (director webserver).

This is just the beginning, as Director provides other features, allowing you to parametrise a workflow or periodically execute it, for example. You will find more details on these features in the documentation.

Conclusion

Our teams use Director regularly to launch our workflows. No more boilerplate, and no more need for advanced Celery knowledge… A new colleague can easily create their tasks in Python and combine them in YAML, without using the Celery primitives discussed earlier.

Sometimes we need to execute a workflow periodically (to populate a cache, for instance), and sometimes we need to manually call it from another web service (note that a workflow can also be executed through an API call). This is now possible using our single Director instance.
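
To illustrate the API call, here is a hypothetical sketch using the requests library; the endpoint, port, and payload shape are assumptions based on the Director documentation and may differ in your version:

import requests

# Hypothetical call: endpoint and payload shape are assumed, not guaranteed.
# "project" and "name" come from the ovh.SIMPLE_ETL workflow defined above.
response = requests.post(
    "http://localhost:8000/api/workflows",
    json={"project": "ovh", "name": "SIMPLE_ETL", "payload": {}},
)
print(response.json())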

We invite you to try Director for yourself, and give us your feedback via GitHub, so we can continue to enhance it. The source code can be found on GitHub, and the 2020 FOSDEM presentation is available here.


Python Developer and Team lead @OVHcloud
