Custom Celery Task Logger

Previously, I wrote about how to customise your Celery log handlers.

But there is another Celery logger: the celery.task logger. This special logger is set up by the Celery worker, and its goal is to add task-related information to your log messages. It makes two extra log record attributes available to your format string:

  • task_id

  • task_name

This is useful because it tells you which task a log message comes from. You get the task logger from celery.utils.log via get_task_logger:

# tasks.py
from celery.utils.log import get_task_logger
from worker import app

# A logger whose records carry the id and name of the running task
logger = get_task_logger(__name__)

@app.task()
def add(x, y):
    result = x + y
    logger.info(f'Add: {x} + {y} = {result}')
    return result

Executing add(3, 5) with the task logger produces the following worker log output:

[2018-11-06 07:30:13,545: INFO/MainProcess] Received task: tasks.add[9c332222-d2fc-47d9-adc3-04cebbe145cb]
[2018-11-06 07:30:13,546: INFO/MainProcess] tasks.add[9c332222-d2fc-47d9-adc3-04cebbe145cb]: Add: 3 + 5 = 8
[2018-11-06 07:30:13,598: INFO/MainProcess] Task tasks.add[9c332222-d2fc-47d9-adc3-04cebbe145cb] succeeded in 0.052071799989789724s: 8

If your Celery application processes many tasks, the celery.task logger is almost indispensable for making sense of your log output. Compare this with the output when the same task logs through a standard logging.getLogger logger:

[2018-11-06 07:33:16,140: INFO/MainProcess] Received task: tasks.add[7d2ec1a7-0af2-4e8c-8354-02cd0975c906]
[2018-11-06 07:33:16,140: INFO/MainProcess] Add: 3 + 5 = 8
[2018-11-06 07:33:16,193: INFO/MainProcess] Task tasks.add[7d2ec1a7-0af2-4e8c-8354-02cd0975c906] succeeded in 0.052330999984405935s: 8
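To make the difference concrete, here is a stdlib-only sketch that reproduces both kinds of line. The two format strings are assumptions modelled on Celery's default worker_log_format and worker_task_log_format settings (check the values in your Celery version), and the task id and name are passed by hand through extra=, whereas in a real worker the task formatter fills them in for you.

```python
import io
import logging

# Format strings modelled on Celery's worker defaults (an assumption;
# compare with worker_log_format / worker_task_log_format in your version).
PLAIN_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
TASK_FMT = ('[%(asctime)s: %(levelname)s/%(processName)s] '
            '%(task_name)s[%(task_id)s]: %(message)s')


def render(fmt, **extra):
    """Log one message through a throwaway logger and return the output line."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(fmt))
    logger = logging.getLogger('demo.render')
    logger.handlers = [handler]
    logger.propagate = False
    logger.setLevel(logging.INFO)
    # extra= sets task_id / task_name as attributes on the LogRecord,
    # which is exactly what %(task_id)s and %(task_name)s read.
    logger.info('Add: 3 + 5 = 8', extra=extra)
    return stream.getvalue().rstrip('\n')


task_line = render(TASK_FMT,
                   task_id='9c332222-d2fc-47d9-adc3-04cebbe145cb',
                   task_name='tasks.add')
plain_line = render(PLAIN_FMT)
print(task_line)
print(plain_line)
```

Both lines share the same prefix; only the task-aware format puts name[id] in front of the message, which is what lets you grep a busy worker log by task.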

How to customise the celery.task log format

How do you customise the celery.task log message format? Remember how you customise the Celery logger using the after_setup_logger signal? There is a similar signal for the celery.task logger: after_setup_task_logger, which is triggered as soon as the Celery worker has set up the celery.task logger. This is the signal to connect to in order to customise the log formatter.

There is one gotcha: to get access to task_id and task_name, you have to use celery.app.log.TaskFormatter instead of logging.Formatter. TaskFormatter subclasses logging.Formatter and looks up the current Celery task at runtime, each time it formats a record (check out the source code if you want to take a deeper dive).

# worker.py
from celery import Celery
from celery.app.log import TaskFormatter
from celery.signals import after_setup_task_logger

app = Celery()


@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
    # Replace the formatter on every handler of the celery.task logger.
    # TaskFormatter (not logging.Formatter) is what fills in
    # %(task_id)s and %(task_name)s.
    for handler in logger.handlers:
        handler.setFormatter(TaskFormatter(
            '%(asctime)s - %(task_id)s - %(task_name)s - %(name)s'
            ' - %(levelname)s - %(message)s'))
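Why does the gotcha matter? A plain logging.Formatter reads %(task_id)s straight off the log record, so any record that lacks that attribute cannot be formatted. The sketch below shows this with hand-built records and a hypothetical DefaultingFormatter that fills in placeholders. It loosely mirrors the ??? fallback of celery.app.log.TaskFormatter but does not look up any Celery task.

```python
import logging

FMT = '%(task_id)s - %(task_name)s - %(levelname)s - %(message)s'


def make_record(msg, **attrs):
    """Build a LogRecord by hand, optionally with extra attributes."""
    record = logging.LogRecord('demo', logging.INFO, 'demo.py', 0, msg, None, None)
    record.__dict__.update(attrs)
    return record


class DefaultingFormatter(logging.Formatter):
    """Hypothetical formatter: fills missing task fields with '???'.

    Loosely mirrors the fallback in celery.app.log.TaskFormatter,
    but never looks up a Celery task.
    """

    def format(self, record):
        record.__dict__.setdefault('task_id', '???')
        record.__dict__.setdefault('task_name', '???')
        return super().format(record)


plain = logging.Formatter(FMT)
try:
    plain.format(make_record('no task info'))
    plain_failed = False
except (KeyError, ValueError):
    # Python 3.8+ raises ValueError("Formatting field not found in record: ...");
    # older versions let the KeyError through.
    plain_failed = True

safe = DefaultingFormatter(FMT).format(make_record('no task info'))
with_task = DefaultingFormatter(FMT).format(
    make_record('hello', task_id='abc', task_name='tasks.add'))

print(plain_failed)  # True
print(safe)          # ??? - ??? - INFO - no task info
print(with_task)     # abc - tasks.add - INFO - hello
```

On Python 3.10 and later, logging.Formatter also accepts a defaults= mapping (for example defaults={'task_id': '???', 'task_name': '???'}) that covers the missing-field case without subclassing, although it still cannot look up the running task.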

How to get the task_id using the standard logger?

The celery.task logger works great for code that definitely runs inside a Celery task. But what about lower-level code? Models, for example, are usually used in both Celery and non-Celery contexts. If the front end of your application is a Flask web app, your models may run either in the Flask process or in the Celery worker process.

# models.py
import logging

from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import validates
from sqlalchemy import text
from . import db

logger = logging.getLogger(__name__)

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(UUID(as_uuid=True), primary_key=True, server_default=text("uuid_generate_v4()"))
    name = db.Column(db.String(64), unique=False, nullable=True)
    email = db.Column(db.String(256), unique=True, nullable=False)

    @validates('email')
    def validate_email(self, key, value):
        logger.info(f'Validate email address: {value}')
        if value is not None:
            assert '@' in value
            return value.lower()

Your lower-level code should not care which context it runs in, and you do not want to pollute it with a Celery-specific logger implementation. What you do want is the Celery task id in the log message when validate_email is called from within a Celery task, and no task id when validate_email is called from within Flask.

The good news is that a simple trick does this. celery.app.log.TaskFormatter does the magic that injects task_id and task_name, and it does so by calling celery._state.get_current_task. When get_current_task is executed outside a Celery task, it simply returns None, and TaskFormatter handles None by printing ??? in place of the task_id and task_name. This means you can safely attach celery.app.log.TaskFormatter to a log handler outside Celery.

import logging
from celery.app.log import TaskFormatter

# Configure the root logger so that module-level loggers, such as the
# one in models.py, propagate their records to this handler.
logger = logging.getLogger()
sh = logging.StreamHandler()
sh.setFormatter(TaskFormatter(
    '%(asctime)s - %(task_id)s - %(task_name)s - %(name)s'
    ' - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(sh)
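The key point of the trick is that the formatter is never handed the task; it asks "which task is running right now?" at the moment it formats each record. The sketch below imitates that with a hypothetical get_current_task backed by contextvars (Celery itself keeps its own task stack in celery._state; this is not its code) and shows one module-level logger producing ??? outside a task and real values inside a simulated one.

```python
import contextvars
import io
import logging
from types import SimpleNamespace

# Hypothetical stand-in for celery._state.get_current_task(): a context
# variable that a simulated worker sets while a task body runs.
_current_task = contextvars.ContextVar('current_task', default=None)


def get_current_task():
    """Return the task running in this context, or None outside a task."""
    return _current_task.get()


class LookupFormatter(logging.Formatter):
    """Looks up the current task each time a record is formatted."""

    def format(self, record):
        task = get_current_task()
        if task is not None and task.request is not None:
            record.__dict__.update(task_id=task.request.id, task_name=task.name)
        else:
            record.__dict__.setdefault('task_id', '???')
            record.__dict__.setdefault('task_name', '???')
        return super().format(record)


# Configure the root logger once, as in the snippet above.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    LookupFormatter('%(task_id)s - %(task_name)s - %(name)s - %(message)s'))
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

# A module-level logger that knows nothing about Celery (cf. models.py).
models_logger = logging.getLogger('myapp.models')

# Outside any task, e.g. while serving a Flask request.
models_logger.info('Validate email address: a@b.c')

# Inside a simulated task: a fake task object with .name and .request.id.
fake_task = SimpleNamespace(name='tasks.add', request=SimpleNamespace(id='abc'))
token = _current_task.set(fake_task)
try:
    models_logger.info('Validate email address: x@y.z')
finally:
    _current_task.reset(token)

lines = stream.getvalue().splitlines()
print(lines[0])  # ??? - ??? - myapp.models - Validate email address: a@b.c
print(lines[1])  # abc - tasks.add - myapp.models - Validate email address: x@y.z
```

Because the lookup happens inside the formatter on the root handler, models.py keeps using a plain logging.getLogger(__name__) logger.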

If you don't like the ??? placeholders, or would rather not import from celery.app.log, write your own task formatter:

import logging

class TaskFormatter(logging.Formatter):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        try:
            from celery._state import get_current_task
            self.get_current_task = get_current_task
        except ImportError:
            self.get_current_task = lambda: None

    def format(self, record):
        task = self.get_current_task()
        if task and task.request:
            record.__dict__.update(task_id=task.request.id,
                                   task_name=task.name)
        else:
            record.__dict__.setdefault('task_name', '')
            record.__dict__.setdefault('task_id', '')
        return super().format(record)

logger = logging.getLogger()
sh = logging.StreamHandler()
sh.setFormatter(
    TaskFormatter(
        '%(asctime)s - %(task_id)s - %(task_name)s - %(name)s - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(sh)

This custom TaskFormatter works with any logger obtained from logging.getLogger. It imports celery._state.get_current_task if Celery is installed and falls back to a function that returns None if it is not. When it formats a record inside a running Celery task, it injects the task id and task name; anywhere else, it fills both fields with empty strings. It just works.
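A different technique, not the one used in this article: a logging.Filter attached to the handler can set task_id and task_name on each record before any formatter sees it, so the same fields work with whatever formatter you already use (a JSON formatter, for example). The sketch reuses celery._state.get_current_task, which is a private module (note the leading underscore), so treat it as something that may change between Celery releases. TaskInfoFilter and the filter_demo logger are illustrative names.

```python
import io
import logging

try:
    # Private Celery API: may change between releases.
    from celery._state import get_current_task
except ImportError:  # Celery not installed, e.g. in a Flask-only process
    def get_current_task():
        return None


class TaskInfoFilter(logging.Filter):
    """Annotate every record with task_id/task_name; never drop records."""

    def filter(self, record):
        task = get_current_task()
        if task is not None and task.request is not None:
            record.task_id = task.request.id
            record.task_name = task.name
        else:
            record.task_id = ''
            record.task_name = ''
        return True


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(TaskInfoFilter())  # on the handler, not the logger
handler.setFormatter(
    logging.Formatter('%(task_id)s|%(task_name)s|%(levelname)s|%(message)s'))

logger = logging.getLogger('filter_demo')
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

logger.info('outside any task')
print(stream.getvalue(), end='')  # ||INFO|outside any task
```

Attach the filter to the handler rather than to a logger: filters on a logger only see records logged directly on that logger, not records propagated up from child loggers such as the one in models.py.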

Support Bjoern Stiel by becoming a sponsor. Any amount is appreciated!