Distributed Monte Carlo with Celery Chords


Celery chords are one of the six Celery workflow primitives. A Celery workflow defines the order in which individual Celery tasks are executed asynchronously. A chord consists of a Celery group (the chord header) and a callback. A group is a list of tasks that are executed in parallel. Once all group tasks have finished successfully, the callback is executed and receives a list of all the group task results.
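
As a minimal, self-contained sketch (the add and tsum tasks and the Redis broker/backend URL are illustrative assumptions, not part of this post's project), a chord looks like this:

from celery import Celery, chord

app = Celery('demo', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

# header: ten add tasks executed in parallel (a group)
# callback: tsum receives the list of the ten results once all of them have finished
result = chord([add.s(i, i) for i in range(10)])(tsum.s())
print(result.get())  # 90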

A chord is a great fit for running a distributed Monte Carlo simulation. A Monte Carlo simulation is an algorithm that relies on repeated random sampling to obtain numerical results. In a distributed simulation, this sampling is spread across multiple nodes. When all the nodes have finished, we need to aggregate the individual results into the final result.
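
As a quick illustration (not part of the option pricing below), here is the classic Monte Carlo estimate of pi; each node could run estimate_pi independently, and averaging the per-node estimates aggregates them into the final result:

import random

def estimate_pi(n_samples):
    # repeated random sampling: count points that fall inside the unit quarter circle
    hits = sum(1 for _ in range(n_samples)
               if random.random() ** 2 + random.random() ** 2 <= 1.0)
    return 4.0 * hits / n_samples

estimates = [estimate_pi(100_000) for _ in range(4)]  # pretend these ran on 4 nodes
print(sum(estimates) / len(estimates))                # aggregated estimate of pi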

In this blog post, you'll run a Monte Carlo simulation to calculate the fair value of an up-and-out call option. Don't worry about the details if you are not familiar with financial derivatives. In a nutshell, an up-and-out call option is a call option (a contract that gives you the right to buy the underlying stock at a predetermined strike price at some point in the future) that becomes worthless if the underlying stock price rises above a certain level (the barrier).

Step 1 - Monte Carlo simulation

The idea is to simulate the underlying share price over the lifetime of the up-and-out call option. The outcome of each experiment is that the barrier has either been hit or not. If the barrier is hit at any point during the lifetime, the option's value is 0. If the barrier has not been hit, the final value of the option is the difference between the final share price and the strike price (if the final share price is greater than the strike price) or 0 (if it is lower).
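
Written as a payoff rule for a single simulated price path, the idea looks roughly like this (an illustrative sketch only; the Celery task below instead prices each surviving path with a Black-Scholes helper):

def up_and_out_payoff(path, strike, barrier):
    # knocked out: the share price rose above the barrier at some point
    if max(path) > barrier:
        return 0.0
    # not knocked out: plain call payoff at expiry
    return max(path[-1] - strike, 0.0)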

import numpy as np

@app.task(bind=True, name='up_and_out_call')
def up_and_out_call(self, s0, strike, T, r, sigma, barrier, n_simulation, n_steps=100):
    dt = T / n_steps
    total = 0
    for j in range(n_simulation):
        # simulate one share price path as a geometric Brownian motion
        sT = s0
        out = False
        for i in range(n_steps):
            e = np.random.normal()
            sT *= np.exp((r - 0.5 * sigma * sigma) * dt + sigma * e * np.sqrt(dt))
            if sT > barrier:
                # barrier breached: this path is knocked out and contributes 0
                out = True
                break
        if not out:
            # path survived: add the Black-Scholes call value
            # (black_scholes_call is a helper defined elsewhere in the project)
            total += black_scholes_call(s0, strike, T, r, sigma)
    return total / n_simulation

This experiment is run n_simulation times and the task returns the average outcome over the number of simulations, which is the estimated fair value of the option.
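
You can already queue a single run as an ordinary Celery task. The parameters below are the ones used in Step 2, and getting the result assumes a result backend is configured:

result = up_and_out_call.delay(s0=100, strike=120, T=0.5, r=0.01,
                               sigma=0.1, barrier=150, n_simulation=1000)
print(result.get())  # estimated fair value from this single task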

Step 2 - Parallelise the simulation with a Celery chord

If you have more than one CPU at your disposal, you can parallelise the simulation to speed things up. Instead of having one process run all n_simulation simulations, you'll have n_workers processes run n_simulation / n_workers simulations each. When all processes have finished, you average over the individual results.

In Celery chord terms, you define how many simulations you want to run per process (per_worker) and create a list of simulations / per_worker tasks. This group (the chord header) is executed in parallel and returns a list of floats. You need a new task that takes that list of floats and returns the average (the chord callback).

@app.task(bind=True, name='mean')
def mean(self, args):
    # args is the list of results returned by the chord header tasks
    return sum(args) / len(args)

With these ingredients defined, the actual Celery chord looks like this.

from celery import chord

def run_simulation():
    simulations = 100000
    per_worker = 1000
    n = simulations // per_worker  # number of header tasks

    # option parameters
    s0 = 100
    strike = 120
    T = 0.5
    r = 0.01
    sigma = 0.1
    barrier = 150

    # header: n up_and_out_call tasks executed in parallel
    # callback: mean averages the n partial results
    return chord([up_and_out_call.s(
        s0=s0,
        strike=strike,
        T=T,
        r=r,
        sigma=sigma,
        barrier=barrier,
        n_simulation=per_worker) for _ in range(n)], mean.s())()
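
Calling the chord returns an AsyncResult for the callback, so if you hold on to it you can block on the final value (again assuming a result backend is configured):

result = run_simulation()
print(result.get())  # the averaged fair value once all header tasks have finished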

Step 3 - Punch it

How many processes run in parallel at any given time is a function of how many Celery workers you have up and running. The number of workers can be scaled with docker-compose. Clone the
GitHub repository
and bring up the stack with docker-compose up -d. By default, this
starts one worker. Kick off a simulation via the Flask API:

$ curl -d '{}' -H "Content-Type: application/json" -X POST http://localhost:8000

While the simulation is running, scale the number of workers up and back down, try out different scenarios, and follow the Celery task flow in the logs:

$ docker-compose up -d --scale worker=3
$ docker-compose up -d --scale worker=1
$ docker-compose logs -f
