Celery chords are one of the six Celery workflow primitives. A Celery workflow defines the order in which individual Celery tasks are executed asynchronously. A chord consists of a Celery group (called the chord header) and a callback. A group is a list of tasks that are executed in parallel. Once all group tasks have finished successfully, the callback is executed and receives the list of all the group task results.
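As a minimal sketch of that shape (add and tsum are hypothetical tasks used purely for illustration, not part of this post's simulation):

from celery import chord

# header: ten add tasks executed in parallel; callback: tsum receives the
# list of their results once every header task has succeeded
chord([add.s(i, i) for i in range(10)], tsum.s()).delay()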
A chord is a great workflow for running a distributed Monte Carlo simulation. A Monte Carlo simulation is an algorithm that relies on repeated random sampling to obtain numerical results. In a distributed simulation, the experiment is spread across multiple nodes. Once all nodes have finished, the individual results need to be aggregated into the final result.
In this blog post, you'll run a Monte Carlo simulation to calculate the fair value of an up-and-out call option. Don't worry about the details if you are not familiar with financial derivatives. In a nutshell, a call option is a contract that gives you the right to buy an underlying stock at some point in the future at a predetermined strike price; an up-and-out call option is a call option that becomes worthless if the underlying stock price rises above a certain level (the barrier).
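In payoff terms, a rough sketch of what a single outcome looks like at expiry (this helper is just for illustration and is not part of the repository code):

def up_and_out_payoff(path_max, final_price, strike, barrier):
    # the option knocks out (becomes worthless) if the share price ever reached the barrier
    if path_max >= barrier:
        return 0.0
    # otherwise it pays off like a plain call: final price minus strike, floored at zero
    return max(final_price - strike, 0.0)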
Step 1 - Monte Carlo simulation
The idea is to simulate the underlying share price over the lifetime of the up-and-out call option. The outcome of each experiment is that the barrier either has or has not been hit. If the barrier is hit during the lifetime, the option's value is 0. If the barrier has not been hit, the final value of the option is the difference between the final share price and the strike price (if the share price is greater than the strike price) or 0 (if the share price is lower than the strike price).
import scipy  # the code below relies on scipy's NumPy re-exports (scipy.exp, scipy.random, scipy.sqrt)


# app is the Celery application instance, defined elsewhere in the project
@app.task(bind=True, name='up_and_out_call')
def up_and_out_call(self, s0, strike, T, r, sigma, barrier, n_simulation, n_steps=100.):
    dt = T / n_steps
    total = 0
    for j in range(0, n_simulation):
        sT = s0
        out = False
        for i in range(0, int(n_steps)):
            e = scipy.random.normal()
            sT *= scipy.exp((r - 0.5 * sigma * sigma) * dt + sigma * e * scipy.sqrt(dt))
            if sT > barrier:
                out = True
                break
        if not out:
            # black_scholes_call is defined elsewhere in the project (not shown here)
            total += black_scholes_call(s0, strike, T, r, sigma)
    return total / n_simulation
This simulation is run n_simulation times and returns the average outcome over the number of simulations, which is the fair value of the option.
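If you want to sanity-check the task outside of a Celery worker, you can run it eagerly in the current process: apply() executes the task locally and get() returns its value (the parameters below are simply the scenario used further down):

result = up_and_out_call.apply(kwargs=dict(
    s0=100, strike=120, T=0.5, r=0.01,
    sigma=0.1, barrier=150, n_simulation=1000)).get()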
Step 2 - Parallelise the simulation with a Celery chord
If you have more than one CPU at your disposal, you can parallelise the simulation to speed things up. Instead of having one process run n_simulation simulations, you'll have n_workers processes run n_simulation / n_workers simulations each. When all processes are finished, you need to average over the individual results.
In Celery chord terms, you define how many simulations you want to run per process (per_worker) and create a list of simulations / per_worker tasks. This group (the chord header) is executed in parallel and returns a list of floats. You need a new task that takes that list of floats and returns the average (the chord callback).
@app.task(bind=True, name='mean')
def mean(self, args):
    return sum(args) / len(args)
With these ingredients defined, the actual Celery chord looks like this.
from celery import chord


def run_simulation():
    simulations = 100000
    per_worker = 1000
    n = int(simulations / per_worker)
    s0 = 100
    strike = 120
    T = 0.5
    r = 0.01
    sigma = 0.1
    barrier = 150
    chord([up_and_out_call.s(
        s0=s0,
        strike=strike,
        T=T,
        r=r,
        sigma=sigma,
        barrier=barrier,
        n_simulation=per_worker) for i in range(0, n)], mean.s())()
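Note that calling the chord returns an AsyncResult for the mean callback. run_simulation as written discards it, but if you want the fair value back in the calling process (for example when experimenting in a shell), you can hold on to it and block until the callback has run:

async_result = chord(header, mean.s())()  # header: the same list of up_and_out_call signatures as above
fair_value = async_result.get()           # blocks until all header tasks and the callback have finished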
Step 3 - Punch it
How many processes run in parallel at any given time depends on how many Celery workers you have up and running. The number of workers can be scaled with docker-compose. Clone the GitHub repository and bring up the stack with docker-compose up -d. By default, this starts one worker. Kick off a simulation via the Flask API:
$ curl -d '{}' -H "Content-Type: application/json" -X POST http://localhost:8000
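For context, the Flask endpoint behind this call might look roughly like the sketch below; the route, names and module layout are assumptions, not copied from the repository:

from flask import Flask, jsonify

api = Flask(__name__)

@api.route('/', methods=['POST'])
def start_simulation():
    # kick off the chord and return immediately; the Celery workers do the heavy lifting
    run_simulation()
    return jsonify({'status': 'simulation started'})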
While it's running, scale up the number of workers with docker-compose up -d --scale worker=3 and scale back down with docker-compose up -d --scale worker=1. Try out different scenarios and follow the Celery task flow in the logs with docker-compose logs -f.