Before I wrote this article, I hadn't realised how controversial a subject SQLAlchemy is. Heated discussions on Reddit and X. Love it or hate it. The full drama. The polarising nature of the internet aside, what is true about SQLAlchemy is that it
- has a steeper learning curve than the Django ORM
- requires some boilerplate code
- requires you to know about some lower-level concepts
- has some difficult-to-digest documentation
If you do use SQLAlchemy, you have to write some boilerplate for your Celery tasks, which comes with the risk of spaghetti code.
In this article, I will run you through some basic SQLAlchemy concepts and show you how I use SQLAlchemy inside my Celery tasks without resorting to third-party packages. This approach
- helps you understand how things work
- provides a generic solution that works with Flask, FastAPI or anything else, even without a web framework
SQLAlchemy Sessions
Life in the Django ORM world is pretty simple. Database operations are provided via the model object:
from celery import Celery

app = Celery(...)

@app.task()
def my_task():
    book = Book.objects.get(title="To Kill a Mockingbird")
    ...
    book.save()
In the SQLAlchemy world, things are very different. All database operations are performed through a session object. The session is strictly separate from the model object:
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
    session = Session(engine)
    book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
    ...
    session.add(book)
    session.commit()
    session.close()
The session establishes the conversation with the database and represents a staging area for all objects you've loaded, created or manipulated during its lifespan.
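To make the "staging area" idea concrete, here is a minimal sketch that inspects what a session is tracking before and after a commit. It assumes an in-memory SQLite database and a hypothetical `Book` model with `title` and `author` columns; neither is defined in this article.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Book(Base):
    # Hypothetical model, used only for this illustration.
    __tablename__ = "books"
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)
    author = Column(String)


# Assumption: an in-memory SQLite database stands in for the real one.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    book = Book(title="To Kill a Mockingbird")
    session.add(book)
    # The object is staged, but nothing has been written yet.
    pending = book in session.new
    session.commit()

    book.author = "Harper Lee"
    # The change is tracked by the session until the next commit.
    dirty = book in session.dirty
    session.commit()
    nothing_staged = len(session.new) == 0 and len(session.dirty) == 0

# A fresh session sees the committed state.
with Session(engine) as session:
    stored = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
    stored_author = stored.author
```

`session.new` and `session.dirty` are the session's own bookkeeping of pending inserts and modified objects; the model object itself never talks to the database.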
Session Management
You can think of SQLAlchemy sessions as database transactions. As a general rule, the lifecycle of a session should be separate and external from functions and objects that access and manipulate database data. Sessions are meant to be short-lived. For example, in the context of an incoming Celery task request, a session should be created at the beginning of the task code and closed at the end, instead of being held open indefinitely and shared across tasks.
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
    session = Session(engine)
    book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
    ...
    session.add(book)
    session.commit()
    session.close()
Or, using a context manager:
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
    with Session(engine) as session:
        book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
        ...
        session.add(book)
        session.commit()
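One practical difference between the two variants is what happens when the task body raises before `session.close()` is reached. The sketch below, again assuming an in-memory SQLite database and a hypothetical `Book` model, suggests that the context manager still closes the session and that uncommitted work is discarded.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Book(Base):
    # Hypothetical model, used only for this illustration.
    __tablename__ = "books"
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)


# Assumption: an in-memory SQLite database stands in for the real one.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)


def failing_task_body():
    with Session(engine) as session:
        session.add(Book(title="Never committed"))
        session.flush()  # the INSERT is sent, but the transaction stays open
        raise RuntimeError("simulated failure before commit")


try:
    failing_task_body()
except RuntimeError:
    pass

# Leaving the with-block closed the session, which rolled the INSERT back.
with Session(engine) as session:
    remaining = session.query(Book).count()
```

With the manual variant, the same exception would skip `session.close()` unless you wrap the body in `try`/`finally` yourself.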
Celery Task Class
My issue with the two options above is that they involve a lot of repetitive boilerplate code inside each task. It would be nice if every Celery task request came with a ready-to-go session object, without having to create it at the beginning and close it at the end. Something like this:
def my_task(session):
    book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
    ...
    session.add(book)
    session.commit()
As it turns out, injecting the session variable at runtime is not possible. It is possible, though, to bind the task. A bound task always receives the task instance as its first argument.
@app.task(bind=True)
def my_task(self):
    ...
By default, self is of type celery.Task. celery.Task defines all methods available for Celery tasks, for example apply_async and retry.
Every interaction between your code and your Celery task, as well as every interaction between your worker and your Celery task, happens via the celery.Task methods. In fact, when your worker processes a task, it always follows this sequence:
1. Run before_start
2. Run the task
3. Run after_return
Even when the task in step 2 throws an exception, after_return is guaranteed to run. You can use this to streamline the SQLAlchemy session creation and teardown:
- create the session in before_start
- make the session available to the bound task
- close the session in after_return
import celery
from sqlalchemy.orm import Session

class MyTask(celery.Task):
    def __init__(self):
        self.sessions = {}

    def before_start(self, task_id, args, kwargs):
        self.sessions[task_id] = Session(...)
        super().before_start(task_id, args, kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        session = self.sessions.pop(task_id)
        session.close()
        super().after_return(status, retval, task_id, args, kwargs, einfo)

    @property
    def session(self):
        return self.sessions[self.request.id]
Note that there is only a single task instance per process, meaning that every request for a given task within a worker process shares the same task object. To isolate SQLAlchemy sessions per task request, I use a dictionary with the unique task request id as the key.
Putting It Together
So far we have:
- MyTask, a custom celery.Task implementation
- a task, which binds celery.Task to the Celery task
What is missing is binding MyTask instead of celery.Task to the task. To do this, Celery provides the base argument:
import celery
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

class MyTask(celery.Task):
    def __init__(self):
        # One session per in-flight task request, keyed by request id
        self.sessions = {}

    def before_start(self, task_id, args, kwargs):
        self.sessions[task_id] = Session(...)
        super().before_start(task_id, args, kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        session = self.sessions.pop(task_id)
        session.close()
        super().after_return(status, retval, task_id, args, kwargs, einfo)

    @property
    def session(self):
        return self.sessions[self.request.id]

@app.task(bind=True, base=MyTask)
def my_task(self):
    book = self.session.query(Book).filter_by(title="To Kill a Mockingbird").one()
    ...
    self.session.add(book)
    self.session.commit()
This is a generic solution that delegates SQLAlchemy session handling to a custom Task class. It keeps your task code free from repetitive boilerplate. What do you think? Let me know in the comments.