
Distributed Computations

Overview

Teaching: 25 min
Exercises: 15 min
Questions
  • How can I avoid the GIL problem?

  • How can I run multiple Python interpreters at once on one problem?

Objectives

We have started to see some of the implications of the GIL when using Dask. Now we will look at a way to avoid it even when your code needs frequent access to the Python interpreter (e.g. you haven’t converted much of it to C code with something like Cython, which can seriously improve the performance of Python code even before parallelization).

The basic idea behind distributed computations is to give each execution of your Python code its own Python interpreter. This necessarily means message passing and coordination between the processes running the different Python interpreters. Luckily Dask takes care of all of this for us, and after a bit of additional setup we can use it with Delayed just as we did before, but without the GIL getting in the way.
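
To see the idea in miniature before involving the job scheduler, here is a minimal sketch (the LocalCluster, the part function, and the sizes are illustrative choices, not part of the lesson's scripts) in which each Dask worker is a separate process with its own Python interpreter, and therefore its own GIL:

import dask
from dask.distributed import Client, LocalCluster

def part(n):
  #pure-Python work that would normally be serialized by the GIL
  return sum(i*i for i in range(n))

if __name__ == "__main__":
  #four worker processes, one thread each: four independent interpreters
  cluster=LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
  client=Client(cluster)

  parts=[dask.delayed(part)(1000000) for i in range(4)]
  print(dask.delayed(sum)(parts).compute())

  client.close()
  cluster.close()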

Since we are coming back from the previous day, we have to log back into the cluster and re-activate our virtual environment.

$ ssh -X <your-username>@pcs.ace-net.training
$ source ~/dask/bin/activate

Let's also re-create our squeue alias, as we will want it again. To make this permanent you could put this line into your ~/.bashrc file.

$ alias sqcm="squeue -u $USER -o'%.7i %.9P %.8j %.7u %.2t %.5M %.5D %.4C %.5m %N'"

Let's start with our previous compute.py script and modify it to run in a distributed way.

$ cp compute.py compute-distributed.py
$ nano compute-distributed.py
import time
import dask
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
...
def main():

  size=40000000
  numParts=4
  numWorkers=1

  parts=[]
  for i in range(numParts):
    part=dask.delayed(computePart)(size)
    parts.append(part)
  sumParts=dask.delayed(sum)(parts)

  #create the "cluster"
  cluster=SLURMCluster(cores=1,memory="256M",walltime='00:05:00')

  #Show us the job script used to launch the workers
  print(cluster.job_script())
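
  #connect a client to the cluster so that compute() runs on its workers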
  client=Client(cluster)

  #create the workers
  cluster.scale(numWorkers)

  #sleep a little bit for workers to create and
  #check in with the scheduler
  time.sleep(5)

  start=time.time()
  sumParts.compute()
  computeTime=elapsed(start)

  client.close()
  cluster.close()
...

compute-distributed.py

In the above script we have added a few pieces. We import SLURMCluster, which lets us submit jobs that create independent Dask workers, and we import the Dask Client so that we can tell Dask to use the software cluster we create.

We then create some number of workers with the cluster.scale(numWorkers) method. After this setup, our computation continues as before with Dask Delayed.
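
As an aside, rather than sleeping for a fixed five seconds and hoping the workers have checked in, the Client in recent versions of dask.distributed can block until a given number of workers are connected. A small sketch of that, reusing the variable names from the script above (treat it as an optional alternative to the time.sleep(5) call, not part of the lesson's script):

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster=SLURMCluster(cores=1,memory="256M",walltime='00:05:00')
client=Client(cluster)

numWorkers=4
cluster.scale(numWorkers)
#block until the requested number of workers have registered with the scheduler
client.wait_for_workers(n_workers=numWorkers)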

Let's run our new script. This time the computation is all done in the workers and not in the job we submit with the srun command, so there is no need to change the number of CPUs we request; that is all taken care of by changing the numWorkers variable. Let's also run the sqcm command immediately, and again frequently afterwards, to see how Dask spawns new workers for us.

$ srun python compute-distributed.py&
$ sqcm
  JOBID PARTITION     NAME   USER ST  TIME NODES CPUS MIN_M NODELIST
    980 cpubase_b   python user49  R  0:02     1    1  256M node-mdm1

Here you can see just the first job we submitted.

$ sqcm
  JOBID PARTITION     NAME   USER ST  TIME NODES CPUS MIN_M NODELIST
    981 cpubase_b dask-wor user49 PD  0:00     1    1  245M
    980 cpubase_b   python user49  R  0:04     1    1  256M node-mdm1

Here you can see the worker job that Dask spawned for our work is in the PD or pending state.

$ sqcm
  JOBID PARTITION     NAME   USER ST  TIME NODES CPUS MIN_M NODELIST
    980 cpubase_b   python user49  R  0:07     1    1  256M node-mdm1
    981 cpubase_b dask-wor user49  R  0:02     1    1  245M node-mdm1

Finally here we see that the worker is up and running.

#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=245M
#SBATCH -t 00:05:00

/home/user49/dask/bin/python -m distributed.cli.dask_worker tcp://192.168.0.133:44075 --nthreads 1 --memory-limit 244.14MiB --name dummy-name --nanny --death-timeout 60

Above we see a printout of the job script that Dask uses to launch our workers. The settings in this script come from the arguments we gave to SLURMCluster.
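
If your site requires a particular partition, account, or extra #SBATCH directives, those can also be passed to SLURMCluster and will show up in this script. A hedged sketch (the partition name and directive below are made-up examples, and the job_extra_directives option name is from recent dask-jobqueue releases, which called it job_extra in older versions):

from dask_jobqueue import SLURMCluster

#'queue' becomes an "#SBATCH -p" line; 'job_extra_directives' lines are
#passed through verbatim as additional #SBATCH directives
cluster=SLURMCluster(cores=1,memory="256M",walltime='00:05:00',
                     queue='somepartition',
                     job_extra_directives=['--nodes=1'])
#inspect the generated script before creating any workers
print(cluster.job_script())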

=======================================
Compute time: 12.071980953216553s
=======================================


----------------------------------------
wall clock time:18.724435329437256s
----------------------------------------

And finally we get our timings for the computation. A little longer than with our pure Dask Delayed code, but let's see how it changes with more cores, or rather more workers.

More cores distributed

Given the above compute-distributed.py, run first with numWorkers=1 to get a baseline, then run with numWorkers=2, 4, and 8.

HINT: you don’t need to change the srun python compute-distributed.py& command as you change the number of workers.

Solution

numWorkers=1

====================================
Compute time: 12.333828449249268s
====================================

numWorkers=2

====================================
Compute time: 6.07476544380188s
====================================

numWorkers=4

====================================
Compute time: 3.454866409301758s
====================================

numWorkers=8

====================================
Compute time: 3.3805696964263916s
====================================

Now we are getting some true parallelism. Notice how using more than 4 workers doesn't improve things. Why is that?

Running on busy clusters

In this workshop we have been learning about Dask on a relatively quiet training cluster. On a busy production cluster, creating a job which then creates other worker jobs to perform the computations might not be the best option: the worker jobs could take quite some time to start up and connect to the managing job, and that wait may even exceed the original job's time limit. A possible alternative is to use Dask-MPI to launch all the workers inside a single MPI job.
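
As a rough, hypothetical sketch of that approach: with dask_mpi, rank 0 of a single MPI job becomes the scheduler, rank 1 runs the client code, and the remaining ranks become workers, so nothing extra has to wait in the queue. A script like the one below (the name compute-mpi.py and the launch line are made up; how you launch MPI programs depends on your cluster) could be started with something like srun -n 6 python compute-mpi.py.

import dask
from dask_mpi import initialize
from dask.distributed import Client

#turn the ranks of this single MPI job into a scheduler (rank 0) and workers,
#with rank 1 running the rest of this script as the client
initialize()
client=Client()   #connects to the scheduler started by initialize()

parts=[dask.delayed(sum)(range(10000000)) for i in range(4)]
print(dask.delayed(sum)(parts).compute())

client.close()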

Key Points