Celery Tasks: A Guide to SQLAlchemy Session Handling

Before I wrote this article, I hadn't realised that SQLAlchemy is a highly controversial subject. Heated discussions on Reddit and X. Love it or hate it. The full drama. The polarising nature of the internet aside, what is true about SQLAlchemy is that it

  • has a steeper learning curve than the Django ORM

  • requires some boilerplate code

  • expects you to know about some lower-level concepts

  • has some difficult-to-digest documentation

If you do use SQLAlchemy, you have to write some boilerplate for your Celery tasks, which comes with the risk of spaghetti code.

In this article, I will run you through some basic SQLAlchemy concepts and show you how I use SQLAlchemy inside my Celery tasks, without resorting to third-party packages. This approach

  • helps you understand how things work

  • provides a generic solution that works with Flask, FastAPI or anything else, even without a web framework

SQLAlchemy Sessions

Life in the Django ORM world is pretty simple. Database operations are provided via the model object:

from celery import Celery

app = Celery(...)

@app.task()
def my_task():
   book = Book.objects.get(title="To Kill a Mockingbird")
   ...
   book.save()

In the SQLAlchemy world, things are very different. All database operations are performed through a session object. The session is strictly separate from the model object:

from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
   session = Session(engine)
   book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
   ...
   session.add(book)
   session.commit()
   session.close()

The session establishes the conversation with the database and represents a staging area for all objects you've loaded, created or manipulated during its lifespan.
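To make the "staging area" idea concrete, here is a minimal sketch against an in-memory SQLite database. The Book model, its status column and the staging_demo function are assumptions made up for illustration; the article does not show the real model. The sketch assumes SQLAlchemy 1.4 or later.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Book(Base):
    # Hypothetical model; the real Book definition is not part of the article.
    __tablename__ = "books"
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)
    status = Column(String, default="available")


engine = create_engine("sqlite://")  # in-memory database, for illustration only
Base.metadata.create_all(engine)


def staging_demo():
    with Session(engine) as session:
        book = Book(title="To Kill a Mockingbird")
        session.add(book)
        was_pending = book in session.new  # staged, not yet written to the database
        session.commit()

        book.status = "borrowed"
        was_dirty = book in session.dirty  # modified, change not yet flushed
        session.commit()

        stored = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
        return was_pending, was_dirty, stored.status
```

The session tracks the object from the moment it is added: session.new holds objects waiting to be inserted, session.dirty holds loaded objects with unsaved changes, and commit() writes both to the database.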

Session Management

You can think of a SQLAlchemy session as a wrapper around a database transaction. As a general rule, the lifecycle of a session should be separate from, and external to, the functions and objects that access and manipulate database data. Sessions are meant to be short-lived. For example, in the context of an incoming Celery task request, a session should be created at the beginning of the task code and closed at the end, instead of being held open indefinitely and shared across tasks.

from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
   session = Session(engine)
   book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
   ...
   session.add(book)
   session.commit()
   session.close()

Or, using a context manager:

from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
   with Session(engine) as session:
      book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
      ...
      session.add(book)
      session.commit()
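If you also want an explicit rollback when the task body raises, one option (a sketch, not something the examples above rely on) is to combine the session context manager with session.begin(), which commits on success and rolls back on an exception. The Book model and the rename_book and titles helpers are hypothetical names used only for this illustration; SQLAlchemy 1.4 or later is assumed.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Book(Base):
    # Hypothetical model, for illustration only.
    __tablename__ = "books"
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)


engine = create_engine("sqlite://")  # in-memory database, for illustration only
Base.metadata.create_all(engine)


def rename_book(old_title, new_title, fail=False):
    # session.begin() commits when the block exits normally and rolls back
    # when it raises; the outer context manager closes the session either way.
    with Session(engine) as session, session.begin():
        book = session.query(Book).filter_by(title=old_title).one()
        book.title = new_title
        if fail:
            raise RuntimeError("simulated failure before commit")


def titles():
    with Session(engine) as session:
        return [book.title for book in session.query(Book).order_by(Book.id)]


# Seed one row so the helpers above have something to work with.
with Session(engine) as session, session.begin():
    session.add(Book(title="To Kill a Mockingbird"))
```

With this shape, a failing call leaves the stored title untouched, while a successful call persists the change without an explicit commit() in the task body.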

Celery Task Class

My issue with the two options above is that they involve a lot of repetitive boilerplate code inside each task. It would be nice if every Celery task request came with a ready-to-go session object, without having to create it at the beginning and close it at the end. Something like this:

def my_task(session):
   book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
   ...
   session.add(book)
   session.commit()

As it turns out, injecting the session as an argument at runtime is not possible. It is possible, though, to bind the task. A bound task always receives the task instance as its first argument.

@app.task(bind=True)
def my_task(self):
   ...

By default, self is of type celery.Task. celery.Task defines all methods available for Celery tasks, for example apply_async and retry.

Every interaction between your code and your Celery task as well as every interaction between your worker and your Celery task happens via the celery.Task methods. In fact, when your worker processes a task, it always follows this sequence:

  1. Run before_start

  2. Run the task

  3. Run after_return

Even when the task in step 2 throws an exception, after_return is guaranteed to run. You can use this to streamline the SQLAlchemy session creation and teardown:

  • create the session in before_start

  • make the session available to the bound task

  • close the session in after_return

import celery
from sqlalchemy.orm import Session

class MyTask(celery.Task):
    def __init__(self):
        self.sessions = {}

    def before_start(self, task_id, args, kwargs):
        self.sessions[task_id] = Session(...)
        super().before_start(task_id, args, kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        session = self.sessions.pop(task_id)
        session.close()
        super().after_return(status, retval, task_id, args, kwargs, einfo)

    @property
    def session(self):
        return self.sessions[self.request.id]

Note that there is only a single task instance per process, meaning that every task within a process shares the same task object. To isolate SQLAlchemy sessions per task request, I use a dictionary and the unique task request id as key.
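The isolation idea can be sketched without Celery or SQLAlchemy at all. The code below is a simulation under stated assumptions, not Celery's actual worker code: FakeSession stands in for a real session, SessionRegistry mirrors the dictionary in MyTask, and run_request imitates the before_start, task body, after_return sequence. All three names are hypothetical.

```python
class FakeSession:
    """Stand-in for sqlalchemy.orm.Session; only records whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class SessionRegistry:
    """Hypothetical helper that keeps one session per task request id."""

    def __init__(self, session_factory):
        self._factory = session_factory
        self._sessions = {}

    def open(self, task_id):
        self._sessions[task_id] = self._factory()

    def get(self, task_id):
        return self._sessions[task_id]

    def close(self, task_id):
        # pop() with a default avoids a KeyError if open() never ran
        session = self._sessions.pop(task_id, None)
        if session is not None:
            session.close()

    def __len__(self):
        return len(self._sessions)


def run_request(registry, task_id, body):
    """Simulates one task request: set up, run the body, always tear down."""
    registry.open(task_id)  # plays the role of before_start
    try:
        return body(registry.get(task_id))
    finally:
        registry.close(task_id)  # plays the role of after_return


def failing_body(session):
    seen.append(session)
    raise ValueError("simulated task failure")


registry = SessionRegistry(FakeSession)
seen = []

run_request(registry, "request-1", seen.append)
try:
    run_request(registry, "request-2", failing_body)
except ValueError:
    pass
```

Each request id gets its own session object, and the registry is empty again after both requests, including the one that failed.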

Putting It Together

So far we have:

  • MyTask, a custom celery.Task implementation
  • a bound task, which receives a celery.Task instance as self

What is missing is using MyTask instead of celery.Task as the class behind the task. To do this, Celery provides the base argument:

import celery
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

class MyTask(celery.Task):
    def __init__(self):
        self.sessions = {}

    def before_start(self, task_id, args, kwargs):
        self.sessions[task_id] = Session(...)
        super().before_start(task_id, args, kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        session = self.sessions.pop(task_id)
        session.close()
        super().after_return(status, retval, task_id, args, kwargs, einfo)

    @property
    def session(self):
        return self.sessions[self.request.id] 

@app.task(bind=True, base=MyTask)
def my_task(self):
    book = self.session.query(Book).filter_by(title="To Kill a Mockingbird").one()
    ...
    self.session.add(book)
    self.session.commit()

This is a generic solution that delegates SQLAlchemy session handling to a custom Task class. It keeps your task code free from repetitive boilerplate. What do you think? Let me know in the comments 👇.

Support Bjoern Stiel by becoming a sponsor. Any amount is appreciated!
