Skip to content

Commit

Permalink
feat(vendor-ccxt): add provide tick for ccxt (#448)
Browse files Browse the repository at this point in the history
  • Loading branch information
Thrimbda authored Feb 20, 2024
1 parent e5f200c commit d684294
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 24 deletions.
13 changes: 9 additions & 4 deletions apps/vendor-ccxt/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@
"name": "@yuants/vendor-ccxt",
"version": "0.0.7",
"files": [
"dist",
"lib",
"temp"
"dist/extension.bundle.js"
],
"scripts": {
"build": "heft test --clean && api-extractor run --local && yuan-toolkit post-build"
},
"dependencies": {
"@yuants/protocol": "workspace:*",
"rxjs": "~7.5.6",
"ccxt": "~4.0.107",
"ccxt": "~4.2.48",
"@yuants/data-model": "workspace:*"
},
"devDependencies": {
Expand All @@ -29,5 +27,12 @@
"publishConfig": {
"registry": "https://registry.npmjs.org",
"access": "public"
},
"io_ntnl": {
"deploy_files": [
"dist",
"lib",
"temp"
]
}
}
86 changes: 72 additions & 14 deletions apps/vendor-ccxt/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
IPeriod,
IPosition,
IProduct,
ITick,
OrderDirection,
OrderType,
PositionVariant,
Expand All @@ -12,13 +13,16 @@ import {
import { Terminal } from '@yuants/protocol';
import '@yuants/protocol/lib/services';
import '@yuants/protocol/lib/services/order';
import ccxt, { Exchange } from 'ccxt';
import ccxt, { Exchange, Market } from 'ccxt';
import {
EMPTY,
bufferCount,
catchError,
combineLatestWith,
defer,
delayWhen,
expand,
filter,
forkJoin,
from,
lastValueFrom,
Expand Down Expand Up @@ -68,7 +72,7 @@ import {
mergeMap((positions) => positions),
map((position): IPosition => {
return {
position_id: position.id,
position_id: position.id!,
product_id: mapSymbolToProductId[position.symbol],
variant: position.side === 'long' ? PositionVariant.LONG : PositionVariant.SHORT,
volume: position.contracts || 0,
Expand Down Expand Up @@ -130,6 +134,7 @@ import {

const products$ = defer(() => ex.loadMarkets()).pipe(
mergeMap((markets) => Object.values(markets)),
filter((market): market is Exclude<typeof market, undefined> => !!market),
tap((market) => {
console.info('Product-Symbol', market.id, market.symbol);
mapProductIdToSymbol[market.id] = market.symbol;
Expand Down Expand Up @@ -262,12 +267,13 @@ import {
datasource_id: EXCHANGE_ID,
product_id,
period_in_sec,
timestamp_in_us: t * 1000,
open: o,
high: h,
low: l,
close: c,
volume: vol,
start_at: t,
timestamp_in_us: t! * 1000,
open: o!,
high: h!,
low: l!,
close: c!,
volume: vol!,
}),
),
toArray(),
Expand All @@ -289,6 +295,58 @@ import {
},
);

const memoize = <T extends (...args: any[]) => any>(fn: T): T => {
const cache = new Map<string, ReturnType<T>>();
return ((...args: any[]) => {
const key = JSON.stringify(args);
if (cache.has(key)) {
return cache.get(key);
}
const result = fn(...args);
cache.set(key, result);
return result;
}) as T;
};

const useFundingRate = memoize((symbol: string) => {
return from(ex.fetchFundingRate(symbol)).pipe(
repeat({ delay: 10_000 }),
retry({ delay: 5000 }),
shareReplay(1),
);
});

terminal.provideTicks(EXCHANGE_ID, (product_id) => {
console.info(formatTime(Date.now()), 'tick_stream', product_id);
const symbol = mapProductIdToSymbol[product_id];
if (!symbol) {
console.info(formatTime(Date.now()), 'tick_stream', product_id, 'no such symbol');
return EMPTY;
}
return defer(() => ex.fetchTicker(symbol)).pipe(
combineLatestWith(useFundingRate(symbol)),
map(([ticker, fundingRateObj]): ITick => {
return {
datasource_id: EXCHANGE_ID,
product_id,
updated_at: ticker.timestamp!,
ask: ticker.ask,
bid: ticker.bid,
volume: ticker.baseVolume,
interest_rate_for_long: -fundingRateObj.fundingRate! * ticker.ask!,
interest_rate_for_short: fundingRateObj.fundingRate! * ticker.bid!,
settlement_scheduled_at: fundingRateObj.fundingTimestamp,
};
}),
catchError((e) => {
console.error(formatTime(Date.now()), 'tick_stream', product_id, e);
throw e;
}),
retry(1000),
repeat(500),
);
});

terminal.providePeriods(EXCHANGE_ID, (product_id, period_in_sec) => {
console.info(formatTime(Date.now()), 'period_stream', product_id, period_in_sec);
return defer(() => {
Expand All @@ -306,13 +364,13 @@ import {
datasource_id: EXCHANGE_ID,
product_id,
period_in_sec,
timestamp_in_us: t * 1000,
timestamp_in_us: t! * 1000,
start_at: t,
open: o,
high: h,
low: l,
close: c,
volume: vol,
open: o!,
high: h!,
low: l!,
close: c!,
volume: vol!,
}),
),
toArray(),
Expand Down
3 changes: 2 additions & 1 deletion apps/vendor-ccxt/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"extends": "./node_modules/@rushstack/heft-node-rig/profiles/default/tsconfig-base.json",
"compilerOptions": {
"lib": ["DOM", "ESNext"],
"types": ["heft-jest", "node"]
"types": ["heft-jest", "node"],
"skipLibCheck": true
}
}
10 changes: 10 additions & 0 deletions common/changes/@yuants/vendor-ccxt/2024-02-20-18-24.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/vendor-ccxt",
"comment": "add subscribe tick",
"type": "patch"
}
],
"packageName": "@yuants/vendor-ccxt"
}
10 changes: 5 additions & 5 deletions common/config/rush/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d684294

Please sign in to comment.