In my previous blog post we looked into custom queues and task routing. We had to configure, per task, which queue we want Celery to route it to. While this approach works well for a simple setup, it does not scale well for applications and micro-services where many Celery tasks need to be routed to a number of different worker queues.
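For reference, the manual approach looks roughly like the sketch below. The task and queue names match the example used later in this post; the exact entries are an illustration rather than the previous post's verbatim config:

# Manual routing: one task_routes entry per task name.
app.conf.task_routes = {
    'fetch_bitcoin_price_index': {'queue': 'feeds'},
    'calculate_moving_average': {'queue': 'filters'},
}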
Step 1: Celery task_routes config
Instead of configuring in task_routes, per task, which queue you want to route a task to, you can tell Celery to use a custom router class by specifying the path to that class (also, have a look at the Celery routing docs at http://docs.celeryproject.org/en/latest/userguide/routing.html):
import os

from celery import Celery

app = Celery(__name__)
app.conf.update({
    'broker_url': os.environ['CELERY_BROKER_URL'],
    'imports': (
        'tasks',
    ),
    'task_routes': ('task_router.TaskRouter',),
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})
Step 2: Define TaskRouter class
As per our task_routes value above, we need to define the custom TaskRouter class in the module task_router.py. Celery expects the class to provide a route_for_task method, which receives the task name as its first argument. Note how the method returns a dict that looks exactly like the one used for manual task routing.
class TaskRouter:
    def route_for_task(self, task, *args, **kwargs):
        if ':' not in task:
            return {'queue': 'default'}

        namespace, _ = task.split(':')
        return {'queue': namespace}
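To see the routing logic in action, you can call the router directly in a Python shell. This is just a quick sanity check, not part of the actual setup, and the unnamespaced task name below is made up for illustration:

from task_router import TaskRouter

router = TaskRouter()
# Namespaced task names are routed to the queue named before the colon.
router.route_for_task('feeds:fetch_bitcoin_price_index')  # {'queue': 'feeds'}
# Task names without a namespace fall back to the default queue.
router.route_for_task('some_unnamespaced_task')           # {'queue': 'default'}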
Our idea is to route a task based on its task name; in particular, we want to assume that our task names follow the pattern queue:taskname. In our previous blog post's example we had a task named fetch_bitcoin_price_index that we wanted to be routed to a queue called feeds. We rename this task to feeds:fetch_bitcoin_price_index.
@app.task(bind=True, name='feeds:fetch_bitcoin_price_index')
def fetch_bitcoin_price_index(self, start_date, end_date):
    ...

@app.task(bind=True, name='filters:calculate_moving_average')
def calculate_moving_average(self, args, window):
    ...
We need to run two Celery workers. One subscribes to the feeds queue, the other one to the filters queue:
~$ celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds
~$ celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters
Note the --queues command line argument. It makes your workers subscribe to particular queues. To subscribe to more than one queue, pass a comma-separated list, like so: --queues=feeds,filters.
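For example, a single worker that consumes from both queues could be started like this (the hostname is chosen arbitrarily here):

~$ celery worker --app=worker.app --hostname=worker.all@%h --queues=feeds,filters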
For further information, have a look at the Celery docs.
Step 3: Ready for action
Bring up the docker-compose stack and run example.py:
# start up stack
~$ docker-compose up -d
# execute python example.py in container
~$ docker-compose exec worker-feeds python example.py --start_date=2018-01-01 --end_date=2018-05-29 --window=3
The script invokes a Celery chain that consists of two tasks: fetch_bitcoin_price_index fetches Bitcoin Price Index data from the Coindesk API and is routed via the feeds queue to the worker-feeds Celery worker. When the task completes successfully, the result is passed on to calculate_moving_average, which is routed via the filters queue to the worker-filters Celery worker.
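The chain inside example.py might look roughly like the sketch below. The actual script also parses the command line arguments, which is omitted here, and the exact call is an assumption based on the task signatures above:

from celery import chain

from tasks import fetch_bitcoin_price_index, calculate_moving_average

# fetch_bitcoin_price_index runs first; its result becomes the first
# positional argument (args) of calculate_moving_average.
chain(
    fetch_bitcoin_price_index.s(start_date='2018-01-01', end_date='2018-05-29'),
    calculate_moving_average.s(window=3),
).delay()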
Check the docker-compose logs to follow the task flow through the two workers:
~$ docker-compose logs -f
The docker-compose.yml stack also comes with a Flower instance. Flower is a tool for monitoring Celery workers and tasks. Point your browser to http://localhost:5555.
Summary
In this blog post you learned how to configure Celery to route tasks using a custom task router. This solution scales well when using many tasks across many queues and workers.