Skip to content
import dask # parallel library
import tqdm # creating progress bars
import time # artificially slow down the code
from dask.diagnostics import ProgressBar
import numpy as np

Serial slow code

beta = np.random.normal(size=5)
def simulate_beta_hat(n, beta):
    """Draw one synthetic regression sample and return its OLS estimate.

    Parameters
    ----------
    n : int
        Number of observations to simulate.
    beta : array-like
        True coefficient vector; its length sets the number of regressors.

    Returns
    -------
    numpy.ndarray
        Solution ``b`` of the normal equations ``(X'X) b = X'Y`` for the
        simulated data.
    """
    n_regressors = len(beta)
    # Draw order is part of the behaviour under a fixed global seed:
    # regressors first, then the noise term.
    regressors = np.random.normal(size=(n, n_regressors))
    noise = np.random.normal(size=n)
    outcome = regressors @ beta + noise
    # Solve the normal equations directly instead of inverting X'X.
    gram = regressors.T @ regressors
    moment = regressors.T @ outcome
    return np.linalg.solve(gram, moment)
# Serial baseline: ten replications executed one after another,
# with tqdm reporting progress.
beta_hat_list = []
for r in tqdm.tqdm(range(10)):
    draw = simulate_beta_hat(int(1e6), beta)
    beta_hat_list.append(draw)
# axis 1 runs over the coefficients, so this yields one value per replication.
# NOTE(review): axis 0 would give the spread across replications instead -
# confirm which one is intended.
np.array(beta_hat_list).std(axis=1)
100%|██████████| 10/10 [00:03<00:00,  2.81it/s]





array([0.8837639 , 0.88393607, 0.88213816, 0.88362958, 0.88311311,
       0.88262567, 0.88220251, 0.88293129, 0.88234124, 0.88307744])

Using dask delayed

# The same simulation twice: once evaluated eagerly, once wrapped as a
# lazy dask task that only runs when .compute() is called.
beta_hat_serial = simulate_beta_hat(int(5e6), beta)
lazy_simulate = dask.delayed(simulate_beta_hat)
beta_hat_dask = lazy_simulate(int(5e6), beta)
beta_hat_dask.compute()
array([ 0.08197091, -0.62044626,  0.79480141,  0.52191349,  2.05038949])

create a client

# Create a dask.distributed Client with default settings; the bare `client`
# expression displays its summary (the captured output reports 4 workers).
from dask.distributed import Client
client = Client()
client

Client

Cluster

  • Workers: 4
  • Cores: 4
  • Memory: 17.18 GB
# Build the list of lazy tasks only; each call returns a Delayed object
# immediately and no simulation is run inside this loop.
lazy_simulate = dask.delayed(simulate_beta_hat)
V = []
for r in tqdm.tqdm(range(20)):
    V.append(lazy_simulate(int(1e6), beta))
100%|██████████| 20/20 [00:00<00:00, 1371.85it/s]
# Execute every delayed task in V; the resulting tuple of estimates is only
# displayed here, it is not assigned to a name.
dask.compute(*V)
(array([ 0.08161511, -0.61953092,  0.79522272,  0.52044203,  2.05054851]),
 array([ 0.08285794, -0.62167223,  0.79310007,  0.52217079,  2.05098477]),
 array([ 0.07986196, -0.62199832,  0.79457482,  0.52237575,  2.05020908]),
 array([ 0.08100286, -0.61960148,  0.79531005,  0.52029908,  2.05085504]),
 array([ 0.08178035, -0.61845347,  0.79573612,  0.52183041,  2.05165835]),
 array([ 0.08270565, -0.62064098,  0.79495527,  0.52167249,  2.04932456]),
 array([ 0.0826866 , -0.61942732,  0.7962837 ,  0.52037415,  2.04987182]),
 array([ 0.08140251, -0.62063702,  0.79273842,  0.52031335,  2.04983376]),
 array([ 0.08282938, -0.6208797 ,  0.79485908,  0.5210823 ,  2.04963631]),
 array([ 0.08210893, -0.62119996,  0.79470062,  0.52027452,  2.05000452]),
 array([ 0.08140285, -0.62193338,  0.7931142 ,  0.51959118,  2.04951389]),
 array([ 0.08266208, -0.62223969,  0.79423316,  0.52094465,  2.0507154 ]),
 array([ 0.08106881, -0.61904197,  0.79503763,  0.52083106,  2.05040489]),
 array([ 0.08186745, -0.62058955,  0.79500309,  0.52093739,  2.04960608]),
 array([ 0.08116398, -0.62078771,  0.79489224,  0.52070781,  2.05127837]),
 array([ 0.08135074, -0.61943381,  0.79535478,  0.52226915,  2.05107564]),
 array([ 0.08243417, -0.61824012,  0.79552023,  0.52061954,  2.04887393]),
 array([ 0.08248117, -0.62044762,  0.79482442,  0.52089336,  2.0499758 ]),
 array([ 0.08342013, -0.62032896,  0.79246806,  0.52153688,  2.05029783]),
 array([ 0.08369605, -0.61907743,  0.79485533,  0.52189748,  2.05049225]))
# V still holds unevaluated Delayed objects (the tuple returned by the earlier
# dask.compute call was not stored), so evaluate the tasks before reducing.
# NOTE(review): this re-runs the simulations with fresh random draws, and
# .std() without an axis pools all coefficients together - confirm the
# intended axis (axis 0 would give the spread across replications).
np.array(dask.compute(*V)).std()
0.4612168338275775
client.close()

Fictitious Monte-Carlo workflow

@dask.delayed
def simulate_beta_hat_delayed(n,beta,seed=0):
    """Lazily simulate one OLS sample and return the coefficient estimate.

    The function is wrapped with ``dask.delayed``: calling it builds a task,
    and the simulation itself runs only when that task is computed.

    Parameters
    ----------
    n : int
        Number of observations to simulate.
    beta : array-like
        True coefficient vector; its length sets the number of regressors.
    seed : int, optional
        Seed for this replication's random draws (default 0). Tasks called
        with the same seed and arguments produce identical estimates.

    Returns
    -------
    numpy.ndarray
        Solution ``b`` of the normal equations ``(X'X) b = X'Y``.
    """
    nk = len(beta)
    # Seed from the argument. The previous code seeded with the module-level
    # loop variable `r`, which is looked up when the task runs rather than
    # when it is created, so the `seed` parameter was ignored and every task
    # drew the same sample.
    # A private RandomState gives the same stream as np.random.seed(seed)
    # followed by np.random.normal, without resetting the global generator
    # that other tasks running in the same process would share.
    rng = np.random.RandomState(seed)
    X = rng.normal(size=(n,nk))
    E = rng.normal(size=n)
    Y = np.matmul(X,beta) + E
    # Solve the normal equations rather than inverting X'X explicitly.
    beta_hat = np.linalg.solve( np.matmul(X.T,X), np.matmul(X.T,Y) )
    return beta_hat

@dask.delayed
def simulate_extract(V):
    """Summarise a collection of coefficient estimates (lazy dask task).

    Parameters
    ----------
    V : sequence of array-like
        Estimates to summarise; converted to one array with ``np.array``.

    Returns
    -------
    tuple of numpy.ndarray
        ``(mean, std)``, both taken along axis 0 of the stacked estimates.
    """
    time.sleep(10)  # artificial 10-second delay standing in for costly work
    estimates = np.array(V)  # convert once and reuse for both statistics
    return estimates.mean(0), estimates.std(0)

@dask.delayed
def simulate_combine(R):
    """Stack a sequence of results into a single array (lazy dask task).

    Parameters
    ----------
    R : sequence
        Results to combine; passed unchanged to ``np.array``.

    Returns
    -------
    numpy.ndarray
        ``np.array(R)``.
    """
    time.sleep(5)  # artificial 5-second delay standing in for costly work
    combined = np.array(R)
    return combined
# Five lazy replications summarised by one extract step.
V = []
for r in range(5):
    # Hand the replication index to the task explicitly through its `seed`
    # parameter; previously no seed was passed at all.
    V.append( simulate_beta_hat_delayed(int(1e6),beta,seed=r) )
R = simulate_extract(V)
# Render the task graph that R depends on.
R.visualize()

png

R.compute()
(array([ 0.08207483, -0.61952246,  0.79486013,  0.52201741,  2.0493561 ]),
 array([0., 0., 0., 0., 0.]))
# Create a new Client with default settings; the bare `client` expression
# displays its summary (the captured output reports 4 workers).
client = Client()
client

Client

Cluster

  • Workers: 4
  • Cores: 4
  • Memory: 17.18 GB
# Two designs (sample sizes). For each, five replications are summarised by
# simulate_extract, and the per-design summaries are then combined.
S = []
for s in [int(1e5),int(1e6)]:
    V = []
    for r in range(5):
        # Hand the replication index to the task explicitly through its
        # `seed` parameter; previously no seed was passed at all.
        V.append( simulate_beta_hat_delayed(s,beta,seed=r) )
    S.append(simulate_extract(V))
R = simulate_combine(S)
# Render the task graph that R depends on.
R.visualize()

png

# Run the whole task graph. dask.compute returns a tuple, so `out` is a
# 1-tuple holding the combined array (see the displayed output).
# NOTE(review): dask.diagnostics.ProgressBar is documented for the local
# schedulers; with a distributed Client active it may not report progress -
# dask.distributed.progress may be the intended tool; confirm.
with ProgressBar():
    out = dask.compute(R)

client.close()
out
(array([[[ 0.08322991, -0.61971241,  0.79743907,  0.51906852,
           2.05001336],
         [ 0.        ,  0.        ,  0.        ,  0.        ,
           0.        ]],

        [[ 0.08207483, -0.61952246,  0.79486013,  0.52201741,
           2.0493561 ],
         [ 0.        ,  0.        ,  0.        ,  0.        ,
           0.        ]]]),)