This lesson is being piloted (Beta version)

Dask Delayed

Overview

Teaching: 20 min
Exercises: 20 min
Questions
  • How do you install Dask Delayed?

  • How can Dask Delayed be used to parallelize python code?

  • How can I see what Dask Delayed is doing?

Objectives

Installing Dask

Before we can use dask we must install it with the following command on the terminal.

$ pip install pandas numpy dask distributed graphviz bokeh dask_jobqueue mimesis requests matplotlib

This actually installs lots of stuff, not just Dask, but should take around 2 minutes or a bit less. This will install these modules into the virtual environment we setup and are currently working in.

Using Dask Delayed

Lets start by looking at the python code we have from the last episode and thinking about what parts could be run in parallel.

import time

def elapsed(start):
  return str(time.time()-start)+"s"

def inc(x):
  time.sleep(1)
  return x+1
def add(x,y):
  time.sleep(1)
  return x+y

def main():
  x=inc(1)
  y=inc(2)
  z=add(x,y)
  print("z="+str(z))

if __name__=="__main__":
  start=time.time()
  main()
  wallClock=elapsed(start)
  print()
  print("----------------------------------------")
  print("wall clock time:"+wallClock)
  print("----------------------------------------")
  print()

pre-dask.py

The two calls to the inc functions could be called in parallel, because they are totally independent of one-another.

We can use dask.delayed on our functions to make them lazy. When we say lazy we mean that those functions will not be called immediately. What happens instead is that it records what we want to compute as a task into a graph that we will run later using the compute member function on the object returned by the dask.delayed function.

Lets add the new Dask code now.

$ cp pre-dask.py delayed.py
$ nano delayed.py
import time
import dask

...

def main():
  x=dask.delayed(inc)(1)
  y=dask.delayed(inc)(2)
  z=dask.delayed(add)(x,y)
  #result=z.compute()
  #print("result="+str(result))
  
...

However, to illustrate that nothing happens until z.compute() is called lets comment it and the following print line out and run it.

$ srun python ./delayed.py

----------------------------------------
wall clock time:0.22939133644104004s
----------------------------------------

It clearly didn’t call our inc or add functions as any one of those calls should take at least 1 s and the total time is well below 1s. Now lets uncomment the code and rerun it.

...

def main():
  ...
  z=dask.delayed(add)(x,y)
  result=z.compute()
  print("result="+str(result))
...

delayed.py

$ srun python ./delayed.py
result=5

----------------------------------------
wall clock time:3.3499603271484375s
----------------------------------------

Hey, that’s no faster than the non-dask version. In fact it is a very tiny bit slower. What gives? Well, we only ran it on one core. Lets try on two cores and see what happens.

$ srun --cpus-per-task=2 python ./delayed.py
result=5

----------------------------------------
wall clock time:2.169353485107422s
----------------------------------------

Ah that’s better it is now down to about 2s from our original 3s. To help us understand what Dask is doing we can use the member function visualize of the Delayed object which creates a visualization of the graph Dask created for our tasks.

...
def main():
  ...
  z=dask.delayed(add)(x,y)
  z.visualize()
  #result=z.compute()
  #print("result="+str(result))
...
$ srun python ./delayed.py

Which returns fairly quickly because we aren’t actually doing any work. However it has created a mydask.png file which lets us visualize what Dask will do. Lets have a look at it (this requires the -X option to work and a x11 server running).

$ feh mydask.png

dask-delay.png

Notice that this includes the names of the functions from our script and the logical flow of the outputs from the inc function to the inputs of the add function.

Here you can see that the two inc functions can be run in parallel provided we have hardware capable of running them at the same time and afterwards the add function will be run in serial.

Parallelize a loop

Download the below script with the below wget command.

$ wget https://raw.githubusercontent.com/acenet-arc/ACENET_Summer_School_Dask/gh-pages/code/loop-template.py
import time

def elapsed(start):
  return str(time.time()-start)+"s"

def inc(x):
  time.sleep(1)
  return x+1
def main():
  data=[1,2,3,4,5,6,7,8]
  dataInc=[]
  for x in data:
    y=inc(x)
    dataInc.append(y)
  total=sum(dataInc)
  print("total="+str(total))
  
if __name__=="__main__":
  start=time.time()
  main()
  wallClock=elapsed(start)
  print()
  print("----------------------------------------")
  print("wall clock time:"+wallClock)
  print("----------------------------------------")
  print()

loop-template.py

Run this script to with --cpus-per-task=1 and note the run time.

Then apply what you have learned to parallelize this script. Then run with --cpus-per-task=1,2, and 4 and note the times.

Solution

import time
import dask
...
def main():
  ...
  for x in data:
    y=dask.delayed(inc)(x)
    dataInc.append(y)
  total=dask.delayed(sum)(dataInc)
  result=total.compute()
  print("total="+str(result))
...

loop-solution.py

Serial

srun --cpus-per-task=1 python loop-template.py
total=44

----------------------------------------
wall clock time:8.008957147598267s
----------------------------------------

Delayed Serial

srun --cpus-per-task=1 python loop-solution.py
total=44

----------------------------------------
wall clock time:8.009050607681274s
----------------------------------------

Delayed 2 CPUs

srun --cpus-per-task=2 python loop-solution.py
total=44

----------------------------------------
wall clock time:4.008902311325073s
----------------------------------------

Delayed 4 CPUs

srun --cpus-per-task=4 python loop-solution.py
total=44

----------------------------------------
wall clock time:2.005645990371704s
----------------------------------------

Visualize loop parallelization

Use the solution to the previous challenge to visualize how the loop is being parallelized by Dask.

You can get the solution with the following command.

$ wget https://raw.githubusercontent.com/acenet-arc/ACENET_Summer_School_Dask/gh-pages/code/loop-solution.py

Solution

...
def main():
  ...
  total=dask.delayed(sum)(dataInc)
  total.visualize()
  result=total.compute()
  ...
$ srun --cpus-per-task=4 python loop-solution.py
$ feh mydask.png

dask delayed loop parallelization graph

Parallelize loop with flow control

Download the below script with the below wget command.

$ wget https://raw.githubusercontent.com/acenet-arc/ACENET_Summer_School_Dask/gh-pages/code/loop-flow-template.py
import time

def inc(x):
  time.sleep(1)
  return x+1
def double(x):
  time.sleep(1)
  return 2*x
def isEven(x):
  return not x % 2
def main():
  data=[1,2,3,4,5,6,7,8,9,10]
  dataProc=[]
  for x in data:
    if isEven(x):
      y=double(x)
    else:
      y=inc(x)
    dataProc.append(y)

  total=sum(dataProc)
  print("total="+str(total))
if __name__=="__main__":
  start=time.time()
  main()
  end=time.time()
  print("wall clock time:"+str(end-start)+"s")

loop-flow-template.py

Then parallelize with dask.delayed, compute, and visualize the task graph.

Solution

import time
import dask
...
def main():
  ...
  for x in data:
    if isEven(x):
      y=dask.delayed(double)(x)
    else:
      y=dask.delayed(inc)(x)
    dataProc.append(y)

  total=dask.delayed(sum)(dataProc)
  total.visualize()
  result=total.compute()
  ...

loop-flow-solution.py

Loop with flow control parallelization graph

Key Points