Celery Config With Environment Variables


I am a strong proponent of strict separation of config from code. This means that my preferred app configuration is via environment variables (env vars).

Env vars are easy to change between deploys and require no code changes. They are language and operating system agnostic. And, unlike config files, there is little risk of env vars accidentally being checked into a code repo.

Unfortunately, this is where Celery fails me. It encourages the use of configuration files. Though certain settings can be controlled via env vars, others cannot. And the docs are not very transparent as to which settings fall into which category.

This has been a recurring frustration of mine for years. My typical workaround looks something like this:

# celeryconfig.py
import json
import os

broker_url = os.environ["CELERY_BROKER_URL"]
task_serializer = os.getenv("CELERY_TASK_SERIALIZER", "json")
accept_content = json.loads(os.getenv("CELERY_ACCEPT_CONTENT", "[\"json\"]"))

# more settings

A few notes here:

  • Celery picks up CELERY_BROKER_URL from the environment automatically, so it does not strictly need to be set here. But I do like having all settings in one place.

  • Celery's accept_content expects a list. Env vars are strings, hence the json.loads trick to parse the string as a list.

  • Some of my settings are mandatory; others are optional with a default value of my choosing. Another (more sensible) approach is to not set optional values at all and let Celery apply its own defaults. I do it this way so that everything lives in one place.
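
For completeness, this is how such a config module gets wired into the app via Celery's config_from_object (a minimal sketch; module and task names are placeholders):

# worker.py
from celery import Celery

app = Celery("worker")
app.config_from_object("celeryconfig")  # loads celeryconfig.py from the Python path

@app.task
def ping():
    return "pong"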

The other day I found myself implementing the same workaround again for a new Celery project. As my frustration grew once again, I started tossing around a more generic approach: expose all Celery settings via env vars.

Any environment variable prefixed with CELERY__ is mapped to the Celery setting of the same name, provided that setting exists. For example, the CELERY__timezone env var sets Celery's timezone, and CELERY__worker_pool sets Celery's worker_pool. CELERY__nonsense is ignored because there is no nonsense setting in Celery.

# celeryconfig.py
import os
import json
import pydantic
from functools import partial

# (setting name, parser) pairs: the parser turns the env var string into the type Celery expects
settings = [
    ("accept_content", json.loads),
    ("enable_utc", partial(pydantic.parse_obj_as, bool)),
    ("imports", json.loads),
    ("include", json.loads),
    ("timezone", str),
    ("beat_max_loop_interval", int),
    ("beat_schedule", json.loads),
    ("beat_scheduler", str),
    ("beat_schedule_filename", str),
    ("beat_sync_every", int),
    ("broker_url", str),
    ("broker_transport", str),
    ("broker_transport_options", json.loads),
    ("broker_connection_timeout", int),
    ("broker_connection_retry", partial(pydantic.parse_obj_as, bool)),
    ("broker_connection_max_retries", int),
    ("broker_failover_strategy", str),
    ("broker_heartbeat", float),
    ("broker_login_method", str),
    ("broker_pool_limit", int),
    ("broker_use_ssl", partial(pydantic.parse_obj_as, bool)),
    ("cache_backend", str),
    ("cache_backend_options", json.loads),
    ("cassandra_table", str),
    ("cassandra_entry_ttl", int),
    ("cassandra_keyspace", str),
    ("cassandra_port", int),
    ("cassandra_read_consistency", str),
    ("cassandra_servers", json.loads),
    ("cassandra_write_consistency", str),
    ("cassandra_options", json.loads),
    ("s3_access_key_id", str),
    ("s3_secret_access_key", str),
    ("s3_bucket", str),
    ("s3_base_path", str),
    ("s3_endpoint_url", str),
    ("s3_region", str),
    ("couchbase_backend_settings", json.loads),
    ("arangodb_backend_settings", json.loads),
    ("mongodb_backend_settings", json.loads),
    ("event_queue_expires", float),
    ("event_queue_ttl", float),
    ("event_queue_prefix", str),
    ("event_serializer", str),
    ("redis_db", str),
    ("redis_host", str),
    ("redis_max_connections", int),
    ("redis_username", str),
    ("redis_password", str),
    ("redis_port", int),
    ("redis_backend_use_ssl", json.loads),
    ("result_backend", str),
    ("result_cache_max", int),
    ("result_compression", str),
    ("result_compression", str),
    ("result_exchange", str),
    ("result_exchange_type", str),
    ("result_expires", int),
    ("result_persistent", partial(pydantic.parse_obj_as, bool)),
    ("result_serializer", str),
    ("database_engine_options", json.loads),
    ("database_short_lived_sessions", partial(pydantic.parse_obj_as, bool)),
    ("database_db_names", json.loads),
    ("security_certificate", str),
    ("security_cert_store", str),
    ("security_key", str),
    ("task_acks_late", partial(pydantic.parse_obj_as, bool)),
    ("task_acks_on_failure_or_timeout", partial(pydantic.parse_obj_as, bool)),
    ("task_always_eager", partial(pydantic.parse_obj_as, bool)),
    ("task_annotations", json.loads),
    ("task_compression", str),
    ("task_create_missing_queues", partial(pydantic.parse_obj_as, bool)),
    ("task_default_delivery_mode", str),
    ("task_default_exchange", str),
    ("task_default_exchange_type", str),
    ("task_default_queue", str),
    ("task_default_rate_limit", int),
    ("task_default_routing_key", str),
    ("task_eager_propagates", partial(pydantic.parse_obj_as, bool)),
    ("task_ignore_result", partial(pydantic.parse_obj_as, bool)),
    ("task_publish_retry", partial(pydantic.parse_obj_as, bool)),
    ("task_publish_retry_policy", json.loads),
    ("task_queues", json.loads),
    ("task_routes", json.loads),
    ("task_send_sent_event", partial(pydantic.parse_obj_as, bool)),
    ("task_serializer", str),
    ("task_soft_time_limit", int),
    ("task_track_started", partial(pydantic.parse_obj_as, bool)),
    ("task_reject_on_worker_lost", partial(pydantic.parse_obj_as, bool)),
    ("task_time_limit", int),
    ("worker_agent", str),
    ("worker_autoscaler", str),
    ("worker_concurrency", int),
    ("worker_consumer", str),
    ("worker_direct", partial(pydantic.parse_obj_as, bool)),
    ("worker_disable_rate_limits", partial(pydantic.parse_obj_as, bool)),
    ("worker_enable_remote_control", partial(pydantic.parse_obj_as, bool)),
    ("worker_log_color", partial(pydantic.parse_obj_as, bool)),
    ("worker_log_format", str),
    ("worker_lost_wait", float),
    ("worker_max_tasks_per_child", int),
    ("worker_pool", str),
    ("worker_pool_putlocks", partial(pydantic.parse_obj_as, bool)),
    ("worker_pool_restarts", partial(pydantic.parse_obj_as, bool)),
    ("worker_prefetch_multiplier", int),
    ("worker_redirect_stdouts", partial(pydantic.parse_obj_as, bool)),
    ("worker_redirect_stdouts_level", str),
    ("worker_send_task_events", partial(pydantic.parse_obj_as, bool)),
    ("worker_state_db", str),
    ("worker_task_log_format", str),
    ("worker_timer", str),
    ("worker_timer_precision", float)
]

# Apply any matching CELERY__<setting> env var, parsed into the type Celery expects
for setting, parse in settings:
    env_var = f"CELERY__{setting}"
    if env_var in os.environ:
        globals()[setting] = parse(os.environ[env_var])

I make use of some partial, pydantic and json jiggery pokery to do the parsing and type conversion. Here is a quick illustration of what those helpers do (assuming pydantic v1, where parse_obj_as is available):
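
# parse_demo.py -- assumes pydantic v1
import json
from functools import partial

import pydantic

# the same partial trick as above: a callable that coerces strings to bool
to_bool = partial(pydantic.parse_obj_as, bool)

print(to_bool("true"))                 # True
print(to_bool("0"))                    # False
print(json.loads('["yaml", "json"]'))  # ['yaml', 'json']

And that allows me to configure my Celery worker in Docker Compose neatly: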

worker:
  image: bstiel/celeryq:0.1
  restart: always
  command: ["celery", "--app=worker.app", "worker"]
  environment:
    - "CELERY__broker_url=redis://redis:6379/1"
    - 'CELERY__accept_content=["yaml", "json"]'
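
Note the single quotes around the CELERY__accept_content line: unlike the double-quoted line above it, they avoid having to escape the inner double quotes, which json.loads needs to see intact.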

PS: Just as I was wrapping up this blog post, I came across a Celery GitHub issue:

I came here looking for the opposite: a way to configure Celery with env vars.

I am considering submitting a pull request. What do you think?
