Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V0.9.32 更新一批代码 #172

Merged
merged 5 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pythonpackage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ name: Python package

on:
push:
branches: [ master, V0.9.31 ]
branches: [ master, V0.9.32 ]
pull_request:
branches: [ master ]

Expand Down
4 changes: 2 additions & 2 deletions czsc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@
normalize_feature,
)

__version__ = "0.9.31"
__version__ = "0.9.32"
__author__ = "zengbin93"
__email__ = "[email protected]"
__date__ = "20231007"
__date__ = "20231013"



Expand Down
199 changes: 198 additions & 1 deletion czsc/connectors/qmt_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from typing import List
from tqdm import tqdm
from loguru import logger
from deprecated import deprecated
from datetime import datetime, timedelta
from czsc.objects import Freq, RawBar
from czsc.fsa.im import IM
Expand Down Expand Up @@ -374,6 +373,204 @@ def on_account_status(self, status):
self.push_message(msg, msg_type='text')


def query_stock_positions(xtt: XtQuantTrader, acc: StockAccount):
"""查询股票市场的持仓单

http://docs.thinktrader.net/pages/ee0e9b/#%E6%8C%81%E4%BB%93%E6%9F%A5%E8%AF%A2
http://docs.thinktrader.net/pages/198696/#%E6%8C%81%E4%BB%93xtposition
"""
res = xtt.query_stock_positions(acc)
res = {x.stock_code: x for x in res} if len(res) > 0 else {}
return res


def query_today_trades(xtt: XtQuantTrader, acc: StockAccount):
"""查询当日成交

http://docs.thinktrader.net/pages/198696/#%E6%88%90%E4%BA%A4xttrade
"""
trades = xtt.query_stock_trades(acc)
res = [{'品种': x.stock_code, '均价': x.traded_price, "方向": "买入" if x.order_type == 23 else "卖出",
'数量': x.traded_volume, '金额': x.traded_amount,
'时间': time.strftime("%H:%M:%S", time.localtime(x.traded_time))} for x in trades]
return res


def cancel_timeout_orders(xtt: XtQuantTrader, acc: StockAccount, minutes=30):
"""撤销超时的委托单

http://docs.thinktrader.net/pages/ee0e9b/#%E8%82%A1%E7%A5%A8%E5%90%8C%E6%AD%A5%E6%92%A4%E5%8D%95
http://docs.thinktrader.net/pages/ee0e9b/#%E5%A7%94%E6%89%98%E6%9F%A5%E8%AF%A2
http://docs.thinktrader.net/pages/198696/#%E5%A7%94%E6%89%98xtorder

:param minutes: 超时时间,单位分钟
:return:
"""
orders = xtt.query_stock_orders(acc, cancelable_only=True)
for o in orders:
if datetime.fromtimestamp(o.order_time) < datetime.now() - timedelta(minutes=minutes):
xtt.cancel_order_stock(acc, o.order_id)


def is_order_exist(xtt: XtQuantTrader, acc: StockAccount, symbol, order_type, volume=None):
"""判断是否存在相同的委托单

http://docs.thinktrader.net/pages/ee0e9b/#%E8%82%A1%E7%A5%A8%E5%90%8C%E6%AD%A5%E6%92%A4%E5%8D%95
http://docs.thinktrader.net/pages/ee0e9b/#%E5%A7%94%E6%89%98%E6%9F%A5%E8%AF%A2
http://docs.thinktrader.net/pages/198696/#%E5%A7%94%E6%89%98xtorder

"""
orders = xtt.query_stock_orders(acc, cancelable_only=False)
for o in orders:
if o.stock_code == symbol and o.order_type == order_type:
if not volume or o.order_volume == volume:
return True
return False


def is_allow_open(xtt: XtQuantTrader, acc: StockAccount, symbol, price, **kwargs):
"""判断是否允许开仓

http://docs.thinktrader.net/pages/198696/#%E8%B5%84%E4%BA%A7xtasset

:param symbol: 股票代码
:param price: 股票现价
:return: True 允许开仓,False 不允许开仓
"""
symbol_max_pos = kwargs.get('max_pos', 0) # 最大持仓数量

# 如果 symbol_max_pos 为 0,不允许开仓
if symbol_max_pos <= 0:
return False

# 如果 symbol 在禁止交易的列表中,不允许开仓
if symbol in kwargs.get('forbidden_symbols', []):
return False

# 如果 未成交的开仓委托单 存在,不允许开仓
if is_order_exist(xtt, acc, symbol, order_type=23):
logger.warning(f"存在未成交的开仓委托单,symbol={symbol}")
return False

# 如果已经有持仓,不允许开仓
if query_stock_positions(xtt, acc).get(symbol, None):
return False

# 如果资金不足,不允许开仓
assets = xtt.query_stock_asset(acc)
if assets.cash < price * 120:
logger.warning(f"资金不足,无法开仓,symbol={symbol}")
return False

return True


def is_allow_exit(xtt: XtQuantTrader, acc: StockAccount, symbol, **kwargs):
"""判断是否允许平仓

:param symbol: 股票代码
:return: True 允许开仓,False 不允许开仓
"""
# symbol 在禁止交易的列表中,不允许平仓
if symbol in kwargs.get('forbidden_symbols', []):
return False

# 没有持仓 或 可用数量为 0,不允许平仓
pos = query_stock_positions(xtt, acc).get(symbol, None)
if not pos or pos.can_use_volume <= 0:
return False

# 未成交的平仓委托单 存在,不允许继续平仓
if is_order_exist(xtt, acc, symbol, order_type=24):
logger.warning(f"存在未成交的平仓委托单,symbol={symbol}")
return False

return True


def send_stock_order(xtt: XtQuantTrader, acc: StockAccount, **kwargs):
"""股票市场交易下单

股票同步报单 http://docs.thinktrader.net/pages/ee0e9b/#%E8%82%A1%E7%A5%A8%E5%90%8C%E6%AD%A5%E6%8A%A5%E5%8D%95
委托类型(order_type) http://docs.thinktrader.net/pages/198696/#%E5%A7%94%E6%89%98%E7%B1%BB%E5%9E%8B-order-type
报价类型(price_type) http://docs.thinktrader.net/pages/198696/#%E6%8A%A5%E4%BB%B7%E7%B1%BB%E5%9E%8B-price-type

stock_code: 证券代码, 例如"600000.SH"
order_type: 委托类型, 23:买, 24:卖
order_volume: 委托数量, 股票以'股'为单位, 债券以'张'为单位,ETF以'份'为单位;数量必须是100的整数倍
price_type: 报价类型, 详见帮助手册
xtconstant.LATEST_PRICE 5 最新价
xtconstant.FIX_PRICE 11 限价
price: 报价价格, 如果price_type为限价, 那price为指定的价格, 否则填0
strategy_name: 策略名称
order_remark: 委托备注

:return: 返回下单请求序号, 成功委托后的下单请求序号为大于0的正整数, 如果为-1表示委托失败
"""
stock_code = kwargs.get('stock_code')
order_type = kwargs.get('order_type')
order_volume = kwargs.get('order_volume') # 委托数量, 股票以'股'为单位, 债券以'张'为单位
price_type = kwargs.get('price_type', 5)
price = kwargs.get('price', 0)
strategy_name = kwargs.get('strategy_name', "程序下单")
order_remark = kwargs.get('order_remark', "程序下单")

if not xtt.connected:
xtt.start()
xtt.connect()

order_volume = max(order_volume // 100 * 100, 0) # 股票市场只允许做多 100 的整数倍
assert xtt.connected, "交易服务器连接断开"
_id = xtt.order_stock(acc, stock_code, order_type, int(order_volume), price_type, price, strategy_name, order_remark)
return _id


def order_stock_target(xtt: XtQuantTrader, acc: StockAccount, symbol, target, **kwargs):
"""下单调整至目标仓位

:param xtt: XtQuantTrader, QMT 交易接口
:param acc: StockAccount, 账户
:param symbol: str, 股票代码
:param target: int, 目标仓位, 单位:股;正数表示持有多头仓位,负数表示持有空头仓位
:param kwargs: dict, 其他参数

- price_type: int, 报价类型, 详见帮助手册
xtconstant.LATEST_PRICE 5 最新价
xtconstant.FIX_PRICE 11 限价
- price: float, 报价价格, 如果price_type为限价, 那price为指定的价格, 否则填0

:return:
"""
# 查询持仓
pos = query_stock_positions(xtt, acc).get(symbol, None)
current = pos.volume if pos else 0

logger.info(f"当前持仓:{current},目标仓位:{target}")
if current == target:
return

price_type = kwargs.get('price_type', 5)
price = kwargs.get('price', 0)

# 如果目标小于当前,平仓
if target < current:
delta = min(current - target, pos.can_use_volume if pos else current)
logger.info(f"{symbol}平仓,目标仓位:{target},当前仓位:{current},平仓数量:{delta}")
if delta != 0:
send_stock_order(xtt, acc, stock_code=symbol, order_type=24,
order_volume=delta, price_type=price_type, price=price)
return

# 如果目标大于当前,开仓
if target > current:
delta = target - current
logger.info(f"{symbol}开仓,目标仓位:{target},当前仓位:{current},开仓数量:{delta}")
if delta != 0:
send_stock_order(xtt, acc, stock_code=symbol, order_type=23,
order_volume=delta, price_type=price_type, price=price)
return


class QmtTradeManager:
"""QMT交易管理器(这是一个案例性质的存在,真正实盘的时候请参考这个,根据自己的逻辑重新实现)

Expand Down
50 changes: 29 additions & 21 deletions czsc/traders/rwc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
class RedisWeightsClient:
"""策略持仓权重收发客户端"""

version = "V231012"
version = "V231014"

def __init__(self, strategy_name, redis_url, **kwargs):
"""
Expand Down Expand Up @@ -81,22 +81,22 @@ def metadata(self):
key = f'{self.key_prefix}:META:{self.strategy_name}'
return self.r.hgetall(key)

@property
def last_time(self):
"""获取策略最近一次发布信号的时间"""
keys = self.get_keys(f'{self.key_prefix}:{self.strategy_name}:*:LAST')
dt = None
for key in keys:
dt_ = pd.to_datetime(str(self.r.hget(key, 'dt')))
dt = dt_ if not dt else max(dt, dt_)
return dt

def get_last_time(self, symbol):
"""获取指定品种上策略最近一次发布信号的时间"""
key = f'{self.key_prefix}:{self.strategy_name}:{symbol}:LAST'
if not self.r.exists(key):
return None
return pd.to_datetime(str(self.r.hget(key, 'dt')))
def get_last_times(self, symbols=None):
"""获取所有品种上策略最近一次发布信号的时间

:param symbols: list, 品种列表, 默认为None, 即获取所有品种
:return: dict, {symbol: datetime},如{'SFIF9001': datetime(2021, 9, 24, 15, 19, 0)}
"""
if isinstance(symbols, str):
row = self.r.hgetall(f'{self.key_prefix}:{self.strategy_name}:{symbols}:LAST')
return pd.to_datetime(row['dt']) if row else None # type: ignore

symbols = symbols if symbols else self.get_symbols()
with self.r.pipeline() as pipe:
for symbol in symbols:
pipe.hgetall(f'{self.key_prefix}:{self.strategy_name}:{symbol}:LAST')
rows = pipe.execute()
return {x['symbol']: pd.to_datetime(x['dt']) for x in rows}

def publish(self, symbol, dt, weight, price=0, ref=None, overwrite=False):
"""发布单个策略持仓权重
Expand All @@ -113,7 +113,7 @@ def publish(self, symbol, dt, weight, price=0, ref=None, overwrite=False):
dt = pd.to_datetime(dt)

if not overwrite:
last_dt = self.get_last_time(symbol)
last_dt = self.get_last_times(symbol)
if last_dt is not None and dt <= last_dt:
logger.warning(f"不允许重复写入,已过滤 {symbol} {dt} 的重复信号")
return 0
Expand Down Expand Up @@ -143,9 +143,10 @@ def publish_dataframe(self, df, overwrite=False, batch_size=10000):

if not overwrite:
raw_count = len(df)
_time = self.get_last_times()
_data = []
for symbol, dfg in df.groupby('symbol'):
last_dt = self.get_last_time(symbol)
last_dt = _time.get(symbol)
if last_dt is not None:
dfg = dfg[dfg['dt'] > last_dt]
_data.append(dfg)
Expand Down Expand Up @@ -250,8 +251,13 @@ def get_symbols(self):
symbols = {x.split(":")[2] for x in keys}
return list(symbols)

def get_last_weights(self, symbols=None):
"""获取最近的持仓权重"""
def get_last_weights(self, symbols=None, ignore_zero=True):
"""获取最近的持仓权重

:param symbols: list, 品种列表
:param ignore_zero: boolean, 是否忽略权重为0的品种
:return: pd.DataFrame
"""
symbols = symbols if symbols else self.get_symbols()
with self.r.pipeline() as pipe:
for symbol in symbols:
Expand All @@ -261,6 +267,8 @@ def get_last_weights(self, symbols=None):
dfw = pd.DataFrame(rows)
dfw['weight'] = dfw['weight'].astype(float)
dfw['dt'] = pd.to_datetime(dfw['dt'])
if ignore_zero:
dfw = dfw[dfw['weight'] != 0].copy().reset_index(drop=True)
return dfw

def get_hist_weights(self, symbol, sdt, edt) -> pd.DataFrame:
Expand Down
Loading
Loading