Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel2017 #1

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 72 additions & 70 deletions base_bs_erf.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,73 +70,75 @@ def gen_data(nopt):
rnd.uniform(TL, TH, nopt),
)

##############################################

def run(name, alg, sizes=15, step=2, nopt=1024, nparr=True, dask=False, pass_args=False):
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--steps', required=False, default=sizes, help="Number of steps")
parser.add_argument('--step', required=False, default=step, help="Factor for each step")
parser.add_argument('--chunk', required=False, default=2000000,help="Chunk size for Dask")
parser.add_argument('--size', required=False, default=nopt, help="Initial data size")
parser.add_argument('--repeat',required=False, default=100, help="Iterations inside measured region")
parser.add_argument('--dask', required=False, default="sq", help="Dask scheduler: sq, mt, mp")
parser.add_argument('--text', required=False, default="", help="Print with each result")

args = parser.parse_args()
sizes= int(args.steps)
step = int(args.step)
nopt = int(args.size)
chunk= int(args.chunk)
repeat=int(args.repeat)
kwargs={}

if(dask):
import dask
import dask.multiprocessing
import dask.array as da
dask_modes = {
"sq": dask.async.get_sync,
"mt": dask.threaded.get,
"mp": dask.multiprocessing.get
}
kwargs = {"schd": dask_modes[args.dask]}
name += "-"+args.dask

for i in xrange(sizes):
price, strike, t = gen_data(nopt)
if not nparr:
call = [0.0 for i in range(nopt)]
put = [-1.0 for i in range(nopt)]
price=list(price)
strike=list(strike)
t=list(t)
repeat=1 # !!!!! ignore repeat count
if dask:
assert(not pass_args)
price = da.from_array(price, chunks=(chunk,), name=False)
strike = da.from_array(strike, chunks=(chunk,), name=False)
t = da.from_array(t, chunks=(chunk,), name=False)
if pass_args:
call = np.zeros(nopt, dtype=np.float64)
put = -np.ones(nopt, dtype=np.float64)
iterations = xrange(repeat)
print("ERF: {}: Size: {}".format(name, nopt)),
sys.stdout.flush()

if pass_args:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put) #warmup
t0 = now()
for _ in iterations:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put, **kwargs)
else:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY) #warmup
t0 = now()
for _ in iterations:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, **kwargs)
mops = get_mops(t0, nopt)
print("MOPS: {}".format(mops*2*repeat), args.text)
nopt *= step
repeat -= step
if repeat < 1:
repeat = 1
##############################################

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's changed here? EOLs?

def run(name, alg, sizes=15, step=2, nopt=1024, nparr=True, dask=False, pass_args=False, verbose=True):
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--steps', required=False, default=sizes, help="Number of steps")
parser.add_argument('--step', required=False, default=step, help="Factor for each step")
parser.add_argument('--chunk', required=False, default=2000000,help="Chunk size for Dask")
parser.add_argument('--size', required=False, default=nopt, help="Initial data size")
parser.add_argument('--repeat',required=False, default=100, help="Iterations inside measured region")
parser.add_argument('--dask', required=False, default="sq", help="Dask scheduler: sq, mt, mp")
parser.add_argument('--text', required=False, default="", help="Print with each result")

args = parser.parse_args()
sizes= int(args.steps)
step = int(args.step)
nopt = int(args.size)
chunk= int(args.chunk)
repeat=int(args.repeat)
kwargs={}

if(dask):
import dask
import dask.multiprocessing
import dask.array as da
dask_modes = {
"sq": dask.async.get_sync,
"mt": dask.threaded.get,
"mp": dask.multiprocessing.get
}
kwargs = {"schd": dask_modes[args.dask]}
name += "-"+args.dask

for i in xrange(sizes):
price, strike, t = gen_data(nopt)
if not nparr:
call = [0.0 for i in range(nopt)]
put = [-1.0 for i in range(nopt)]
price=list(price)
strike=list(strike)
t=list(t)
repeat=1 # !!!!! ignore repeat count
if dask:
assert(not pass_args)
price = da.from_array(price, chunks=(chunk,), name=False)
strike = da.from_array(strike, chunks=(chunk,), name=False)
t = da.from_array(t, chunks=(chunk,), name=False)
if pass_args:
call = np.zeros(nopt, dtype=np.float64)
put = -np.ones(nopt, dtype=np.float64)
iterations = xrange(repeat)
if verbose:
print("ERF: {}: Size: {}".format(name, nopt)),
sys.stdout.flush()

if pass_args:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put) #warmup
t0 = now()
for _ in iterations:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put, **kwargs)
else:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY) #warmup
t0 = now()
for _ in iterations:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, **kwargs)
mops = get_mops(t0, nopt)
if verbose:
print("MOPS: {}".format(mops*2*repeat), args.text)
nopt *= step
repeat -= step
if repeat < 1:
repeat = 1
31 changes: 31 additions & 0 deletions bs_erf_apply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import base_bs_erf
import numpy as np
import multiprocessing
import bs_erf_naive

global bs_impl
global pool
global nump


def black_scholes(nopt, price, strike, t, rate, vol):
global bs_impl
global pool
global nump
noptpp = int(nopt/nump)
call = np.empty(nopt, dtype=np.float64)
put = np.empty(nopt, dtype=np.float64)
asyncs = [pool.apply_async(bs_impl, (noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol)) for i in range(0, nopt, noptpp)]
for a,i in zip(asyncs, range(len(asyncs))):
call[i:i+noptpp], put[i:i+noptpp] = a.get()
return call, put


def main(title, impl, thepool):
global bs_impl
global pool
global nump
bs_impl = impl
nump = multiprocessing.cpu_count()
pool = thepool(nump)
base_bs_erf.run(title, black_scholes, pass_args=False)
33 changes: 33 additions & 0 deletions bs_erf_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import base_bs_erf
import multiprocessing

global bs_impl
global pool
global nump


class bs(object):
def __init__(self, nopt, rate, vol):
self.nopt = nopt
self.rate = rate
self.vol = vol

def __call__(self, zipped):
return bs_impl(self.nopt, *zipped, self.rate, self.vol)


def black_scholes(nopt, price, strike, t, rate, vol):
global bs_impl
global pool
z = list(zip(price, strike, t))
return pool.map(bs(nopt, rate, vol), z)


def main(title, impl, thepool):
global bs_impl
global pool
global nump
bs_impl = impl
nump = multiprocessing.cpu_count()
pool = thepool(nump)
base_bs_erf.run(title, black_scholes, pass_args=False)
36 changes: 36 additions & 0 deletions bs_erf_mpi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import base_bs_erf
import numpy as np
from mpi4py import MPI

global nump
global bs_impl


def black_scholes(nopt, price, strike, t, rate, vol, call, put):
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
noptpp = int(nopt/nump)

myprice = np.empty(noptpp, dtype=np.float64)
mystrike = np.empty(noptpp, dtype=np.float64)
myt = np.empty(noptpp, dtype=np.float64)

# Scatter data into arrays
comm.Scatter(price, myprice, root=0)
comm.Scatter(strike, mystrike, root=0)
comm.Scatter(t, myt, root=0)

mycall, myput = bs_impl(noptpp, myprice, mystrike, myt, rate, vol)

comm.Gather(mycall, call)
comm.Gather(myput, put)

return call, put


def main(title, impl):
global nump
global bs_impl
nump = MPI.COMM_WORLD.size
bs_impl = impl
base_bs_erf.run(title, black_scholes, pass_args=True, verbose=MPI.COMM_WORLD.Get_rank()==0)
54 changes: 45 additions & 9 deletions bs_erf_naive.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import numpy as np
invsqrt = lambda x: 1.0/sqrt(x)

def black_scholes ( nopt, price, strike, t, rate, vol, call, put ):

def black_scholes_args(nopt, price, strike, t, rate, vol, call, put):
mr = -rate
sig_sig_two = vol * vol * 2

for i in range(nopt):
P = float( price [i] )
S = strike [i]
T = t [i]
P = float(price[i])
S = strike[i]
T = t[i]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spaces here were on purpose to make it look pretty in WinMerge reports comparing implementations


a = log(P / S)
b = T * mr
Expand All @@ -27,7 +27,43 @@ def black_scholes ( nopt, price, strike, t, rate, vol, call, put ):

Se = exp(b) * S

call [i] = P * d1 - Se * d2
put [i] = call [i] - P + Se

base_bs_erf.run("Naive-loop", black_scholes, 4, 8, nparr=False, pass_args=True)
call[i] = P * d1 - Se * d2
put[i] = call[i] - P + Se
return call, put


def black_scholes(nopt, price, strike, t, rate, vol):
call = np.zeros(nopt, dtype=np.float64)
put = -np.ones(nopt, dtype=np.float64)
return black_scholes_args(nopt, price, strike, t, rate, vol, call, put)


def black_scholes_map(nopt, price, strike, t, rate, vol):
mr = -rate
sig_sig_two = vol * vol * 2
P = float(price)
S = strike
T = t

a = log(P / S)
b = T * mr

z = T * sig_sig_two
c = 0.25 * z
y = invsqrt(z)

w1 = (a - b + c) * y
w2 = (a - b - c) * y

d1 = 0.5 + 0.5 * erf(w1)
d2 = 0.5 + 0.5 * erf(w2)

Se = exp(b) * S

call = P * d1 - Se * d2
put = call - P + Se
return call, put


if __name__ == '__main__':
base_bs_erf.run(__file__, black_scholes_args, nparr=False, pass_args=True)
6 changes: 6 additions & 0 deletions bs_erf_naive_apply_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from bs_erf_apply import main
from multiprocessing.pool import Pool
from bs_erf_naive import black_scholes

if __name__ == '__main__':
main(__file__, black_scholes, Pool)
6 changes: 6 additions & 0 deletions bs_erf_naive_apply_threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from bs_erf_apply import main
from multiprocessing.pool import ThreadPool
from bs_erf_naive import black_scholes

if __name__ == '__main__':
main(__file__, black_scholes, ThreadPool)
6 changes: 6 additions & 0 deletions bs_erf_naive_map_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from bs_erf_map import main
from multiprocessing import Pool
from bs_erf_naive import black_scholes_map

if __name__ == '__main__':
main(__file__, black_scholes_map, Pool)
6 changes: 6 additions & 0 deletions bs_erf_naive_map_threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from bs_erf_map import main
from multiprocessing.pool import ThreadPool
from bs_erf_naive import black_scholes_map

if __name__ == '__main__':
main(__file__, black_scholes_map, ThreadPool)
5 changes: 5 additions & 0 deletions bs_erf_naive_mpi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from bs_erf_mpi import main
from bs_erf_naive import black_scholes

if __name__ == '__main__':
main(__file__, black_scholes)
5 changes: 5 additions & 0 deletions bs_erf_naive_threading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from bs_erf_naive import black_scholes_args
from bs_erf_threading import main

if __name__ == '__main__':
main(__file__, black_scholes_args)
Loading