-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate_portfolios.py
188 lines (155 loc) · 6.65 KB
/
create_portfolios.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import datetime
from enum import Enum
from faker import Faker
import psycopg
import random
import string
class Field(Enum):
    """Column positions used to index rows that are fetched as plain tuples.

    Two column layouts share this enum. The price members reuse the values
    0-2, so ``Enum`` turns them into *aliases* of the first three portfolio
    members (for example ``Field.asset_class is Field.id``). Only ``.value``
    is meaningful for the price members, and iterating over ``Field`` yields
    the eleven portfolio members only.
    """

    # portfolio info
    id = 0
    acct_num = 1
    manager_id = 2
    opened_on = 3
    cash = 4
    strategy = 5
    eligible = 6
    created_on = 7
    created_by = 8
    modified_on = 9
    modified_by = 10

    # price info (aliases of id / acct_num / manager_id because the values
    # repeat; always access these through ``.value``)
    asset_class = 0
    symbol = 1
    close = 2
class Create_portfolios:
    """dbworkload workload that opens portfolios and seeds their positions.

    One cycle (see ``loop``) runs three steps in order: pick a portfolio
    manager (occasionally creating a new one), open a portfolio for that
    manager, then insert a random set of positions for the new portfolio.
    The steps hand their results to each other through ``self.manager`` and
    ``self.portfolio``; a value of ``None`` tells the next step to skip.
    """

    def __init__(self, args: dict):
        """Initialise per-thread state.

        Args:
            args: dict built from the yaml/json passed with the ``--args``
                flag. It is accepted for interface compatibility and is not
                read here.
        """
        self.counter: int = 0
        self.fake = Faker()
        # Defined up front so that a step which runs after a failed or
        # skipped earlier step sees None instead of raising AttributeError.
        self.id: int = 0
        self.manager = None
        self.portfolio = None

    # The setup() function is executed only once, when a new executing
    # thread is started. It is also how the workload receives the executing
    # thread's unique id and the total thread count.
    def setup(self, conn: psycopg.Connection, id: int, total_thread_count: int):
        """Record the thread id and print the server version.

        The parameter name ``id`` shadows the builtin; it is kept unchanged
        so existing callers keep working.
        """
        self.id = id
        with conn.cursor() as cur:
            print(
                f"My thread ID is {id}. The total count of threads is {total_thread_count}"
            )
            print(cur.execute("select version()").fetchone()[0])

    # The loop() function returns a list of functions that dbworkload will
    # execute sequentially. Once every function has been executed, loop() is
    # re-evaluated. This process continues until dbworkload exits.
    def loop(self):
        """Return the ordered steps that make up one workload cycle."""
        return [self.get_manager, self.create_portfolio, self.add_securities]

    def get_manager(self, conn: psycopg.Connection):
        """Pick a random portfolio manager and store its row in ``self.manager``.

        A new manager is inserted when none exist yet, and otherwise about
        0.1% of the time. ``self.manager`` is left as ``None`` when no row
        could be selected.
        """
        # Never let a previous cycle's manager leak into this one.
        self.manager = None

        with conn.cursor() as cur:
            # query how many portfolio managers currently exist
            num_managers = cur.execute(
                'select count("Id") from "PortfolioManagers";'
            ).fetchone()[0]

            # create a new portfolio manager 0.1% of the time
            if num_managers == 0 or random.randint(1, 1000) <= 1:
                ins = """
                    insert into "PortfolioManagers" ("Name")
                    values (%s) returning *;
                """
                try:
                    # Bound as a parameter: generated names can contain
                    # apostrophes, which would break a string-built statement.
                    cur.execute(ins, (self.fake.name(),))
                except psycopg.Error as e:
                    print(f"Ignoring '{e}' error")
                else:
                    # Count the new row only when the insert succeeded.
                    num_managers += 1

            if num_managers == 0:
                # Nothing to choose from; later steps will skip.
                return

            # and choose a random portfolio manager
            sel = """
                select * from "PortfolioManagers"
                offset %s
                limit 1;
            """
            self.manager = cur.execute(
                sel, (random.randrange(num_managers),)
            ).fetchone()

    def create_portfolio(self, conn: psycopg.Connection):
        """Open a portfolio for ``self.manager`` and store it in ``self.portfolio``.

        Does nothing (and leaves ``self.portfolio`` as ``None``) when no
        usable manager row is available or when there are no market prices
        to take an open date from.
        """
        # Reset before the guard so that a skipped cycle can never make
        # add_securities() reuse the portfolio created by an earlier cycle.
        self.portfolio = None

        # if get portfolio manager failed then exit
        manager = self.manager
        if manager is None or len(manager) < 6 or manager[Field.id.value] is None:
            return

        self.counter += 1

        # open a new portfolio with a quasi-unique account number:
        # 4 random letters - 6 digits of a timestamp hash - thread id - counter
        acct_num = "-".join(
            (
                "".join(random.choices(string.ascii_uppercase, k=4)),
                str(abs(hash(datetime.datetime.now())))[:6],
                f"{self.id % 100:02}",
                f"{self.counter % 10000:04}",
            )
        )

        with conn.cursor() as cur:
            # and a random open date chosen from available market prices
            sql = """
                select "Date" from market."Prices"
                offset (select cast(floor(random() *
                    (select cast(count(*) as float) from market."Prices")) as int))
                limit 1;
            """
            row = cur.execute(sql).fetchone()
            if row is None:
                # No prices available, so there is no open date to use.
                return
            open_date = row[0]

            # and a random initial cash injection, strategy and eligibility
            cash = random.randint(10000, 1000000)
            strategy = random.randint(0, 4)
            eligible = random.randint(0, 100) <= 80

            ins = """
                insert into "Portfolios"
                    ("AccountNum", "ManagerId", "OpenedOn", "Cash", "Strategy", "Eligible")
                values (%s, %s, %s, %s, %s, %s) returning *;
            """
            params = (
                acct_num,
                manager[Field.id.value],
                open_date,
                cash,
                strategy,
                eligible,
            )

            # and save the new portfolio for further processing
            self.portfolio = cur.execute(ins, params).fetchone()

    def add_securities(self, conn: psycopg.Connection):
        """Insert a random set of empty positions for ``self.portfolio``.

        Securities are taken from the market prices dated between the
        portfolio's open date and the following day. Each position starts
        with quantity 0 and a random allocation; the allocations sum to at
        most 100. Does nothing when no usable portfolio row is available or
        when no securities are found.
        """
        # if the create portfolio failed then exit
        portfolio = self.portfolio
        if (
            portfolio is None
            or len(portfolio) < 11
            or portfolio[Field.id.value] is None
        ):
            return

        opened_on = portfolio[Field.opened_on.value]
        open_day = opened_on.strftime("%Y-%m-%d")
        next_day = (opened_on + datetime.timedelta(days=1)).strftime("%Y-%m-%d")

        with conn.cursor() as cur:
            # get the securities available in the market on the open date
            sql = """
                select "AssetClass", "Symbol", "Close" from market."Prices"
                where "Date" between cast(%s as date) and cast(%s as date)
            """
            securities = cur.execute(sql, (open_day, next_day)).fetchall()
            if not securities:
                # randint(1, 0) below would raise; there is nothing to insert.
                return

            # and pick a random number of securities, in random order
            securities = random.sample(
                securities, random.randint(1, len(securities))
            )

            # and then create empty positions for the portfolio
            portfolio_id = portfolio[Field.id.value]
            remaining = len(securities)
            avail_alloc = 100
            rows = []
            for security in securities:
                # Cap each draw at an equal share of what is still available
                # so the running total can never exceed 100.
                alloc = random.randint(0, avail_alloc // remaining)
                remaining -= 1
                avail_alloc -= alloc
                rows.append(
                    (
                        security[Field.asset_class.value],
                        security[Field.symbol.value],
                        0,
                        security[Field.close.value],
                        alloc,
                        portfolio_id,
                    )
                )

            # and execute a single multi-value insert to store the positions
            ins = """
                insert into "Positions" (
                    "AssetClass", "Symbol", "Quantity",
                    "Price", "Allocation", "PortfolioId"
                )
            """
            row_placeholder = "(" + ",".join(["%s"] * len(rows[0])) + ")"
            values = ",".join([row_placeholder] * len(rows))
            params = tuple(value for row in rows for value in row)
            cur.execute(f"{ins} VALUES {values};", params)