By default, Celery routes all tasks to a single queue and all workers consume this default queue.
With Celery queues, you can control which Celery workers process which tasks. This is useful if you have slow and fast tasks and do not want the slow ones to hold up the fast ones, or if you need to send tasks from one microservice to another.
Step 1: Configure Celery via task_routes
Celery can be configured, on a per-task basis, to send each task to a specific queue via the task_routes setting:
import os

from celery import Celery

app = Celery(__name__)
app.conf.update({
    'broker_url': os.environ['CELERY_BROKER_URL'],
    'imports': (
        'tasks',
    ),
    'task_routes': {
        'fetch_bitcoin_price_index': {'queue': 'feeds'},
        'calculate_moving_average': {'queue': 'filters'},
    },
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json'],
})
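Note that the task_routes keys above are bare task names. For the routes to match, the tasks in tasks.py need to be registered under exactly those names, for example via an explicit name argument. A minimal sketch of what tasks.py could look like, assuming the Celery app lives in worker.py as the --app=worker.app flag in the next step suggests (the task bodies are placeholders, not the actual implementation from this post):
# tasks.py (sketch, placeholder bodies)
from worker import app


@app.task(name='fetch_bitcoin_price_index')
def fetch_bitcoin_price_index(start_date, end_date):
    # fetch the Bitcoin Price Index between start_date and end_date
    # from the Coindesk API and return it
    ...


@app.task(name='calculate_moving_average')
def calculate_moving_average(index, window):
    # calculate a moving average over the price index returned
    # by fetch_bitcoin_price_index
    ...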
Step 2: Make worker subscribe to a queue
We run one Celery worker that subscribes to the feeds queue and processes fetch_bitcoin_price_index tasks.
# start celery worker
~$ celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds
The other worker subscribes to the filters queue and processes calculate_moving_average tasks:
~$ celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters
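Note that these commands use the pre-5.0 Celery command line syntax. If you run Celery 5 or later, the --app option has to come before the worker subcommand:
~$ celery --app=worker.app worker --hostname=worker.feeds@%h --queues=feeds
~$ celery --app=worker.app worker --hostname=worker.filters@%h --queues=filters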
Note the --queues command line arguments. They make your workers subscribe to particular queues. To subscribe to more than one queue, use a comma-separated list, like so: --queues=feeds,filters.
For further information, have a look at the Celery docs.
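These two worker commands are what the docker-compose stack in the next step runs as the worker-feeds and worker-filters services. A rough sketch of how the relevant part of docker-compose.yml might look (the broker service, image names and environment details are assumptions, not the exact file from this post):
# docker-compose.yml (sketch, not the original file)
version: '3'

services:
  rabbitmq:
    # message broker; assumed to be RabbitMQ here
    image: rabbitmq:3

  worker-feeds:
    # worker subscribed to the feeds queue
    build: .
    command: celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds
    environment:
      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
    depends_on:
      - rabbitmq

  worker-filters:
    # worker subscribed to the filters queue
    build: .
    command: celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters
    environment:
      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
    depends_on:
      - rabbitmq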
Step 3: Give it a go
Bring up the docker-compose stack and execute example.py:
# start docker stack
~$ docker-compose up -d
# execute example.py
~$ docker-compose exec worker-feeds python example.py --start_date=2018-01-01 --end_date=2018-05-29 --window=3
Here, we re-use the Celery task chain from my previous blog post. Inside example.py, we invoke a Celery chain that consists of two tasks: fetch_bitcoin_price_index fetches Bitcoin Price Index data from the Coindesk API and is routed via the feeds queue to the worker-feeds Celery worker. When that task completes successfully, its result is passed on to calculate_moving_average, which is routed via the filters queue to the worker-filters Celery worker.
import argparse

from celery import chain

from tasks import fetch_bitcoin_price_index, calculate_moving_average

parser = argparse.ArgumentParser()
parser.add_argument('--start_date')
parser.add_argument('--end_date')
parser.add_argument('--window', default=3, type=int)  # ensure window is an int, also when passed on the command line
args = parser.parse_args()

# chain the two tasks: calculate_moving_average receives the
# Bitcoin Price Index returned by fetch_bitcoin_price_index
task = chain(
    fetch_bitcoin_price_index.s(
        start_date=args.start_date,
        end_date=args.end_date),
    calculate_moving_average.s(window=args.window)
).delay()
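Because chain(...).delay() returns an AsyncResult for the last task in the chain, example.py could also wait for the final result. This assumes a result backend is configured on the Celery app (the configuration above only sets the serializers), so treat the snippet below as an optional addition rather than part of the original script:
# optional: block until the chain has finished and print the moving average
result = task.get(timeout=30)
print(result)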
Check the docker-compose logs to follow the task flow through the two workers:
# print logs
~$ docker-compose logs -f
The docker-compose.yml stack also comes with a Flower instance. Flower is a tool for monitoring Celery workers and tasks. Open http://localhost:5555 in your browser to check it out.
Summary
In this blog post you learned how to configure Celery to route tasks to dedicated queues and how to make Celery workers subscribe to certain queues.
In order to achieve that, you need to define a route per task. This approach works well for a simple setup. However, it does not scale well for an application with many Celery tasks, or for a microservices/Docker environment where multiple services communicate via the same message broker.
In the next blog post, you will learn all about dynamic task routing, a programmatic and scalable solution that overcomes the limitations described above.