Handling Celery task failures consistently and predictably is a prerequisite for building a resilient asynchronous system. In this blog post, you will learn how to handle Celery task errors and automatically retry failed tasks.
To handle exceptions or not?
Assume we have a Celery task that fetches some data from an external API via an HTTP GET request. We want our code to respond predictably to any potential failure, such as connection issues, request throttling, or unexpected server responses. What does that mean?
import requests

@app.task(bind=True)
def fetch_data(self):
    url = 'https://www.quandl.com/api/v3/datasets/WIKI/FB/data.json'
    response = requests.get(url)
    if not response.ok:
        raise Exception(f'GET {url} returned unexpected response code: {response.status_code}')
    return response.json()
Here, we do not handle errors at all. Quite the opposite: either the GET request throws an exception somewhere along the way, which we happily let bubble up; we throw an exception ourselves if we do not receive a 2xx response status code; or response.json() raises if the response body is not valid JSON.
Auto-retry failed tasks
The idea is to not catch any exceptions and let Celery deal with it. Our responsibilities are:
ensure that exceptions bubble up so that our task fails
instruct Celery to do something (or nothing) with a failed task
When you register a Celery task via the decorator, you can tell Celery what to do with the task in case of a failure. autoretry_for allows you to specify a list of exception types you want to retry for. retry_kwargs lets you specify additional arguments such as max_retries (maximum number of retries) and countdown (delay between retries). Check out the docs for a full list of arguments. In the following example, Celery retries up to five times with a two-second delay between retries:
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5, 'countdown': 2})
def fetch_data(self):
    url = 'https://www.quandl.com/api/v3/datasets/WIKI/FB/data.json'
    response = requests.get(url)
    if not response.ok:
        raise Exception(f'GET {url} returned unexpected response code: {response.status_code}')
    return response.json()
Alternatively, you can retry following the rules of exponential backoff (retry_jitter introduces randomness into the backoff delays so that failed tasks are not all retried simultaneously; it is set to False here for predictability, but you probably want it set to True in a production environment). In this example, the first retry happens after 2s, the next after 4s, the third after 8s, and so on:
@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=2, retry_kwargs={'max_retries': 5}, retry_jitter=False)
def fetch_data(self):
    url = 'https://www.quandl.com/api/v3/datasets/WIKI/FB/data.json'
    response = requests.get(url)
    if not response.ok:
        raise Exception(f'GET {url} returned unexpected response code: {response.status_code}')
    return response.json()
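To see where the 2s/4s/8s sequence comes from, the backoff delay can be computed directly. The sketch below mirrors the formula Celery applies (the backoff factor multiplied by two to the power of the retry count, capped at a maximum and randomized when jitter is enabled); the 600-second cap, matching the retry_backoff_max default, is an assumption of this sketch:

```python
import random

def backoff_delay(factor, retries, maximum=600, jitter=False):
    # Delay before retry number `retries` (0-based): factor * 2**retries,
    # capped at `maximum`. With jitter, a random delay in [0, capped value].
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        countdown = random.randrange(countdown + 1)
    return countdown

# With retry_backoff=2 and retry_jitter=False, the first five delays are:
print([backoff_delay(2, n) for n in range(5)])  # [2, 4, 8, 16, 32]
```

Note how the cap kicks in eventually: by the ninth retry the uncapped delay would exceed 600s, so every later retry waits 600s at most.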
Conclusion
There is not much secret to exception handling in Celery other than allowing exceptions to happen and using Celery's configuration to deal with them. Coupled with an atomic task design (in the example above, the JSON would be passed on via a Celery chain to a second task that writes it to the database), this makes for a really powerful, reusable and predictable design.