Previously, I wrote about how to customise your Celery log handlers. But there is another Celery logger: the celery.task logger. The celery.task logger is a special logger set up by the Celery worker. Its goal is to add task-related information to the log messages. It exposes two new format parameters:
task_id
task_name
This is useful because it helps you understand which task a log message comes from. The task logger is available via celery.utils.log.
# tasks.py
import os

from celery.utils.log import get_task_logger

from worker import app

logger = get_task_logger(__name__)


@app.task()
def add(x, y):
    result = x + y
    logger.info(f'Add: {x} + {y} = {result}')
    return result
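Assuming a broker is configured and a worker for worker.py is running, the task can be queued from a Python shell or any other process; a minimal sketch:

# Queue the task; the worker picks it up and logs via the task logger.
from tasks import add

add.delay(3, 5)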
Executing the add task with get_task_logger produces the following log output.
[2018-11-06 07:30:13,545: INFO/MainProcess] Received task: tasks.add[9c332222-d2fc-47d9-adc3-04cebbe145cb]
[2018-11-06 07:30:13,546: INFO/MainProcess] tasks.add[9c332222-d2fc-47d9-adc3-04cebbe145cb]: Add: 3 + 5 = 8
[2018-11-06 07:30:13,598: INFO/MainProcess] Task tasks.add[9c332222-d2fc-47d9-adc3-04cebbe145cb] succeeded in 0.052071799989789724s: 8
If your Celery application processes many tasks, the celery.task logger is almost indispensable to make sense of your log output. Compare this to the log message generated by the standard logging.getLogger:
[2018-11-06 07:33:16,140: INFO/MainProcess] Received task: tasks.add[7d2ec1a7-0af2-4e8c-8354-02cd0975c906]
[2018-11-06 07:33:16,140: INFO/MainProcess] Add: 3 + 5 = 8
[2018-11-06 07:33:16,193: INFO/MainProcess] Task tasks.add[7d2ec1a7-0af2-4e8c-8354-02cd0975c906] succeeded in 0.052330999984405935s: 8
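For comparison, this is roughly what the task module looks like with the standard logger; only the logger setup differs from the tasks.py above (a sketch, not code from the original article):

# tasks.py, using the standard logging module instead of get_task_logger
import logging

from worker import app

logger = logging.getLogger(__name__)


@app.task()
def add(x, y):
    result = x + y
    logger.info(f'Add: {x} + {y} = {result}')
    return result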
How to customise the celery.task log format
How do you customise the celery.task log message format? Remember how you customise the Celery logger using the after_setup_logger signal? There is a similar signal for the celery.task logger. The after_setup_task_logger signal gets triggered as soon as the Celery worker has set up the celery.task logger. This is the signal we want to connect to in order to customise the log formatter.
There is one gotcha: in order to get access to task_id and task_name, you have to use celery.app.log.TaskFormatter instead of logging.Formatter. celery.app.log.TaskFormatter is an extension of logging.Formatter that gets a reference to the current Celery task at runtime (check out the source code if you want to take a deeper dive).
# worker.py
import os

from celery import Celery
from celery.signals import after_setup_task_logger
from celery.app.log import TaskFormatter

app = Celery()


@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
    for handler in logger.handlers:
        handler.setFormatter(TaskFormatter('%(asctime)s - %(task_id)s - %(task_name)s - %(name)s - %(levelname)s - %(message)s'))
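The same trick works for the plain Celery logger covered in the previous article: the after_setup_logger signal fires for it, and the same formatting function can be connected to both signals. A minimal sketch, not part of the original worker.py; outside of a task, TaskFormatter falls back to printing ??? for the task_id and task_name placeholders (more on that below):

# worker.py (variant) -- one formatting function connected to both signals
from celery import Celery
from celery.app.log import TaskFormatter
from celery.signals import after_setup_logger, after_setup_task_logger

app = Celery()


def setup_loggers(logger, *args, **kwargs):
    # Applies to the handlers of whichever logger the signal passes in.
    for handler in logger.handlers:
        handler.setFormatter(TaskFormatter(
            '%(asctime)s - %(task_id)s - %(task_name)s - %(name)s - %(levelname)s - %(message)s'))


after_setup_logger.connect(setup_loggers)
after_setup_task_logger.connect(setup_loggers)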
How to get the task_id using the standard logger
The celery.task logger works great for anything that is definitely a Celery task. But what about lower-level code? Models, for example, are usually used in both a Celery and a non-Celery context. If your front of house is a Flask web application, your models can be used either in the Flask or in the Celery process.
# models.py
import logging

from passlib.hash import sha256_crypt
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import validates
from sqlalchemy import text

from . import db

logger = logging.getLogger(__name__)


class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(UUID(as_uuid=True), primary_key=True, server_default=text("uuid_generate_v4()"))
    name = db.Column(db.String(64), unique=False, nullable=True)
    email = db.Column(db.String(256), unique=True, nullable=False)

    @validates('email')
    def validate_email(self, key, value):
        logger.info(f'Validate email address: {value}')
        if value is not None:
            assert '@' in value
            return value.lower()
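For example, the same validator might be triggered from a Celery task or from a Flask view. A hypothetical sketch (update_email and the commit logic are not part of the original project):

# tasks.py -- hypothetical Celery-side caller
from worker import app
from models import db, User


@app.task()
def update_email(user_id, new_email):
    user = User.query.get(user_id)
    user.email = new_email  # runs validate_email, so the log line can carry a task_id
    db.session.commit()

# A Flask view that assigns user.email from request data goes through exactly
# the same validator, but there is no Celery task to take an id from.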
Your lower-level code should not care in which context it runs. You do not want to pollute it with a Celery-specific logger implementation. What you do want is the Celery task id in the log message when validate_email is called from within a Celery task, and no task id when validate_email is called from within Flask.
The good news is, you can do this with a simple trick. celery.app.log.TaskFormatter does the magic that injects task_id and task_name. It does so by calling celery._state.get_current_task. If celery._state.get_current_task is executed outside a Celery task, it simply returns None. celery.app.log.TaskFormatter handles None by printing ??? instead of the task_id and task_name. This means you can safely create your log handler outside of Celery using celery.app.log.TaskFormatter.
import logging
from celery.app.log import TaskFormatter
logger = logging.getLogger()
sh = logging.StreamHandler()
sh.setFormatter(TaskFormatter('%(asctime)s - %(task_id)s - %(task_name)s - %(name)s - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(sh)
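With this handler in place, a record logged outside of a Celery task, say from a Flask view or a Python shell, simply gets the ??? placeholders. A quick illustration of the shape of the output (not a captured log):

# Logging outside of any Celery task: get_current_task() returns None,
# so TaskFormatter substitutes ??? for task_id and task_name.
logging.getLogger('models').info('Validate email address: alice@example.com')
# <asctime> - ??? - ??? - models - INFO - Validate email address: alice@example.com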
If you don't like the ??? defaults, or the fact that you have to import from celery.app.log, write your own custom task formatter.
import logging


class TaskFormatter(logging.Formatter):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        try:
            from celery._state import get_current_task
            self.get_current_task = get_current_task
        except ImportError:
            self.get_current_task = lambda: None

    def format(self, record):
        task = self.get_current_task()
        if task and task.request:
            record.__dict__.update(task_id=task.request.id,
                                   task_name=task.name)
        else:
            record.__dict__.setdefault('task_name', '')
            record.__dict__.setdefault('task_id', '')
        return super().format(record)


logger = logging.getLogger()
sh = logging.StreamHandler()
sh.setFormatter(
    TaskFormatter(
        '%(asctime)s - %(task_id)s - %(task_name)s - %(name)s - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(sh)
This custom TaskFormatter works with logging.getLogger. It imports celery._state.get_current_task if Celery is installed, otherwise it falls back to a no-op. If it runs inside a Celery task, it injects the task id and the task name, otherwise it leaves them empty. It just works.