-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample_sub.py
executable file
·95 lines (79 loc) · 3.07 KB
/
example_sub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python
import asyncio
import json
import os
import sys
import argh
import saxo_openapi.contrib.session as session
import websockets
from saxo_openapi import API
from saxo_openapi.contrib.ws import stream
from saxo_openapi.endpoints import (
accounthistory,
apirequest,
chart,
decorators,
eventnotificationservices,
portfolio,
referencedata,
rootservices,
trading,
valueadd,
)
# Default location of the file holding the access token.
_TOKEN_FILENAME = os.path.expanduser("~/.cred/saxo/saxo_token.txt")


def get_token(filename=_TOKEN_FILENAME):
    """Return the contents of *filename* with surrounding whitespace removed.

    Args:
        filename: path of the text file containing the token; defaults to
            ``_TOKEN_FILENAME``.

    Returns:
        The file's text, stripped of leading and trailing whitespace.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    # Use a context manager so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(filename) as token_file:
        return token_file.read().strip()
async def echo(ContextId, token):
    """Open the streaming websocket and print every message it delivers.

    The connection URL is the hard-coded streaming endpoint with
    ``contextId`` set to *ContextId*; *token* is sent as a bearer token in
    the ``Authorization`` header. Each received message is passed through
    ``stream.decode_ws_msg`` and the result is printed. The coroutine
    finishes when iteration over the websocket ends.
    """
    endpoint = "wss://streaming.saxotrader.com/sim/openapi/streamingws/connect?"
    query = f"contextId={ContextId}"
    auth_headers = {"Authorization": f"Bearer {token}"}

    async with websockets.connect(endpoint + query, extra_headers=auth_headers) as connection:
        async for raw_message in connection:
            decoded = stream.decode_ws_msg(raw_message)
            print(decoded)
def read_sub(context_id):
    """Print the messages streamed for *context_id* until the stream ends.

    Reads the access token with ``get_token()`` and runs the ``echo``
    coroutine to completion.

    Args:
        context_id: value passed to ``echo`` as ``ContextId``.
    """
    token = get_token()
    # asyncio.run() creates a fresh event loop and closes it afterwards.
    # Calling asyncio.get_event_loop() with no running loop is deprecated
    # and no longer creates a loop implicitly on newer Python releases.
    asyncio.run(echo(ContextId=context_id, token=token))
def create_price_sub(context_id, *instruments):
    """Create an FxSpot price subscription for each named instrument.

    For every name in *instruments*, the instrument is looked up through the
    reference-data ``Instruments`` request, the entry whose ``Symbol`` equals
    the name is selected, and its ``Identifier`` is used as the ``Uic`` of a
    price subscription. The instrument name is also used as the
    subscription's ``ReferenceId``.

    Args:
        context_id: value sent as ``ContextId`` in every subscription request.
        *instruments: instrument symbols to subscribe to.

    Raises:
        ValueError: if the lookup does not yield exactly one entry whose
            ``Symbol`` matches the requested name.
    """
    token = get_token()
    client = API(access_token=token)
    account_info = session.account_info(client=client)

    for instrument in instruments:
        params = {'AccountKey': account_info.AccountKey, 'AssetTypes': 'FxSpot', 'Keywords': instrument}
        # create the request to fetch Instrument info
        lookup = referencedata.instruments.Instruments(params=params)
        response = client.request(lookup)
        print(response)

        # The keyword search may return several instruments; keep only the
        # one whose symbol is an exact match.
        matches = [row for row in response['Data'] if row['Symbol'] == instrument]
        if len(matches) != 1:
            # Explicit raise instead of ``assert`` so the check is not
            # stripped when Python runs with -O.
            raise ValueError(
                f"expected exactly one instrument with Symbol {instrument!r}, found {len(matches)}"
            )

        # Build a fresh request body per instrument rather than mutating a
        # shared template across iterations.
        body = {
            "Arguments": {"Uic": matches[0]['Identifier'], "AssetType": "FxSpot"},
            "ContextId": context_id,
            "ReferenceId": instrument,
        }
        print(json.dumps(body, indent=2))

        # create the request that sets up the price subscription
        req = trading.prices.CreatePriceSubscription(data=body)
        client.request(req)
        status = "succesful" if req.status_code == req.expected_status else "failed"
        print(f"Subscription for instrument: {instrument} {status}")
def delete_price_sub(context_id, *, ref_id=None):
    """Remove price subscriptions belonging to *context_id*.

    When *ref_id* is ``None`` a ``PriceSubscriptionRemoveByTag`` request is
    issued for the context; otherwise a ``PriceSubscriptionRemove`` request
    is issued for the given context/reference pair. A one-line result
    message is printed afterwards.

    Args:
        context_id: context identifier passed to the removal request.
        ref_id: optional reference identifier of a single subscription
            (keyword-only).
    """
    client = API(access_token=get_token())

    remove_everything = ref_id is None
    if remove_everything:
        # no reference given: remove by context only
        request = trading.prices.PriceSubscriptionRemoveByTag(context_id)
    else:
        request = trading.prices.PriceSubscriptionRemove(context_id, ref_id)

    client.request(request)

    if request.status_code == request.expected_status:
        outcome = "succesful"
    else:
        outcome = "failed"
    print(f"price sub delete: {context_id} {ref_id} {outcome}")
if __name__ == "__main__":
    # Hand the command functions to argh for command-line dispatch.
    cli_commands = [read_sub, create_price_sub, delete_price_sub]
    argh.dispatch_commands(cli_commands)