Falcon is a bare-metal, minimalist Python library for building fast web APIs. Falcon shines with
a clean design that embraces HTTP and the REST architectural style. In this blog post, you will
learn how to integrate Falcon with Celery. I will show you how to structure your Falcon/Celery
project so that it is easily maintainable and reusable. And you will learn how to fetch
economic timeseries data from Quandl via Celery, store it in a
Postgres database and retrieve it via a Falcon REST API.
Project structure
Clean separation of concerns between the Falcon and Celery apps is essential for
maintainability, reusability and scalability. When designing your application,
think of Falcon and Celery as two different apps (that most commonly share only
the models): app.py
(Falcon app) and worker.py
(Celery app).
The models
folder contains the SQLAlchemy models, which form the codebase shared by both the Falcon and Celery apps.
The middleware
and resources
folders are Falcon-specific. Falcon resources
are all the things in your API that can be accessed by a URL. Middleware components provide a way to execute code before and after each request. The tasks
folder contains our Celery tasks and belongs exclusively to the
Celery sphere.
.
+-- app
|   +-- app.py
|   +-- worker.py
|   +-- alembic
|   |   +-- versions
|   |   |   +-- 20180710_init.py
|   |   +-- env.py
|   |   +-- script.py.mako
|   +-- models
|   |   +-- __init__.py
|   |   +-- timeseries.py
|   +-- middleware
|   |   +-- __init__.py
|   |   +-- sqla_sessionmanager.py
|   +-- resources
|   |   +-- __init__.py
|   |   +-- timeseries.py
|   +-- tasks
|   |   +-- __init__.py
|   |   +-- base.py
|   |   +-- quandl.py
|   +-- requirements.txt
+-- Dockerfile
+-- docker-compose.yml
+-- README.md
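The TimeSeries model itself is not shown in this post, but based on how it is used further down, a minimal sketch of models/timeseries.py could look like the following; the exact column types and the declarative base are assumptions, not the original file:
#app/models/timeseries.py (sketch; column types are assumptions)
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class TimeSeries(Base):
    __tablename__ = 'timeseries'

    id = sa.Column(UUID(as_uuid=True), primary_key=True)
    database_code = sa.Column(sa.String, nullable=False)  # e.g. 'WIKI'
    dataset_code = sa.Column(sa.String, nullable=False)   # e.g. 'FB'
    data = sa.Column(JSONB)                               # raw Quandl response
    status = sa.Column(sa.String)                         # 'pending' / 'success' / 'error'
models/__init__.py would then re-export TimeSeries so that from models import TimeSeries works from both the Falcon resources and the Celery tasks.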
Falcon app
The Falcon app requires only minimal boilerplate code. We need to register the routes and their
respective resources, as well as any middleware components. Make sure to check out the Falcon docs for further details.
#app/app.py
import falcon
import argparse
import sqlalchemy
from sqlalchemy.orm import sessionmaker

from middleware import SQLAlchemySessionManager
from resources import TimeSeriesResource

# Engine and session factory shared by all requests
engine = sqlalchemy.create_engine('postgresql://user:password@postgres/5432')
session_factory = sessionmaker(bind=engine)

app = falcon.API(middleware=[SQLAlchemySessionManager(session_factory)])
app.add_route('/timeseries', TimeSeriesResource())

if __name__ == '__main__':
    # Run a simple Werkzeug development server when called with --debug
    parser = argparse.ArgumentParser()
    parser.add_argument('--debug', help='Run debug server', action='store_true')
    args = parser.parse_args()
    if args.debug:
        from werkzeug.serving import run_simple
        run_simple('0.0.0.0', 8000, app, use_debugger=True, use_reloader=True)
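If you want to try the app outside of Docker, you can either use the built-in debug server or point any WSGI server at the app object. A minimal sketch, assuming you run the commands from inside the app directory and have gunicorn installed (gunicorn is my choice for the example, not something the project mandates):
# shell
~$ python app.py --debug                   # Werkzeug debug server with reloader
~$ gunicorn --bind 0.0.0.0:8000 app:app    # any WSGI server works just as well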
For easy SQLAlchemy database session management, we create a middleware component so that we can
access the session object from within our resources.
#app/middleware/sqla_sessionmanager.py
from sqlalchemy.orm import scoped_session


class SQLAlchemySessionManager:

    def __init__(self, session_factory):
        self.session_factory = session_factory

    def process_resource(self, req, resp, resource, params):
        # Attach a fresh scoped session to the resource before the responder runs
        resource.session = scoped_session(self.session_factory)

    def process_response(self, req, resp, resource, req_succeeded):
        # Clean up the session once the response has been assembled
        if hasattr(resource, 'session'):
            resource.session.remove()
Finally, we define our timeseries
resource to GET
data from the database. As the request
goes through the SQLAlchemySessionManager, the session instance is
automatically available on the TimeSeriesResource.
We return a JSON response by assigning the serialized content to resp.body
. If you want to go the extra mile and follow the DRY principle, you can
create another middleware component to do the json.dumps(...)
bit. I left this out
for the sake of simplicity, but it is a good exercise to do it yourself (see the sketch after the POST discussion below).
#app/resources/timeseries.py
import falcon
import uuid
import json

from models import TimeSeries
from tasks.quandl import fetch_data_from_quandl


class TimeSeriesResource:

    def on_get(self, req, resp):
        if 'id' in req.params:
            # Return the full dataset for a single timeseries
            timeseries = self.session.query(TimeSeries).filter(TimeSeries.id == req.params['id']).one()
            resp.status = falcon.HTTP_200
            resp.body = json.dumps(timeseries.data)
        else:
            # Return a lightweight listing of all stored timeseries
            timeseries = [{
                'id': str(timeseries.id),
                'database_code': timeseries.database_code,
                'dataset_code': timeseries.dataset_code
            } for timeseries in self.session.query(TimeSeries).all()]
            resp.status = falcon.HTTP_200
            resp.body = json.dumps(timeseries)

    def on_post(self, req, resp):
        body = req.stream.read()
        payload = json.loads(body.decode('utf-8'))
        database_code = payload['database_code']
        dataset_code = payload['dataset_code']
        # Create a placeholder record; the Celery task fills in the data later
        timeseries = TimeSeries(
            id=uuid.uuid4(),
            database_code=database_code,
            dataset_code=dataset_code,
            data={},
            status='pending'
        )
        self.session.add(timeseries)
        self.session.commit()
        # Hand the actual Quandl request over to Celery
        fetch_data_from_quandl.s(timeseries_id=timeseries.id).delay()
        resp.status = falcon.HTTP_201
        resp.body = json.dumps({
            'id': str(timeseries.id),
            'status': timeseries.status
        })
I also define the POST
method here to fetch the data from Quandl asynchronously via Celery.
This is the only interaction between our Falcon and Celery worlds. Here, I directly import the
task, but if you have a good reason to keep the two absolutely separate, you can create another abstraction layer and, for
example, call the Celery task by its name.
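For illustration, dispatching the task by its registered name instead of importing it could look roughly like this (a sketch; it only relies on the Celery app defined in worker.py):
#app/resources/timeseries.py (alternative dispatch, sketch)
from worker import app as celery_app

# The name matches @app.task(..., name='fetch_data_from_quandl') in tasks/quandl.py,
# so the resource no longer needs to import the task function itself.
celery_app.send_task('fetch_data_from_quandl', kwargs={'timeseries_id': str(timeseries.id)})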
Also, I recommend creating a dedicated middleware component to deserialize the request body for the POST
and PUT
methods (which I deliberately skipped here).
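As a starting point for that exercise, here is a minimal sketch of what such a JSON middleware component might look like; the module name and the req.context keys ('payload', 'result') are my own choices, not part of Falcon or the project above:
#app/middleware/json_translator.py (sketch)
import json

import falcon


class JSONTranslator:

    def process_request(self, req, resp):
        # Deserialize the body of incoming POST/PUT requests
        if req.method not in ('POST', 'PUT') or not req.content_length:
            return
        try:
            req.context['payload'] = json.loads(req.stream.read().decode('utf-8'))
        except ValueError:
            raise falcon.HTTPBadRequest(
                title='Malformed JSON',
                description='Could not decode the request body.')

    def process_response(self, req, resp, resource, req_succeeded):
        # Serialize whatever the responder stored under req.context['result']
        if 'result' in req.context:
            resp.body = json.dumps(req.context['result'])
With this in place, on_post would read its input from req.context['payload'], and responders would assign plain Python objects to req.context['result'] instead of calling json.dumps themselves.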
Celery app
Just like the Falcon app, the Celery app requires minimal boilerplate code: broker_url
specifies the URL of the message broker (here RabbitMQ) and imports
tells Celery
where to look for the tasks.
#app/worker.py
from celery import Celery

app = Celery('app')
app.conf.update({
    'broker_url': 'amqp://user:password@rabbitmq:5672',
    'imports': (
        'tasks',
    ),
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json'],
    'result_compression': 'gzip',
    'timezone': 'UTC'})
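For reference, outside of Docker you would start the worker from inside the app directory (so that the 'tasks' import above resolves), for example:
# shell
~$ celery -A worker worker --loglevel=info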
Similarly to the Falcon middleware, we define a custom Celery base task to manage
the SQLAlchemy database session so that we can readily access the session
object in our Celery tasks.
#app/tasks/base.py
import celery
import sqlalchemy
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session


class Task(celery.Task):

    def __call__(self, *args, **kwargs):
        # Set up a fresh engine and scoped session for every task execution
        self.engine = sqlalchemy.create_engine('postgresql://user:password@postgres/5432')
        session_factory = sessionmaker(bind=self.engine)
        self.session = scoped_session(session_factory)
        return super().__call__(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Tear down the session and engine again once the task has finished
        if hasattr(self, 'session'):
            self.session.remove()
        if hasattr(self, 'engine'):
            self.engine.dispose()
Now we have everything in place to create the Celery task that fetches data from Quandl:
#app/tasks/quandl.py
import requests

from models import TimeSeries
from worker import app
from .base import Task


@app.task(bind=True, base=Task, name='fetch_data_from_quandl')
def fetch_data_from_quandl(self, timeseries_id):
    timeseries = self.session.query(TimeSeries).filter_by(id=timeseries_id).one()
    url = f'https://www.quandl.com/api/v3/datasets/{timeseries.database_code}/{timeseries.dataset_code}/data.json'
    response = requests.get(url)
    if not response.ok:
        # Mark the record as failed before raising, so the API reflects the error
        self.session.query(TimeSeries).filter_by(id=timeseries_id).update({'status': 'error'})
        self.session.commit()
        raise ValueError(f'GET {url} returned unexpected response code: {response.status_code}')
    self.session.query(TimeSeries).filter_by(id=timeseries_id).update({
        'data': response.json(),
        'status': 'success'
    })
    self.session.commit()
Showtime
Start the entire stack, including RabbitMQ and Postgres, with docker-compose up -d
. The Falcon app becomes available on port 8000.
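The compose file itself is not discussed in this post, but a minimal docker-compose.yml for this stack might look roughly like the sketch below; the image tags, credentials and service commands are assumptions, not the exact file from the repository.
#docker-compose.yml (sketch; images, credentials and commands are assumptions)
version: '3'
services:
  postgres:
    image: postgres:10
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
  rabbitmq:
    image: rabbitmq:3
    environment:
      RABBITMQ_DEFAULT_USER: user
      RABBITMQ_DEFAULT_PASS: password
  api:
    build: .
    command: gunicorn --bind 0.0.0.0:8000 app:app
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - rabbitmq
  worker:
    build: .
    command: celery -A worker worker --loglevel=info
    depends_on:
      - postgres
      - rabbitmq
With the stack up and running, request a financial timeseries from Quandl: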
# curl
~$ curl -d '{"database_code":"WIKI", "dataset_code":"FB"}' -H "Content-Type: application/json" -X POST http://localhost:8000/timeseries
This should return something like this:
{
"id": "4484d077-cee3-4f2c-bf18-50cb5d3573fe",
"status": "pending"
}
Retrieve the result from the database:
# curl
~$ curl 'http://localhost:8000/timeseries?id=4484d077-cee3-4f2c-bf18-50cb5d3573fe'
This, in turn, returns the Quandl result (assuming the task has already finished), which is now persisted in our database:
{
  "dataset_data": {
    "data": [
      [
        "2018-03-27",
        156.31,
        162.85,
        150.75,
        152.19,
        76787884,
        0,
        1,
        156.31,
        162.85,
        150.75,
        152.19,
        76787884
      ],
      ...
Summary
In this blog post you learned how to structure your Celery powered Falcon app. We built a nice little
REST app that asynchronously fetches data from Quandl and writes it into our internal Postgres database.
Key takeaway is to keep the Falcon and Celery apps separate. This allows you (and any other developer involved)
to follow the workflow which in turn makes it an easily maintainable and scalable application.