Skip to content

Commit

Permalink
feat(kernel): Support Multiple Datasource (#435)
Browse files Browse the repository at this point in the history
  • Loading branch information
zccz14 authored Feb 18, 2024
1 parent 496f643 commit 810a852
Show file tree
Hide file tree
Showing 26 changed files with 277 additions and 78 deletions.
10 changes: 10 additions & 0 deletions common/changes/@yuants/agent/2024-02-18-05-31.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/agent",
"comment": "multi-datasource",
"type": "minor"
}
],
"packageName": "@yuants/agent"
}
10 changes: 10 additions & 0 deletions common/changes/@yuants/kernel/2024-02-18-05-31.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/kernel",
"comment": "multi-datasource",
"type": "minor"
}
],
"packageName": "@yuants/kernel"
}
2 changes: 1 addition & 1 deletion libraries/agent/etc/agent.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export const useEffect: (fn: () => void | (() => void), deps?: any[]) => void;

// @public
export const useExchange: () => {
getQuote: (product_id: string) => {
getQuote: (datasource_id: string, product_id: string) => {
ask: number;
bid: number;
};
Expand Down
6 changes: 3 additions & 3 deletions libraries/agent/src/hooks/useExchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import { useAgent } from '.';
* @public
*/
export const useExchange = (): {
getQuote: (product_id: string) => { ask: number; bid: number };
getQuote: (datasource_id: string, product_id: string) => { ask: number; bid: number };
getOrderById: (orderId: string) => IOrder | undefined;
listOrders: () => IOrder[];
submitOrder: (...orders: IOrder[]) => void;
cancelOrder: (...orderIds: string[]) => void;
} => {
const agent = useAgent();
return {
getQuote: (product_id: string) =>
agent.orderMatchingUnit.quoteDataUnit.mapProductIdToQuote[product_id] || { ask: NaN, bid: NaN },
getQuote: (datasource_id: string, product_id: string) =>
agent.orderMatchingUnit.quoteDataUnit.getQuote(datasource_id, product_id) || { ask: NaN, bid: NaN },
getOrderById: (orderId: string) => agent.orderMatchingUnit.getOrderById(orderId),

listOrders: () => {
Expand Down
2 changes: 1 addition & 1 deletion libraries/agent/src/hooks/useProduct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const useProduct = (datasource_id: string, product_id: string): IProduct
});
}, []);
return (
agent.productDataUnit.mapProductIdToProduct[product_id] || {
agent.productDataUnit.getProduct(datasource_id, product_id) || {
datasource_id,
product_id,
}
Expand Down
5 changes: 1 addition & 4 deletions libraries/agent/src/hooks/useTick.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ export const useTick = (account_id: string, datasource_id: string, product_id: s
useProduct(datasource_id, product_id);

useEffect(() => {
const unit = agent.kernel.units.find(
(unit): unit is RealtimeTickLoadingUnit => unit instanceof RealtimeTickLoadingUnit,
);
unit?.addTickTask(datasource_id, product_id, account_id);
agent.kernel.findUnit(RealtimeTickLoadingUnit)?.addTickTask(datasource_id, product_id, account_id);
}, []);

return agent.tickDataUnit.getTick(account_id, product_id);
Expand Down
47 changes: 37 additions & 10 deletions libraries/kernel/etc/kernel.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@ import { PositionVariant } from '@yuants/protocol';
import { Subject } from 'rxjs';
import { Terminal } from '@yuants/protocol';

// @public (undocumented)
export class AccountDatasourceRelationUnit extends BasicUnit {
// (undocumented)
dump(): any;
// Warning: (ae-forgotten-export) The symbol "IAccountDatasourceRelation" needs to be exported by the entry point index.d.ts
//
// (undocumented)
list(): IAccountDatasourceRelation[];
// (undocumented)
restore(state: any): void;
// (undocumented)
updateRelation(relation: IAccountDatasourceRelation): void;
}

// @public (undocumented)
export class AccountInfoUnit extends BasicUnit {
constructor(kernel: Kernel, productDataUnit: ProductDataUnit, quoteDataUnit: QuoteDataUnit, historyOrderUnit: HistoryOrderUnit);
Expand Down Expand Up @@ -411,6 +425,10 @@ export class Kernel {
units: any[];
};
// (undocumented)
findUnit<T extends IKernelUnit>(Unit: new (...args: any[]) => T): T | undefined;
// (undocumented)
findUnits<T extends IKernelUnit>(Unit: new (...args: any[]) => T): T[];
// (undocumented)
id: string;
log: ((...params: any[]) => void) | undefined;
// (undocumented)
Expand Down Expand Up @@ -607,12 +625,18 @@ export class PortfolioSimulatorUnit extends BasicUnit {
export class ProductDataUnit extends BasicUnit {
// (undocumented)
dump(): {
mapProductIdToProduct: Record<string, IProduct>;
mapProductIdToProduct: Record<string, Record<string, IProduct>>;
};
// (undocumented)
mapProductIdToProduct: Record<string, IProduct>;
getProduct(datasource_id: string, product_id: string): IProduct | undefined;
// (undocumented)
listProducts(): IProduct[];
// (undocumented)
onInit(): void | Promise<void>;
// (undocumented)
restore(state: any): void;
// (undocumented)
updateProduct(product: IProduct): void;
}

// @public
Expand Down Expand Up @@ -648,18 +672,21 @@ export class ProductLoadingUnit extends BasicUnit {
export class QuoteDataUnit extends BasicUnit {
// (undocumented)
dump(): {
mapProductIdToQuote: Record<string, {
ask: number;
bid: number;
}>;
mapProductIdToQuote: Record<string, Record<string, IQuote>>;
mapDatasourceIdMapProductIdToAccountIds: Record<string, Record<string, Set<string>>>;
};
// Warning: (ae-forgotten-export) The symbol "IQuote" needs to be exported by the entry point index.d.ts
//
// (undocumented)
mapProductIdToQuote: Record<string, {
ask: number;
bid: number;
}>;
getQuote(datasource_id: string, product_id: string): IQuote | undefined;
// (undocumented)
listQuotes(): IQuote[];
// (undocumented)
onInit(): void | Promise<void>;
// (undocumented)
restore(state: any): void;
// (undocumented)
updateQuote(datasource_id: string, product_id: string, ask: number, bid: number): void;
}

// @public (undocumented)
Expand Down
8 changes: 8 additions & 0 deletions libraries/kernel/src/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,12 @@ export class Kernel {
unit.restore(state.units[idx]);
});
};

findUnit<T extends IKernelUnit>(Unit: new (...args: any[]) => T): T | undefined {
return this.units.find((v): v is T => v instanceof Unit);
}

findUnits<T extends IKernelUnit>(Unit: new (...args: any[]) => T) {
return this.units.filter((v): v is T => v instanceof Unit);
}
}
8 changes: 4 additions & 4 deletions libraries/kernel/src/scenes/AccountReplayScene.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export const AccountReplayScene = (
const productLoadingUnit = new ProductLoadingUnit(kernel, terminal, productDataUnit);
// Adhoc Unit: 根据品种加载交叉盘品种
new BasicUnit(kernel).onInit = async () => {
for (const product of Object.values(productDataUnit.mapProductIdToProduct)) {
for (const product of productDataUnit.listProducts()) {
if (product.quote_currency && currency && product.quote_currency !== currency) {
const [productA] = await lastValueFrom(
terminal
Expand All @@ -71,7 +71,7 @@ export const AccountReplayScene = (
),
);
if (productA) {
productDataUnit.mapProductIdToProduct[productA.product_id] = productA;
productDataUnit.updateProduct(productA);
}
const [productB] = await lastValueFrom(
terminal
Expand All @@ -89,14 +89,14 @@ export const AccountReplayScene = (
),
);
if (productB) {
productDataUnit.mapProductIdToProduct[productB.product_id] = productB;
productDataUnit.updateProduct(productB);
}
}
}
};
// Adhoc Unit: 根据品种加载行情数据
new BasicUnit(kernel).onInit = () => {
for (const product of Object.values(productDataUnit.mapProductIdToProduct)) {
for (const product of productDataUnit.listProducts()) {
periodLoadingUnit.periodTasks.push({
datasource_id: datasource_id ?? account_id,
product_id: product.product_id,
Expand Down
2 changes: 1 addition & 1 deletion libraries/kernel/src/scenes/OrderMergeReplayScene.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const OrderMergeReplayScene = (
const productDataUnit = new ProductDataUnit(kernel);
{
for (const product of products) {
productDataUnit.mapProductIdToProduct[product.product_id] = product;
productDataUnit.updateProduct(product);
}
}
const periodDataUnit = new PeriodDataUnit(kernel, quoteDataUnit);
Expand Down
33 changes: 33 additions & 0 deletions libraries/kernel/src/units/AccountDatasouceRelationUnit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { encodePath } from '@yuants/data-model';
import { BasicUnit } from './BasicUnit';

interface IAccountDatasourceRelation {
account_id: string;
datasource_id: string;
product_id: string;
}

/**
* @public
*/
export class AccountDatasourceRelationUnit extends BasicUnit {
private map: Record<string, IAccountDatasourceRelation> = {};

list(): IAccountDatasourceRelation[] {
return Object.values(this.map);
}

updateRelation(relation: IAccountDatasourceRelation) {
this.map[encodePath(relation.account_id, relation.datasource_id, relation.product_id)] = relation;
}

restore(state: any): void {
this.map = state.map;
}

dump(): any {
return {
map: this.map,
};
}
}
16 changes: 8 additions & 8 deletions libraries/kernel/src/units/AccountInfoUnit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export class AccountInfoUnit extends BasicUnit {
(this.mapAccountIdToBalance[accountId] || 0) + order.profit_correction;
}

const theProduct = this.productDataUnit.mapProductIdToProduct[order.product_id]!;
const theProduct = this.productDataUnit.getProduct(order.account_id, order.product_id)!;

// 假设所有的 order 都有 position_id
const variant =
Expand Down Expand Up @@ -141,7 +141,7 @@ export class AccountInfoUnit extends BasicUnit {
tradedVolume,
thePosition.variant,
theAccountInfo.money.currency,
(product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id],
(product_id) => this.quoteDataUnit.getQuote(accountId, product_id),
);
}
}
Expand All @@ -152,9 +152,9 @@ export class AccountInfoUnit extends BasicUnit {
.filter((pos) => pos.volume > 0) // 过滤掉空的头寸
.map((position): IPosition => {
const product_id = position.product_id;
const quote = this.quoteDataUnit.mapProductIdToQuote[product_id];
const product = this.productDataUnit.mapProductIdToProduct[product_id]!;
if (quote) {
const quote = this.quoteDataUnit.getQuote(accountId, product_id);
const product = this.productDataUnit.getProduct(accountId, product_id);
if (product && quote) {
const closable_price = position.variant === PositionVariant.LONG ? quote.bid : quote.ask;
const floating_profit = getProfit(
product,
Expand All @@ -163,7 +163,7 @@ export class AccountInfoUnit extends BasicUnit {
position.volume,
position.variant,
theAccountInfo.money.currency,
(product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id],
(product_id) => this.quoteDataUnit.getQuote(accountId, product_id),
);
const nextPosition = {
...position,
Expand All @@ -177,7 +177,7 @@ export class AccountInfoUnit extends BasicUnit {
});
// 维护账户保证金
const used = positions.reduce((acc, cur) => {
const product = this.productDataUnit.mapProductIdToProduct[cur.product_id];
const product = this.productDataUnit.getProduct(accountId, cur.product_id);
if (!product) {
return acc;
}
Expand All @@ -189,7 +189,7 @@ export class AccountInfoUnit extends BasicUnit {
cur.volume,
cur.variant,
theAccountInfo.money.currency,
(product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id],
(product_id) => this.quoteDataUnit.getQuote(accountId, product_id),
) /
(theAccountInfo.money.leverage ?? 1)
);
Expand Down
22 changes: 11 additions & 11 deletions libraries/kernel/src/units/AccountSimulatorUnit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export class AccountSimulatorUnit extends BasicUnit {
balance += order.profit_correction;
}

const theProduct = this.productDataUnit.mapProductIdToProduct[order.product_id]!;
const theProduct = this.productDataUnit.getProduct(order.account_id, order.product_id)!;

// 假设所有的 order 都有 position_id
const variant =
Expand All @@ -76,7 +76,7 @@ export class AccountSimulatorUnit extends BasicUnit {
thePosition.free_volume = thePosition.volume = order.volume;
} else {
// 开仓的时候,如果有头寸,就要更新头寸
const nextVolume = roundToStep(thePosition.volume + order.volume, theProduct.volume_step ?? 1);
const nextVolume = roundToStep(thePosition.volume + order.volume, theProduct?.volume_step ?? 1);
const nextPositionPrice =
(thePosition.position_price * thePosition.volume + order.traded_price! * order.volume) /
nextVolume;
Expand All @@ -96,9 +96,9 @@ export class AccountSimulatorUnit extends BasicUnit {
// 平仓的时候,如果有头寸,就要更新头寸
const tradedVolume = roundToStep(
Math.min(order.volume, thePosition.volume),
theProduct.volume_step ?? 1,
theProduct?.volume_step ?? 1,
);
const nextVolume = roundToStep(thePosition.volume - tradedVolume, theProduct.volume_step ?? 1);
const nextVolume = roundToStep(thePosition.volume - tradedVolume, theProduct?.volume_step ?? 1);
// 如果头寸已经平仓完了,就删除头寸
if (nextVolume === 0) {
delete this.mapPositionIdToPosition[order.position_id!];
Expand All @@ -113,7 +113,7 @@ export class AccountSimulatorUnit extends BasicUnit {
tradedVolume,
thePosition.variant,
this.accountInfo.money.currency,
(product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id],
(product_id) => this.quoteDataUnit.getQuote(this.accountInfo.account_id, product_id),
);
}
}
Expand All @@ -125,9 +125,9 @@ export class AccountSimulatorUnit extends BasicUnit {
.filter((pos) => pos.volume > 0) // 过滤掉空的头寸
.map((position): IPosition => {
const product_id = position.product_id;
const quote = this.quoteDataUnit.mapProductIdToQuote[product_id];
const product = this.productDataUnit.mapProductIdToProduct[product_id]!;
if (quote) {
const quote = this.quoteDataUnit.getQuote(this.accountInfo.account_id, product_id);
const product = this.productDataUnit.getProduct(this.accountInfo.account_id, product_id);
if (product && quote) {
const closable_price = position.variant === PositionVariant.LONG ? quote.bid : quote.ask;
const floating_profit = getProfit(
product,
Expand All @@ -136,7 +136,7 @@ export class AccountSimulatorUnit extends BasicUnit {
position.volume,
position.variant,
this.accountInfo.money.currency,
(product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id],
(product_id) => this.quoteDataUnit.getQuote(this.accountInfo.account_id, product_id),
);
const nextPosition = {
...position,
Expand All @@ -150,7 +150,7 @@ export class AccountSimulatorUnit extends BasicUnit {
});
// 维护账户保证金
const used = positions.reduce((acc, cur) => {
const product = this.productDataUnit.mapProductIdToProduct[cur.product_id];
const product = this.productDataUnit.getProduct(this.accountInfo.account_id, cur.product_id);
if (!product) {
return acc;
}
Expand All @@ -162,7 +162,7 @@ export class AccountSimulatorUnit extends BasicUnit {
cur.volume,
cur.variant,
this.accountInfo.money.currency,
(product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id],
(product_id) => this.quoteDataUnit.getQuote(this.accountInfo.account_id, product_id),
) /
(this.accountInfo.money.leverage ?? 1)
);
Expand Down
Loading

0 comments on commit 810a852

Please sign in to comment.