Which pool type should you use? Does it have any implications for your code base? Why do certain aspects of your app work with one pool but fail with another? What is the best scaling strategy?
Discussing the Celery execution pool topic in earnest requires some space. I am launching a series of articles on this, each article dedicated to one pool. The focus is on the inner workings, benefits, limitations and use cases of each pool type. My goal is to equip you with the knowledge to make the right choice.
Here is the roadmap for the next couple of weeks. Kicking off with a rundown of the Celery worker, the pool and their relationship in this blog post:
Part 1: The worker and the pool: Separation of concerns
Part 4: The not so new-ish kid on the block: The threads pool
Part 5: Coroutines all the way: The gevent and the eventlet pools
Part 6: How to choose the right pool for the right job
Part 7: Bonus: How to roll your custom pool
What exactly does the Celery worker do and how does it relate to the execution pool (aka worker pool or just pool)? Remember, Celery is made up of three building blocks:
Producer: this is where a task gets scheduled for asynchronous execution, for example
task1.s(a=1, b=2).delay()in a Django app
Message Broker: this is the storage the producer writes the serialized task to (think json, though it could be another format): Redis, RabbitMQ, Amazon SQS, Zookepper or simply the file system
Worker: this is a separate process that subscribes to new messages on the message broker and executes them
The producer sends the task across to the message broker (the name of the task and its arguments). The Celery worker subscribes to the message broker. When the message broker notifies the worker of a new task, the worker pool executes the task, taking into account said arguments: The execution pool runs tasks, the worker runs the execution pool.
What is the point of this separation? It is the overhead unrelated to task execution: things like broker connectivity, handling disconnects, reconnects, retries, communication with other Celery workers, broadcasting events, configuration, logging, error handling etc.
Task execution is all about… 🥁 executing tasks. A task, in its serialised form (task name, args and kwargs), is written to the message broker. The message broker notifies the worker, the worker collects the task details and marks the task as reserved to prevent it being picked up by another worker. Finally, the worker runs the task code via the execution pool (and deals with result or error handling).
As multiple tasks come in via the message broker, there is a design decision to be made as to how to process these tasks. Some options that come to mind:
process one task at a time, each task blocking until it has finished executing
process multiple tasks concurrently, using threading
process multiple tasks concurrently, using multiprocessing
process multiple tasks concurrently, with the help of coroutines
Not so coincidentally, these options map one-to-one to the different Celery worker pool type implementations, which I am covering in the upcoming articles of this series:
the solo pool: one task at a time, blocking
the threads pool: concurrent thread pool based task execution
the prefork pool: concurrent task execution based on multiprocessing
the gevent and eventlet pools: concurrent task execution using greenlets (coroutines)
Where concurrent task execution is supported (all bar the solo pool), the number of concurrent processes, threads or coroutines is controlled by the
Before turning your attention to the solo pool type, I would like to introduce you the Celery BasePool BasePool class first. Every execution pool implementation inherits from BasePool. Understanding some ideas of the BasePool class is a neat way to understand how the worker interacts with the execution pool.
Remember, when you start a Celery worker, you choose a pool via the
--pool option. If you do not choose a pool, Celery chooses one for you (the prefork pool):
$ celery --app worker.app worker --pool prefork|threads|gevent|eventlet|solo
As the worker starts up, it creates an instance of the selected pool class. As long as the worker is up and running, it holds a reference to this pool instance. The pool class inherits from the BasePool, so the worker interacts with the pool instance via the methods defined in the BasePool class.
What happens when a new task comes in?
the worker invokes WorkerController._process_task
…which in turn invokes request.execute_using_pool
…which in turn invokes the pool method pool.apply_async
…which in turn invokes the pool method on_apply
There are two takeaways from this: first off, there is a chain of events that is triggered when the worker receives a new task. This chain ends with a pool’s
on_apply callback. This is where the action happens. Secondly, if you want to understand how different pool implementations work, the
on_apply method is where you should poke about. And this is what I will be doing in the next articles of this series, starting with the solo pool.
Did you find this article valuable?
Support Bjoern Stiel by becoming a sponsor. Any amount is appreciated!