From 810a852874b953233990a81d0bbf314a4be75c63 Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Sun, 18 Feb 2024 13:37:09 +0800 Subject: [PATCH] feat(kernel): Support Multiple Datasource (#435) --- .../@yuants/agent/2024-02-18-05-31.json | 10 +++ .../@yuants/kernel/2024-02-18-05-31.json | 10 +++ libraries/agent/etc/agent.api.md | 2 +- libraries/agent/src/hooks/useExchange.ts | 6 +- libraries/agent/src/hooks/useProduct.ts | 2 +- libraries/agent/src/hooks/useTick.ts | 5 +- libraries/kernel/etc/kernel.api.md | 47 +++++++++--- libraries/kernel/src/kernel.ts | 8 ++ .../kernel/src/scenes/AccountReplayScene.ts | 8 +- .../src/scenes/OrderMergeReplayScene.ts | 2 +- .../src/units/AccountDatasouceRelationUnit.ts | 33 ++++++++ libraries/kernel/src/units/AccountInfoUnit.ts | 16 ++-- .../kernel/src/units/AccountSimulatorUnit.ts | 22 +++--- .../ActivePortfolioOptimizeSimulatorUnit.ts | 8 +- .../src/units/HistoryPeriodLoadingUnit.ts | 4 +- .../kernel/src/units/OrderMatchingUnit.ts | 4 +- libraries/kernel/src/units/PeriodDataUnit.ts | 10 ++- .../src/units/PortfolioSimulatorUnit.ts | 8 +- libraries/kernel/src/units/ProductDataUnit.ts | 33 +++++++- .../kernel/src/units/ProductLoadingUnit.ts | 2 +- libraries/kernel/src/units/QuoteDataUnit.ts | 75 ++++++++++++++++++- .../src/units/RealtimePeriodLoadingUnit.ts | 4 +- .../src/units/RealtimeTickLoadingUnit.ts | 9 ++- libraries/kernel/src/units/index.ts | 1 + .../src/units/metrics/QuoteMetricsUnit.ts | 8 +- .../StaticFileServerPeriodLoadingUnit.ts | 18 ++--- 26 files changed, 277 insertions(+), 78 deletions(-) create mode 100644 common/changes/@yuants/agent/2024-02-18-05-31.json create mode 100644 common/changes/@yuants/kernel/2024-02-18-05-31.json create mode 100644 libraries/kernel/src/units/AccountDatasouceRelationUnit.ts diff --git a/common/changes/@yuants/agent/2024-02-18-05-31.json b/common/changes/@yuants/agent/2024-02-18-05-31.json new file mode 100644 index 000000000..e58821046 --- /dev/null +++ b/common/changes/@yuants/agent/2024-02-18-05-31.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/agent", + "comment": "multi-datasource", + "type": "minor" + } + ], + "packageName": "@yuants/agent" +} \ No newline at end of file diff --git a/common/changes/@yuants/kernel/2024-02-18-05-31.json b/common/changes/@yuants/kernel/2024-02-18-05-31.json new file mode 100644 index 000000000..c9ef92a0e --- /dev/null +++ b/common/changes/@yuants/kernel/2024-02-18-05-31.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/kernel", + "comment": "multi-datasource", + "type": "minor" + } + ], + "packageName": "@yuants/kernel" +} \ No newline at end of file diff --git a/libraries/agent/etc/agent.api.md b/libraries/agent/etc/agent.api.md index bb9d23968..d46dabf43 100644 --- a/libraries/agent/etc/agent.api.md +++ b/libraries/agent/etc/agent.api.md @@ -130,7 +130,7 @@ export const useEffect: (fn: () => void | (() => void), deps?: any[]) => void; // @public export const useExchange: () => { - getQuote: (product_id: string) => { + getQuote: (datasource_id: string, product_id: string) => { ask: number; bid: number; }; diff --git a/libraries/agent/src/hooks/useExchange.ts b/libraries/agent/src/hooks/useExchange.ts index ddd95b5ca..a4a078e3d 100644 --- a/libraries/agent/src/hooks/useExchange.ts +++ b/libraries/agent/src/hooks/useExchange.ts @@ -6,7 +6,7 @@ import { useAgent } from '.'; * @public */ export const useExchange = (): { - getQuote: (product_id: string) => { ask: number; bid: number }; + getQuote: (datasource_id: string, product_id: string) => { ask: number; bid: number }; getOrderById: (orderId: string) => IOrder | undefined; listOrders: () => IOrder[]; submitOrder: (...orders: IOrder[]) => void; @@ -14,8 +14,8 @@ export const useExchange = (): { } => { const agent = useAgent(); return { - getQuote: (product_id: string) => - agent.orderMatchingUnit.quoteDataUnit.mapProductIdToQuote[product_id] || { ask: NaN, bid: NaN }, + getQuote: (datasource_id: string, product_id: string) => + agent.orderMatchingUnit.quoteDataUnit.getQuote(datasource_id, product_id) || { ask: NaN, bid: NaN }, getOrderById: (orderId: string) => agent.orderMatchingUnit.getOrderById(orderId), listOrders: () => { diff --git a/libraries/agent/src/hooks/useProduct.ts b/libraries/agent/src/hooks/useProduct.ts index a27aa4510..8ad73e7ee 100644 --- a/libraries/agent/src/hooks/useProduct.ts +++ b/libraries/agent/src/hooks/useProduct.ts @@ -16,7 +16,7 @@ export const useProduct = (datasource_id: string, product_id: string): IProduct }); }, []); return ( - agent.productDataUnit.mapProductIdToProduct[product_id] || { + agent.productDataUnit.getProduct(datasource_id, product_id) || { datasource_id, product_id, } diff --git a/libraries/agent/src/hooks/useTick.ts b/libraries/agent/src/hooks/useTick.ts index a6eb097b2..3d086ef28 100644 --- a/libraries/agent/src/hooks/useTick.ts +++ b/libraries/agent/src/hooks/useTick.ts @@ -12,10 +12,7 @@ export const useTick = (account_id: string, datasource_id: string, product_id: s useProduct(datasource_id, product_id); useEffect(() => { - const unit = agent.kernel.units.find( - (unit): unit is RealtimeTickLoadingUnit => unit instanceof RealtimeTickLoadingUnit, - ); - unit?.addTickTask(datasource_id, product_id, account_id); + agent.kernel.findUnit(RealtimeTickLoadingUnit)?.addTickTask(datasource_id, product_id, account_id); }, []); return agent.tickDataUnit.getTick(account_id, product_id); diff --git a/libraries/kernel/etc/kernel.api.md b/libraries/kernel/etc/kernel.api.md index 5bdf17e4c..c419e35c2 100644 --- a/libraries/kernel/etc/kernel.api.md +++ b/libraries/kernel/etc/kernel.api.md @@ -15,6 +15,20 @@ import { PositionVariant } from '@yuants/protocol'; import { Subject } from 'rxjs'; import { Terminal } from '@yuants/protocol'; +// @public (undocumented) +export class AccountDatasourceRelationUnit extends BasicUnit { + // (undocumented) + dump(): any; + // Warning: (ae-forgotten-export) The symbol "IAccountDatasourceRelation" needs to be exported by the entry point index.d.ts + // + // (undocumented) + list(): IAccountDatasourceRelation[]; + // (undocumented) + restore(state: any): void; + // (undocumented) + updateRelation(relation: IAccountDatasourceRelation): void; +} + // @public (undocumented) export class AccountInfoUnit extends BasicUnit { constructor(kernel: Kernel, productDataUnit: ProductDataUnit, quoteDataUnit: QuoteDataUnit, historyOrderUnit: HistoryOrderUnit); @@ -411,6 +425,10 @@ export class Kernel { units: any[]; }; // (undocumented) + findUnit(Unit: new (...args: any[]) => T): T | undefined; + // (undocumented) + findUnits(Unit: new (...args: any[]) => T): T[]; + // (undocumented) id: string; log: ((...params: any[]) => void) | undefined; // (undocumented) @@ -607,12 +625,18 @@ export class PortfolioSimulatorUnit extends BasicUnit { export class ProductDataUnit extends BasicUnit { // (undocumented) dump(): { - mapProductIdToProduct: Record; + mapProductIdToProduct: Record>; }; // (undocumented) - mapProductIdToProduct: Record; + getProduct(datasource_id: string, product_id: string): IProduct | undefined; + // (undocumented) + listProducts(): IProduct[]; + // (undocumented) + onInit(): void | Promise; // (undocumented) restore(state: any): void; + // (undocumented) + updateProduct(product: IProduct): void; } // @public @@ -648,18 +672,21 @@ export class ProductLoadingUnit extends BasicUnit { export class QuoteDataUnit extends BasicUnit { // (undocumented) dump(): { - mapProductIdToQuote: Record; + mapProductIdToQuote: Record>; + mapDatasourceIdMapProductIdToAccountIds: Record>>; }; + // Warning: (ae-forgotten-export) The symbol "IQuote" needs to be exported by the entry point index.d.ts + // // (undocumented) - mapProductIdToQuote: Record; + getQuote(datasource_id: string, product_id: string): IQuote | undefined; + // (undocumented) + listQuotes(): IQuote[]; + // (undocumented) + onInit(): void | Promise; // (undocumented) restore(state: any): void; + // (undocumented) + updateQuote(datasource_id: string, product_id: string, ask: number, bid: number): void; } // @public (undocumented) diff --git a/libraries/kernel/src/kernel.ts b/libraries/kernel/src/kernel.ts index 5bed6f78f..c8c2cd0c8 100644 --- a/libraries/kernel/src/kernel.ts +++ b/libraries/kernel/src/kernel.ts @@ -211,4 +211,12 @@ export class Kernel { unit.restore(state.units[idx]); }); }; + + findUnit(Unit: new (...args: any[]) => T): T | undefined { + return this.units.find((v): v is T => v instanceof Unit); + } + + findUnits(Unit: new (...args: any[]) => T) { + return this.units.filter((v): v is T => v instanceof Unit); + } } diff --git a/libraries/kernel/src/scenes/AccountReplayScene.ts b/libraries/kernel/src/scenes/AccountReplayScene.ts index e163d2be1..fc224bbca 100644 --- a/libraries/kernel/src/scenes/AccountReplayScene.ts +++ b/libraries/kernel/src/scenes/AccountReplayScene.ts @@ -53,7 +53,7 @@ export const AccountReplayScene = ( const productLoadingUnit = new ProductLoadingUnit(kernel, terminal, productDataUnit); // Adhoc Unit: 根据品种加载交叉盘品种 new BasicUnit(kernel).onInit = async () => { - for (const product of Object.values(productDataUnit.mapProductIdToProduct)) { + for (const product of productDataUnit.listProducts()) { if (product.quote_currency && currency && product.quote_currency !== currency) { const [productA] = await lastValueFrom( terminal @@ -71,7 +71,7 @@ export const AccountReplayScene = ( ), ); if (productA) { - productDataUnit.mapProductIdToProduct[productA.product_id] = productA; + productDataUnit.updateProduct(productA); } const [productB] = await lastValueFrom( terminal @@ -89,14 +89,14 @@ export const AccountReplayScene = ( ), ); if (productB) { - productDataUnit.mapProductIdToProduct[productB.product_id] = productB; + productDataUnit.updateProduct(productB); } } } }; // Adhoc Unit: 根据品种加载行情数据 new BasicUnit(kernel).onInit = () => { - for (const product of Object.values(productDataUnit.mapProductIdToProduct)) { + for (const product of productDataUnit.listProducts()) { periodLoadingUnit.periodTasks.push({ datasource_id: datasource_id ?? account_id, product_id: product.product_id, diff --git a/libraries/kernel/src/scenes/OrderMergeReplayScene.ts b/libraries/kernel/src/scenes/OrderMergeReplayScene.ts index 17c83919d..af7bfed54 100644 --- a/libraries/kernel/src/scenes/OrderMergeReplayScene.ts +++ b/libraries/kernel/src/scenes/OrderMergeReplayScene.ts @@ -28,7 +28,7 @@ export const OrderMergeReplayScene = ( const productDataUnit = new ProductDataUnit(kernel); { for (const product of products) { - productDataUnit.mapProductIdToProduct[product.product_id] = product; + productDataUnit.updateProduct(product); } } const periodDataUnit = new PeriodDataUnit(kernel, quoteDataUnit); diff --git a/libraries/kernel/src/units/AccountDatasouceRelationUnit.ts b/libraries/kernel/src/units/AccountDatasouceRelationUnit.ts new file mode 100644 index 000000000..6da518da6 --- /dev/null +++ b/libraries/kernel/src/units/AccountDatasouceRelationUnit.ts @@ -0,0 +1,33 @@ +import { encodePath } from '@yuants/data-model'; +import { BasicUnit } from './BasicUnit'; + +interface IAccountDatasourceRelation { + account_id: string; + datasource_id: string; + product_id: string; +} + +/** + * @public + */ +export class AccountDatasourceRelationUnit extends BasicUnit { + private map: Record = {}; + + list(): IAccountDatasourceRelation[] { + return Object.values(this.map); + } + + updateRelation(relation: IAccountDatasourceRelation) { + this.map[encodePath(relation.account_id, relation.datasource_id, relation.product_id)] = relation; + } + + restore(state: any): void { + this.map = state.map; + } + + dump(): any { + return { + map: this.map, + }; + } +} diff --git a/libraries/kernel/src/units/AccountInfoUnit.ts b/libraries/kernel/src/units/AccountInfoUnit.ts index 1602bd190..c451fa724 100644 --- a/libraries/kernel/src/units/AccountInfoUnit.ts +++ b/libraries/kernel/src/units/AccountInfoUnit.ts @@ -89,7 +89,7 @@ export class AccountInfoUnit extends BasicUnit { (this.mapAccountIdToBalance[accountId] || 0) + order.profit_correction; } - const theProduct = this.productDataUnit.mapProductIdToProduct[order.product_id]!; + const theProduct = this.productDataUnit.getProduct(order.account_id, order.product_id)!; // 假设所有的 order 都有 position_id const variant = @@ -141,7 +141,7 @@ export class AccountInfoUnit extends BasicUnit { tradedVolume, thePosition.variant, theAccountInfo.money.currency, - (product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id], + (product_id) => this.quoteDataUnit.getQuote(accountId, product_id), ); } } @@ -152,9 +152,9 @@ export class AccountInfoUnit extends BasicUnit { .filter((pos) => pos.volume > 0) // 过滤掉空的头寸 .map((position): IPosition => { const product_id = position.product_id; - const quote = this.quoteDataUnit.mapProductIdToQuote[product_id]; - const product = this.productDataUnit.mapProductIdToProduct[product_id]!; - if (quote) { + const quote = this.quoteDataUnit.getQuote(accountId, product_id); + const product = this.productDataUnit.getProduct(accountId, product_id); + if (product && quote) { const closable_price = position.variant === PositionVariant.LONG ? quote.bid : quote.ask; const floating_profit = getProfit( product, @@ -163,7 +163,7 @@ export class AccountInfoUnit extends BasicUnit { position.volume, position.variant, theAccountInfo.money.currency, - (product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id], + (product_id) => this.quoteDataUnit.getQuote(accountId, product_id), ); const nextPosition = { ...position, @@ -177,7 +177,7 @@ export class AccountInfoUnit extends BasicUnit { }); // 维护账户保证金 const used = positions.reduce((acc, cur) => { - const product = this.productDataUnit.mapProductIdToProduct[cur.product_id]; + const product = this.productDataUnit.getProduct(accountId, cur.product_id); if (!product) { return acc; } @@ -189,7 +189,7 @@ export class AccountInfoUnit extends BasicUnit { cur.volume, cur.variant, theAccountInfo.money.currency, - (product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id], + (product_id) => this.quoteDataUnit.getQuote(accountId, product_id), ) / (theAccountInfo.money.leverage ?? 1) ); diff --git a/libraries/kernel/src/units/AccountSimulatorUnit.ts b/libraries/kernel/src/units/AccountSimulatorUnit.ts index c4492f878..e509f80b8 100644 --- a/libraries/kernel/src/units/AccountSimulatorUnit.ts +++ b/libraries/kernel/src/units/AccountSimulatorUnit.ts @@ -63,7 +63,7 @@ export class AccountSimulatorUnit extends BasicUnit { balance += order.profit_correction; } - const theProduct = this.productDataUnit.mapProductIdToProduct[order.product_id]!; + const theProduct = this.productDataUnit.getProduct(order.account_id, order.product_id)!; // 假设所有的 order 都有 position_id const variant = @@ -76,7 +76,7 @@ export class AccountSimulatorUnit extends BasicUnit { thePosition.free_volume = thePosition.volume = order.volume; } else { // 开仓的时候,如果有头寸,就要更新头寸 - const nextVolume = roundToStep(thePosition.volume + order.volume, theProduct.volume_step ?? 1); + const nextVolume = roundToStep(thePosition.volume + order.volume, theProduct?.volume_step ?? 1); const nextPositionPrice = (thePosition.position_price * thePosition.volume + order.traded_price! * order.volume) / nextVolume; @@ -96,9 +96,9 @@ export class AccountSimulatorUnit extends BasicUnit { // 平仓的时候,如果有头寸,就要更新头寸 const tradedVolume = roundToStep( Math.min(order.volume, thePosition.volume), - theProduct.volume_step ?? 1, + theProduct?.volume_step ?? 1, ); - const nextVolume = roundToStep(thePosition.volume - tradedVolume, theProduct.volume_step ?? 1); + const nextVolume = roundToStep(thePosition.volume - tradedVolume, theProduct?.volume_step ?? 1); // 如果头寸已经平仓完了,就删除头寸 if (nextVolume === 0) { delete this.mapPositionIdToPosition[order.position_id!]; @@ -113,7 +113,7 @@ export class AccountSimulatorUnit extends BasicUnit { tradedVolume, thePosition.variant, this.accountInfo.money.currency, - (product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id], + (product_id) => this.quoteDataUnit.getQuote(this.accountInfo.account_id, product_id), ); } } @@ -125,9 +125,9 @@ export class AccountSimulatorUnit extends BasicUnit { .filter((pos) => pos.volume > 0) // 过滤掉空的头寸 .map((position): IPosition => { const product_id = position.product_id; - const quote = this.quoteDataUnit.mapProductIdToQuote[product_id]; - const product = this.productDataUnit.mapProductIdToProduct[product_id]!; - if (quote) { + const quote = this.quoteDataUnit.getQuote(this.accountInfo.account_id, product_id); + const product = this.productDataUnit.getProduct(this.accountInfo.account_id, product_id); + if (product && quote) { const closable_price = position.variant === PositionVariant.LONG ? quote.bid : quote.ask; const floating_profit = getProfit( product, @@ -136,7 +136,7 @@ export class AccountSimulatorUnit extends BasicUnit { position.volume, position.variant, this.accountInfo.money.currency, - (product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id], + (product_id) => this.quoteDataUnit.getQuote(this.accountInfo.account_id, product_id), ); const nextPosition = { ...position, @@ -150,7 +150,7 @@ export class AccountSimulatorUnit extends BasicUnit { }); // 维护账户保证金 const used = positions.reduce((acc, cur) => { - const product = this.productDataUnit.mapProductIdToProduct[cur.product_id]; + const product = this.productDataUnit.getProduct(this.accountInfo.account_id, cur.product_id); if (!product) { return acc; } @@ -162,7 +162,7 @@ export class AccountSimulatorUnit extends BasicUnit { cur.volume, cur.variant, this.accountInfo.money.currency, - (product_id) => this.quoteDataUnit.mapProductIdToQuote[product_id], + (product_id) => this.quoteDataUnit.getQuote(this.accountInfo.account_id, product_id), ) / (this.accountInfo.money.leverage ?? 1) ); diff --git a/libraries/kernel/src/units/ActivePortfolioOptimizeSimulatorUnit.ts b/libraries/kernel/src/units/ActivePortfolioOptimizeSimulatorUnit.ts index 188c46afd..11d8be1fe 100644 --- a/libraries/kernel/src/units/ActivePortfolioOptimizeSimulatorUnit.ts +++ b/libraries/kernel/src/units/ActivePortfolioOptimizeSimulatorUnit.ts @@ -231,10 +231,12 @@ export class ActivePortfolioOptimizeSimulatorUnit extends BasicUnit { const positionDiffs = diffPosition(sourcePositions, targetPositions); // 根据仓位差异下单 for (const positionDiff of positionDiffs) { + const account_id = this.targetAccountInfoUnit.accountInfo.account_id; + const product_id = positionDiff.product_id; const order: IOrder = { client_order_id: UUID(), - account_id: this.targetAccountInfoUnit.accountInfo.account_id, - product_id: positionDiff.product_id, + account_id: account_id, + product_id: product_id, position_id: positionDiff.variant === PositionVariant.LONG ? `${positionDiff.product_id}-LONG` @@ -250,7 +252,7 @@ export class ActivePortfolioOptimizeSimulatorUnit extends BasicUnit { : OrderDirection.CLOSE_SHORT, volume: roundToStep( Math.abs(positionDiff.error_volume), - this.productDataUnit.mapProductIdToProduct[positionDiff.product_id].volume_step ?? 1, + this.productDataUnit.getProduct(account_id, product_id)?.volume_step ?? 1, ), }; if (order.volume > 0) { diff --git a/libraries/kernel/src/units/HistoryPeriodLoadingUnit.ts b/libraries/kernel/src/units/HistoryPeriodLoadingUnit.ts index dd0ded94d..3c95b498f 100644 --- a/libraries/kernel/src/units/HistoryPeriodLoadingUnit.ts +++ b/libraries/kernel/src/units/HistoryPeriodLoadingUnit.ts @@ -37,7 +37,7 @@ export class HistoryPeriodLoadingUnit extends BasicUnit { task.period_in_sec }" [${formatTime(task.start_time_in_us / 1000)}, ${formatTime(task.end_time_in_us / 1000)})`, ); - const theProduct = this.productDataUnit.mapProductIdToProduct[product_id]; + const theProduct = this.productDataUnit.getProduct(datasource_id, product_id); await lastValueFrom( this.terminal.queryPeriods(task).pipe( tap((x) => { @@ -53,7 +53,7 @@ export class HistoryPeriodLoadingUnit extends BasicUnit { map((periods) => { periods.sort((a, b) => a.timestamp_in_us - b.timestamp_in_us); periods.forEach((period, idx) => { - const spread = period.spread || theProduct.spread || 0; + const spread = period.spread || theProduct?.spread || 0; // 推入 Period 数据 // ISSUE: 将开盘时的K线也推入队列,产生一个模拟的事件,可以提早确认上一根K线的收盘 const openEventId = this.kernel.alloc(period.timestamp_in_us / 1000); diff --git a/libraries/kernel/src/units/OrderMatchingUnit.ts b/libraries/kernel/src/units/OrderMatchingUnit.ts index 0450cbdb3..1efdc26e9 100644 --- a/libraries/kernel/src/units/OrderMatchingUnit.ts +++ b/libraries/kernel/src/units/OrderMatchingUnit.ts @@ -215,8 +215,8 @@ export class OrderMatchingUnit extends BasicUnit { continue; } isSomeOrderTraded = true; - const theProduct = this.productDataUnit.mapProductIdToProduct[order.product_id]; - const volume_step = theProduct.volume_step ?? 1; + const theProduct = this.productDataUnit.getProduct(order.account_id, order.product_id); + const volume_step = theProduct?.volume_step ?? 1; const volume = roundToStep(order.volume, volume_step); const theOrder = { ...order, diff --git a/libraries/kernel/src/units/PeriodDataUnit.ts b/libraries/kernel/src/units/PeriodDataUnit.ts index 3f6eb42f8..b75a9a9d2 100644 --- a/libraries/kernel/src/units/PeriodDataUnit.ts +++ b/libraries/kernel/src/units/PeriodDataUnit.ts @@ -33,10 +33,12 @@ export class PeriodDataUnit extends BasicUnit { // Update Period list[updateIdx] = period; // Copy to QuoteDataUnit - this.quoteDataUnit.mapProductIdToQuote[period.product_id] = { - bid: period.close, - ask: period.close + (period.spread || 0), - }; + this.quoteDataUnit.updateQuote( + period.datasource_id, + period.product_id, + period.close + (period.spread || 0), + period.close, + ); this._periodUpdated$.next(period); } diff --git a/libraries/kernel/src/units/PortfolioSimulatorUnit.ts b/libraries/kernel/src/units/PortfolioSimulatorUnit.ts index 25ad0c4ad..9131ba88b 100644 --- a/libraries/kernel/src/units/PortfolioSimulatorUnit.ts +++ b/libraries/kernel/src/units/PortfolioSimulatorUnit.ts @@ -172,11 +172,13 @@ export class PortfolioSimulatorUnit extends BasicUnit { const positionDiffs = diffPosition(sourcePositions, targetPositions); // 根据仓位差异下单 for (const positionDiff of positionDiffs) { + const account_id = this.targetAccountInfoUnit.accountInfo.account_id; + const product_id = positionDiff.product_id; // 逻辑有问题 const order: IOrder = { client_order_id: UUID(), - account_id: this.targetAccountInfoUnit.accountInfo.account_id, - product_id: positionDiff.product_id, + account_id: account_id, + product_id: product_id, position_id: positionDiff.variant === PositionVariant.LONG ? `${positionDiff.product_id}-LONG` @@ -192,7 +194,7 @@ export class PortfolioSimulatorUnit extends BasicUnit { : OrderDirection.CLOSE_SHORT, volume: roundToStep( Math.abs(positionDiff.error_volume), - this.productDataUnit.mapProductIdToProduct[positionDiff.product_id].volume_step ?? 1, + this.productDataUnit.getProduct(account_id, product_id)?.volume_step ?? 1, ), }; if (order.volume > 0) { diff --git a/libraries/kernel/src/units/ProductDataUnit.ts b/libraries/kernel/src/units/ProductDataUnit.ts index 372879606..20975baca 100644 --- a/libraries/kernel/src/units/ProductDataUnit.ts +++ b/libraries/kernel/src/units/ProductDataUnit.ts @@ -1,4 +1,5 @@ import { IProduct } from '@yuants/protocol'; +import { AccountDatasourceRelationUnit } from './AccountDatasouceRelationUnit'; import { BasicUnit } from './BasicUnit'; /** @@ -6,7 +7,37 @@ import { BasicUnit } from './BasicUnit'; * @public */ export class ProductDataUnit extends BasicUnit { - mapProductIdToProduct: Record = {}; + private mapProductIdToProduct: Record> = {}; + private adrUnit: AccountDatasourceRelationUnit | undefined; + + onInit(): void | Promise { + this.adrUnit = this.kernel.findUnit(AccountDatasourceRelationUnit); + } + + listProducts(): IProduct[] { + return Object.values(this.mapProductIdToProduct).flatMap((x) => Object.values(x)); + } + + getProduct(datasource_id: string, product_id: string): IProduct | undefined { + return ( + this.mapProductIdToProduct[datasource_id]?.[product_id] ?? this.mapProductIdToProduct['']?.[product_id] + ); + } + + updateProduct(product: IProduct) { + // Set default + (this.mapProductIdToProduct[''] ??= {})[product.product_id] = product; + // Set datasource_id + (this.mapProductIdToProduct[product.datasource_id] ??= {})[product.product_id] = product; + // Copy to All Related Accounts + if (this.adrUnit) { + for (const relation of this.adrUnit.list()) { + if (relation.datasource_id === product.datasource_id && relation.product_id === product.product_id) { + (this.mapProductIdToProduct[relation.account_id] ??= {})[product.product_id] = product; + } + } + } + } dump() { return { diff --git a/libraries/kernel/src/units/ProductLoadingUnit.ts b/libraries/kernel/src/units/ProductLoadingUnit.ts index 665b20399..01e5ea0fa 100644 --- a/libraries/kernel/src/units/ProductLoadingUnit.ts +++ b/libraries/kernel/src/units/ProductLoadingUnit.ts @@ -44,7 +44,7 @@ export class ProductLoadingUnit extends BasicUnit { }), tap((product) => { this.kernel.log?.(formatTime(Date.now()), 'product loaded', JSON.stringify(product)); - this.productDataUnit.mapProductIdToProduct[product.product_id] = product; + this.productDataUnit.updateProduct(product); }), ), ); diff --git a/libraries/kernel/src/units/QuoteDataUnit.ts b/libraries/kernel/src/units/QuoteDataUnit.ts index ae1afe5d2..4ec008689 100644 --- a/libraries/kernel/src/units/QuoteDataUnit.ts +++ b/libraries/kernel/src/units/QuoteDataUnit.ts @@ -1,17 +1,90 @@ +import { AccountDatasourceRelationUnit } from './AccountDatasouceRelationUnit'; import { BasicUnit } from './BasicUnit'; +import { ProductDataUnit } from './ProductDataUnit'; + +interface IQuote { + datasource_id: string; + product_id: string; + ask: number; + bid: number; +} /** * 报价数据单元 * @public */ export class QuoteDataUnit extends BasicUnit { - mapProductIdToQuote: Record = {}; + private adrUnit: AccountDatasourceRelationUnit | undefined; + private productDataUnit: ProductDataUnit | undefined; + private mapProductIdToQuote: Record> = {}; + private mapDatasourceIdMapProductIdToAccountIds: Record>> = {}; + + onInit(): void | Promise { + this.adrUnit = this.kernel.findUnit(AccountDatasourceRelationUnit); + this.productDataUnit = this.kernel.findUnit(ProductDataUnit); + if (this.adrUnit) { + for (const relation of this.adrUnit.list()) { + ((this.mapDatasourceIdMapProductIdToAccountIds[relation.datasource_id] ??= {})[ + relation.product_id + ] ??= new Set()).add(relation.account_id); + } + } + } + + getQuote(datasource_id: string, product_id: string): IQuote | undefined { + return ( + this.mapProductIdToQuote[datasource_id]?.[product_id] ?? this.mapProductIdToQuote['']?.[product_id] + ); + } + + private _updateQuote(quote: IQuote) { + (this.mapProductIdToQuote[quote.datasource_id] ??= {})[quote.product_id] = quote; + (this.mapProductIdToQuote[''] ??= {})[quote.product_id] = quote; + this.mapDatasourceIdMapProductIdToAccountIds[quote.datasource_id]?.[quote.product_id]?.forEach( + (account_id) => { + (this.mapProductIdToQuote[account_id] ??= {})[quote.product_id] = quote; + }, + ); + } + + updateQuote(datasource_id: string, product_id: string, ask: number, bid: number) { + this._updateQuote({ + datasource_id, + product_id, + ask, + bid, + }); + if (this.productDataUnit) { + const theProduct = this.productDataUnit.getProduct(datasource_id, product_id); + if (theProduct?.base_currency && theProduct.quote_currency) { + this._updateQuote({ + datasource_id, + product_id: `${theProduct.base_currency}${theProduct.quote_currency}`, + ask, + bid, + }); + this._updateQuote({ + datasource_id, + product_id: `${theProduct.quote_currency}${theProduct.base_currency}`, + ask: 1 / bid, + bid: 1 / ask, + }); + } + } + } + + listQuotes(): IQuote[] { + return Object.values(this.mapProductIdToQuote).flatMap((x) => Object.values(x)); + } + dump() { return { mapProductIdToQuote: this.mapProductIdToQuote, + mapDatasourceIdMapProductIdToAccountIds: this.mapDatasourceIdMapProductIdToAccountIds, }; } restore(state: any): void { this.mapProductIdToQuote = state.mapProductIdToQuote; + this.mapDatasourceIdMapProductIdToAccountIds = state.mapDatasourceIdMapProductIdToAccountIds; } } diff --git a/libraries/kernel/src/units/RealtimePeriodLoadingUnit.ts b/libraries/kernel/src/units/RealtimePeriodLoadingUnit.ts index 889328145..90b1edf4f 100644 --- a/libraries/kernel/src/units/RealtimePeriodLoadingUnit.ts +++ b/libraries/kernel/src/units/RealtimePeriodLoadingUnit.ts @@ -133,7 +133,7 @@ export class RealtimePeriodLoadingUnit extends BasicUnit { // 配置行情查询任务 for (const task of this.periodTasks) { const { datasource_id, product_id, period_in_sec } = task; - const theProduct = this.productDataUnit.mapProductIdToProduct[product_id]; + const theProduct = this.productDataUnit.getProduct(datasource_id, product_id); const channelId = encodePath('Period', datasource_id, product_id, period_in_sec); // ISSUE: Period[].length >= 2 to ensure overlay @@ -150,7 +150,7 @@ export class RealtimePeriodLoadingUnit extends BasicUnit { const eventId = this.kernel.alloc(Date.now()); this.mapEventIdToPeriod.set( eventId, - periods.map((period) => ({ ...period, spread: period.spread || theProduct.spread || 0 })), + periods.map((period) => ({ ...period, spread: period.spread || theProduct?.spread || 0 })), ); }), ); diff --git a/libraries/kernel/src/units/RealtimeTickLoadingUnit.ts b/libraries/kernel/src/units/RealtimeTickLoadingUnit.ts index 89be56963..2bbaba07b 100644 --- a/libraries/kernel/src/units/RealtimeTickLoadingUnit.ts +++ b/libraries/kernel/src/units/RealtimeTickLoadingUnit.ts @@ -5,6 +5,7 @@ import { Kernel } from '../kernel'; import { BasicUnit } from './BasicUnit'; import { QuoteDataUnit } from './QuoteDataUnit'; import { TickDataUnit } from './TickDataUnit'; +import { AccountDatasourceRelationUnit } from './AccountDatasouceRelationUnit'; /** * Realtime Tick @@ -23,6 +24,9 @@ export class RealtimeTickLoadingUnit extends BasicUnit { private mapEventIdToTick = new Map(); addTickTask(datasource_id: string, product_id: string, account_id: string = '') { + this.kernel + .findUnit(AccountDatasourceRelationUnit) + ?.updateRelation({ datasource_id, product_id, account_id }); this._tickTasks.add(encodePath(datasource_id, product_id, account_id)); } @@ -31,10 +35,7 @@ export class RealtimeTickLoadingUnit extends BasicUnit { onEvent(): void | Promise { const tick = this.mapEventIdToTick.get(this.kernel.currentEventId); if (tick && tick.ask && tick.bid) { - this.quoteDataUnit.mapProductIdToQuote[tick.product_id] = { - ask: tick.ask, - bid: tick.bid, - }; + this.quoteDataUnit.updateQuote(tick.datasource_id, tick.product_id, tick.ask, tick.bid); this.tickDataUnit.setTick(tick); diff --git a/libraries/kernel/src/units/index.ts b/libraries/kernel/src/units/index.ts index 35a3198b0..7cf759019 100644 --- a/libraries/kernel/src/units/index.ts +++ b/libraries/kernel/src/units/index.ts @@ -1,3 +1,4 @@ +export * from './AccountDatasouceRelationUnit'; export * from './AccountInfoUnit'; export * from './AccountPerformanceHubUnit'; export * from './AccountPerformanceUnit'; diff --git a/libraries/kernel/src/units/metrics/QuoteMetricsUnit.ts b/libraries/kernel/src/units/metrics/QuoteMetricsUnit.ts index 83c1abb08..06c3c2561 100644 --- a/libraries/kernel/src/units/metrics/QuoteMetricsUnit.ts +++ b/libraries/kernel/src/units/metrics/QuoteMetricsUnit.ts @@ -17,15 +17,17 @@ export class QuoteMetricsUnit extends BasicUnit { } onEvent(): void | Promise { - for (const [product_id, quote] of Object.entries(this.quoteDataUnit.mapProductIdToQuote)) { + for (const quote of this.quoteDataUnit.listQuotes()) { MetricQuoteDataUnitQuotes.set(quote.ask, { kernel_id: this.kernel.id, - product_id: product_id, + datasource_id: quote.datasource_id, + product_id: quote.product_id, side: 'ask', }); MetricQuoteDataUnitQuotes.set(quote.bid, { kernel_id: this.kernel.id, - product_id: product_id, + datasource_id: quote.datasource_id, + product_id: quote.product_id, side: 'bid', }); } diff --git a/ui/web/src/modules/StaticFileServerStorage/StaticFileServerPeriodLoadingUnit.ts b/ui/web/src/modules/StaticFileServerStorage/StaticFileServerPeriodLoadingUnit.ts index 47f050aa4..922ffc1a3 100644 --- a/ui/web/src/modules/StaticFileServerStorage/StaticFileServerPeriodLoadingUnit.ts +++ b/ui/web/src/modules/StaticFileServerStorage/StaticFileServerPeriodLoadingUnit.ts @@ -68,19 +68,19 @@ export class StaticFileServerPeriodLoadingUnit extends BasicUnit { .map((v: IProduct) => [v.product_id, v]), ); for (const task of this.periodTasks) { - if (!this.productDataUnit.mapProductIdToProduct[task.product_id]) { - this.productDataUnit.mapProductIdToProduct[task.product_id] = mapProductIdToProduct[ - task.product_id - ] || { - datasource_id: task.datasource_id, - product_id: task.product_id, - }; + if (!this.productDataUnit.getProduct(task.datasource_id, task.product_id)) { + this.productDataUnit.updateProduct( + mapProductIdToProduct[task.product_id] || { + datasource_id: task.datasource_id, + product_id: task.product_id, + }, + ); } } for (const task of this.periodTasks) { const dirPath = `OHLC/${task.product_id}/${mapPeriodToDuration[task.period_in_sec]}`; const files = storageIndex.filter((path) => path.startsWith(dirPath)); - const theProduct = this.productDataUnit.mapProductIdToProduct[task.product_id]; + const theProduct = this.productDataUnit.getProduct(task.datasource_id, task.product_id); if (files.length === 0) { this.kernel.log?.( `${formatTime(Date.now())} 未找到 "${task.product_id}" / "${task.period_in_sec}" 的历史数据`, @@ -125,7 +125,7 @@ export class StaticFileServerPeriodLoadingUnit extends BasicUnit { map((periods) => { periods.sort((a, b) => a.timestamp_in_us - b.timestamp_in_us); periods.forEach((period, idx) => { - const spread = period.spread || theProduct.spread || 0; + const spread = period.spread || theProduct?.spread || 0; // Push Period Data // ISSUE: Push the K-line at the opening time into the queue, which generates a simulated event, // which can confirm the closing of the previous K-line early