-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
✨ feat: #58 cache more frame type bras and to real-time calc with lua
1. add lua script and wrappers under scripts 2. load_lua_script during app.init 3. rebuild unclosed bars during app.init 4. realtime calc unclosed bars after 1m bars is fetched and cached
- Loading branch information
aaron yang
committed
Jul 24, 2022
1 parent
30e40b4
commit c82fd07
Showing
16 changed files
with
569 additions
and
27 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# 数据存储 | ||
Omega 2.0将数据主要存储在时间序列数据库(influxdb)中。上 | ||
# 数据同步 | ||
# 数据校准 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import logging | ||
|
||
import arrow | ||
import numpy as np | ||
from coretypes import FrameType | ||
from omicron import cache, tf | ||
from omicron.models.security import Security | ||
from omicron.models.stock import Stock | ||
from omicron.notify.dingtalk import DingTalkMessage | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
async def _rebuild_min_level_unclosed_bars(): | ||
"""根据缓存中的分钟线,重建当日已收盘或者未收盘的分钟级别及日线级别数据""" | ||
end = tf.floor(arrow.now().naive, FrameType.MIN1) | ||
keys = await cache.security.keys("bars:1m:*") | ||
|
||
errors = 0 | ||
for key in keys: | ||
try: | ||
sec = key.split(":")[2] | ||
bars = await Stock._get_cached_bars(sec, end, 240, FrameType.MIN1) | ||
except Exception as e: | ||
logger.exception(e) | ||
logger.warning("failed to get cached bars for %s", sec) | ||
errors += 1 | ||
continue | ||
|
||
try: | ||
for frame_type in tf.minute_level_frames[1:]: | ||
resampled = Stock.resample(bars, FrameType.MIN1, frame_type) | ||
if tf.is_bar_closed(resampled[-1]["frame"], frame_type): | ||
await Stock.cache_bars(sec, frame_type, resampled) | ||
else: | ||
await Stock.cache_bars(sec, frame_type, resampled[:-1]) | ||
await Stock.cache_unclosed_bars(sec, frame_type, resampled[-1:]) | ||
|
||
# 重建日线数据 | ||
resampled = Stock.resample(bars, FrameType.MIN1, FrameType.DAY) | ||
await Stock.cache_unclosed_bars(sec, FrameType.DAY, resampled) | ||
except Exception as e: | ||
logger.exception(e) | ||
logger.warning( | ||
"failed to build unclosed bar for %s, frame type is %s", sec, frame_type | ||
) | ||
errors += 1 | ||
|
||
if errors > 0: | ||
DingTalkMessage.text(f"重建分钟级缓存数据时,出现{errors}个错误。") | ||
|
||
|
||
async def _rebuild_day_level_unclosed_bars(): | ||
"""重建当期未收盘的周线、月线 | ||
!!!Info: | ||
最终我们需要实时更新年线和季线。目前数据库还没有同步这两种k线。 | ||
""" | ||
codes = await Security.select().eval() | ||
end = arrow.now().date() | ||
# just to cover one month's day bars at most | ||
n = 30 | ||
start = tf.day_shift(end, -n) | ||
|
||
errors = 0 | ||
for code in codes: | ||
try: | ||
bars = await Stock._get_persisted_bars( | ||
code, FrameType.DAY, begin=start, end=end | ||
) | ||
except Exception as e: | ||
logger.exception(e) | ||
logger.warning("failed to get persisted bars for %s from %s to %s", code, start, end) | ||
errors += 1 | ||
continue | ||
|
||
try: | ||
unclosed_day = await Stock._get_cached_day_bar(code) | ||
bars = np.concatenate([bars, unclosed_day]) | ||
|
||
week = Stock.resample(bars, FrameType.DAY, FrameType.WEEK) | ||
await Stock.cache_unclosed_bars(code, FrameType.WEEK, week[-1:]) | ||
|
||
month = Stock.resample(bars, FrameType.DAY, FrameType.MONTH) | ||
await Stock.cache_unclosed_bars(code, FrameType.MONTH, month[-1:]) | ||
except Exception as e: | ||
logger.exception(e) | ||
logger.warning( | ||
"failed to build unclosed bar for %s, got bars %s", code, len(bars) | ||
) | ||
errors += 1 | ||
|
||
if errors > 0: | ||
DingTalkMessage.text(f"重建日线级别缓存数据时,出现{errors}个错误。") | ||
|
||
|
||
async def rebuild_unclosed_bars(): | ||
"""在omega启动时重建未收盘数据 | ||
后续未收盘数据的更新,将在每个分钟线同步完成后,调用lua脚本进行。 | ||
""" | ||
await _rebuild_min_level_unclosed_bars() | ||
await _rebuild_day_level_unclosed_bars() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import logging | ||
import os | ||
|
||
from coretypes import Frame, FrameType | ||
from omicron import cache, tf | ||
from omicron.notify.dingtalk import DingTalkMessage | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
async def load_lua_script(): | ||
"""加载lua脚本到redis中""" | ||
dir_ = os.path.dirname(os.path.abspath(__file__)) | ||
for file in os.listdir(dir_): | ||
if not file.endswith(".lua"): | ||
continue | ||
|
||
path = os.path.join(dir_, file) | ||
with open(path, "r", encoding="utf-8") as f: | ||
content = f.read() | ||
|
||
r = await cache.sys.execute("FUNCTION", "LOAD", "REPLACE", content) | ||
print(r) | ||
|
||
|
||
async def update_unclosed_bar(frame_type: FrameType, source_min: Frame): | ||
"""wraps the cognominal lua script function | ||
Args: | ||
frame_type: which frame type to be updated/merged | ||
source_min: the minute bar to be merged from | ||
""" | ||
source = tf.time2int(source_min) | ||
try: | ||
await cache.security.execute( | ||
"fcall", "update_unclosed", 0, frame_type.value, source | ||
) | ||
except Exception as e: | ||
msg = f"实时合并{frame_type}未收盘行情数据错误:{source_min}" | ||
logger.exception(e) | ||
logging.warning(msg) | ||
DingTalkMessage.text(msg) | ||
|
||
|
||
async def close_frame(frame_type: FrameType, frame: Frame): | ||
"""wraps the cognominal lua script function | ||
Args: | ||
frame_type: which frame type to be closed | ||
frame: the closed frame | ||
""" | ||
dst = ( | ||
tf.date2int(frame) if frame_type in tf.day_level_frames else tf.time2int(frame) | ||
) | ||
|
||
try: | ||
await cache.security.execute("fcall", "close_frame", 0, frame_type.value, dst) | ||
except Exception as e: | ||
msg = f"缓存收盘{frame_type}数据失败: {frame}" | ||
logger.exception(e) | ||
logger.warning(msg) | ||
DingTalkMessage.text(msg) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
#!lua name=omega | ||
|
||
local function round2(num) | ||
return math.floor(num * 100 + 0.5) / 100 | ||
end | ||
|
||
local function newsplit(delimiter, str) | ||
assert(type(delimiter) == "string") | ||
assert(#delimiter > 0, "Must provide non empty delimiter") | ||
|
||
-- Add escape characters if delimiter requires it | ||
-- delimiter = delimiter:gsub("[%(%)%.%%%+%-%*%?%[%]%^%$]", "%%%0") | ||
|
||
local start_index = 1 | ||
local result = {} | ||
|
||
while true do | ||
local delimiter_index, _ = str:find(delimiter, start_index) | ||
|
||
if delimiter_index == nil then | ||
table.insert(result, str:sub(start_index)) | ||
break | ||
end | ||
|
||
table.insert(result, str:sub(start_index, delimiter_index - 1)) | ||
|
||
start_index = delimiter_index + 1 | ||
end | ||
|
||
return result | ||
end | ||
|
||
local function close_frame(keys_, args) | ||
--local function close_frame(frame_type, frame) | ||
-- close the frame, write unclosed_5m hash to bars:5m:{code} hash. | ||
local frame_type, frame = unpack(args) | ||
local hm = redis.call('hgetall', 'bars:' .. frame_type .. ':unclosed') | ||
|
||
for i = 1, #hm, 2 do | ||
local code = hm[i] | ||
local bar = hm[i + 1] | ||
redis.call('hset', 'bars:' .. frame_type .. ':' .. code, frame, bar) | ||
end | ||
|
||
redis.call('del', 'bars:' .. frame_type .. ':unclosed') | ||
end | ||
|
||
local function decode_bar(bars) | ||
-- 将string表示的bar解码成为正确类型的OHLC,但对frame仍保持为字符串 | ||
local frame, open, high, low, close, volume, amount, factor = unpack(newsplit(',', bars)) | ||
|
||
return frame, round2(tonumber(open)), round2(tonumber(high)), round2(tonumber(low)), round2(tonumber(close)), tonumber(volume), tonumber(amount), tonumber(factor) | ||
end | ||
|
||
local function update_unclosed(keys_, args) | ||
--local function update_unclosed(frame_type, min_frame) | ||
-- merge bars:{frame_type.value}:unclosed with bars:1m:{code} hash. | ||
-- args are: frame_type(str), min_frame(int, minute frame) | ||
|
||
local frame_type, min_frame = unpack(args) | ||
local unclosed_key = 'bars:' .. frame_type .. ':unclosed' | ||
|
||
-- bars:1m:* should contains NO bars:1m:unclosed | ||
local keys = redis.call('keys', 'bars:1m:*') | ||
|
||
for _, key_ in ipairs(keys) do | ||
local code = key_:match('bars:1m:(.*)') | ||
|
||
-- get 1m bar to merge from | ||
local mbar = redis.call('hget', key_, min_frame) | ||
if mbar then | ||
local t2, o2, h2, l2, c2, v2, a2, f2 = decode_bar(mbar) | ||
|
||
-- get unclosed bar and do the merge | ||
local unclosed = redis.call('hget', unclosed_key, code) | ||
|
||
local t, opn, high, low, close, volume, amount, factor = '', o2, h2, l2, c2, v2, a2, f2 | ||
if unclosed then | ||
local _, o1, h1, l1, c1, v1, a1, f1 = decode_bar(unclosed) | ||
opn = o1 | ||
high = math.max(h1, h2) | ||
low = math.min(l1, l2) | ||
close = c2 | ||
volume = v1 + v2 | ||
amount = a1 + a2 | ||
factor = f2 | ||
end | ||
|
||
-- save unclosed bar | ||
local bar = min_frame .. ',' .. opn .. ',' .. high .. ',' .. low .. ',' .. close .. ',' .. volume .. ',' .. amount .. ',' .. factor | ||
redis.call('hset', unclosed_key, code, bar) | ||
end | ||
end | ||
end | ||
|
||
-- update_unclosed('5m', 202207180935, 202207180931) | ||
-- update_unclosed('5m', 202207180935, 202207180932) | ||
-- update_unclosed('5m', 202207180935, 202207180933) | ||
-- update_unclosed('5m', 202207180935, 202207180935) | ||
-- update_unclosed('5m', 202207180935, 202207180936) | ||
|
||
|
||
-- close_frame('5m', "2022020935") | ||
|
||
redis.register_function( | ||
'close_frame', | ||
close_frame | ||
) | ||
redis.register_function( | ||
'update_unclosed', | ||
update_unclosed | ||
) |
Oops, something went wrong.