This repository has been archived by the owner on Jul 22, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsubscribeFunc.ts
204 lines (182 loc) · 6.19 KB
/
subscribeFunc.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import { WsProvider, ApiPromise } from '@polkadot/api';
import { RegisteredTypes } from '@polkadot/types/types';
import {
IDisconnectedRange,
CWEvent,
SubscribeFunc,
ISubscribeOptions,
SupportedNetwork,
} from '../../interfaces';
import { addPrefix, factory } from '../../logging';
import { Subscriber } from './subscriber';
import { Poller } from './poller';
import { Processor } from './processor';
import { Block, IEventData } from './types';
import { EnricherConfig } from './filters/enricher';
export interface ISubstrateSubscribeOptions
extends ISubscribeOptions<ApiPromise> {
enricherConfig?: EnricherConfig;
}
/**
* Attempts to open an API connection, retrying if it cannot be opened.
* @param url websocket endpoing to connect to, including ws[s]:// and port
* @param typeOverrides
* @param chain
* @returns a promise resolving to an ApiPromise once the connection has been established
*/
export async function createApi(
url: string,
typeOverrides: RegisteredTypes = {},
chain?: string
): Promise<ApiPromise> {
const log = factory.getLogger(
addPrefix(__filename, [SupportedNetwork.Substrate, chain])
);
for (let i = 0; i < 3; ++i) {
const provider = new WsProvider(url, 0);
let unsubscribe: () => void;
const success = await new Promise<boolean>((resolve) => {
unsubscribe = provider.on('connected', () => resolve(true));
provider.on('error', () => {
if (i < 2)
log.warn(`An error occurred connecting to ${url} - retrying...`);
resolve(false);
});
provider.on('disconnected', () => resolve(false));
provider.connect();
});
// construct API using provider
if (success) {
unsubscribe();
return ApiPromise.create({
provider,
...typeOverrides,
});
}
// TODO: add delay
}
throw new Error(
`[${SupportedNetwork.Substrate}${
chain ? `::${chain}` : ''
}]: Failed to connect to API endpoint at: ${url}`
);
}
/**
* This is the main function for substrate event handling. It constructs a connection
* to the chain, connects all event-related modules, and initializes event handling.
* @param options
* @returns An active block subscriber.
*/
export const subscribeEvents: SubscribeFunc<
ApiPromise,
Block,
ISubstrateSubscribeOptions
> = async (options) => {
const {
chain,
api,
handlers,
skipCatchup,
archival,
startBlock,
discoverReconnectRange,
verbose,
enricherConfig,
} = options;
const log = factory.getLogger(
addPrefix(__filename, [SupportedNetwork.Substrate, chain])
);
// helper function that sends an event through event handlers
const handleEventFn = async (event: CWEvent<IEventData>): Promise<void> => {
let prevResult = null;
for (const handler of handlers) {
try {
event.chain = chain;
event.received = Date.now();
// pass result of last handler into next one (chaining db events)
prevResult = await handler.handle(event, prevResult);
} catch (err) {
log.error(`Event handle failure: ${err.message}`);
break;
}
}
};
// helper function that sends a block through the event processor and
// into the event handlers
const processor = new Processor(api, enricherConfig || {});
const processBlockFn = async (block: Block): Promise<void> => {
// retrieve events from block
const events: CWEvent<IEventData>[] = await processor.process(block);
// send all events through event-handlers in sequence
for (const event of events) await handleEventFn(event);
};
const subscriber = new Subscriber(api, verbose);
const poller = new Poller(api);
// if running in archival mode: run poller.archive with batch_size 50
// then exit without subscribing.
// TODO: should we start subscription?
if (archival) {
// default to startBlock 0
const offlineRange: IDisconnectedRange = { startBlock: startBlock ?? 0 };
log.info(
`Executing in archival mode, polling blocks starting from: ${offlineRange.startBlock}`
);
await poller.archive(offlineRange, 50, processBlockFn);
return subscriber;
}
// helper function that runs after we've been offline/the server's been down,
// and attempts to fetch events from skipped blocks
const pollMissedBlocksFn = async (): Promise<void> => {
log.info('Detected offline time, polling missed blocks...');
// grab the cached block immediately to avoid a new block appearing before the
// server can do its thing...
const { lastBlockNumber } = processor;
// determine how large of a reconnect we dealt with
let offlineRange: IDisconnectedRange;
// first, attempt the provided range finding method if it exists
// (this should fetch the block of the last server event from database)
if (discoverReconnectRange) {
offlineRange = await discoverReconnectRange();
}
// compare with default range algorithm: take last cached block in processor
// if it exists, and is more recent than the provided algorithm
// (note that on first run, we wont have a cached block/this wont do anything)
if (
lastBlockNumber &&
(!offlineRange ||
!offlineRange.startBlock ||
offlineRange.startBlock < lastBlockNumber)
) {
offlineRange = { startBlock: lastBlockNumber };
}
// if we can't figure out when the last block we saw was,
// do nothing
// (i.e. don't try and fetch all events from block 0 onward)
if (!offlineRange || !offlineRange.startBlock) {
log.warn('Unable to determine offline time range.');
return;
}
try {
const blocks = await poller.poll(offlineRange);
await Promise.all(blocks.map(processBlockFn));
} catch (e) {
log.error(
`Block polling failed after disconnect at block ${offlineRange.startBlock}`
);
}
};
if (!skipCatchup) {
await pollMissedBlocksFn();
} else {
log.info('Skipping event catchup on startup!');
}
try {
log.info(`Subscribing to ${chain} endpoint...`);
await subscriber.subscribe(processBlockFn);
// handle reconnects with poller
api.on('connected', pollMissedBlocksFn);
} catch (e) {
log.error(`Subscription error: ${e.message}`);
}
return subscriber;
};