Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LevelDB storage fix and new event handlers on sdk-contracts #94

Merged
merged 4 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 126 additions & 34 deletions packages/contracts-manger/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {
Address,
Hash,
Abi,
Account,
InferFunctionName,
GetFunctionArgs,
Expand All @@ -17,12 +16,15 @@ import {
zeroAddress,
GetFilterLogsReturnType,
InferEventName,
Abi,
CreateContractEventFilterParameters,
} from 'viem';
import { stringify } from 'superjson';
import {
marketABI,
erc20_18ABI,
entitiesRegistryABI,
configABI,
kinds,
} from '@windingtree/contracts';
import {
Expand Down Expand Up @@ -52,6 +54,11 @@ export interface ProtocolContractsOptions {
walletClient?: WalletClient;
}

/**
* Generic filter options type.
*/
type FilterOptions = CreateContractEventFilterParameters<Abi, string>;

/**
* Common API of the protocol smart contracts set
*
Expand Down Expand Up @@ -688,69 +695,154 @@ export class ProtocolContracts<
}

/**
* Subscribes to specific events emitted by the market smart contract.
* Subscribes to events from a specified smart contract.
*
* @param eventName - The name of the event to listen for.
* @param onLogs - Callback function to handle the event logs.
* @param fromBlock - (Optional) The starting block number for listening to events.
* @param pollInterval - (Optional) Interval in milliseconds for polling new events.
* @returns A function to unsubscribe from the event.
* @template TEventName - Generic type parameter for event name.
* @template TAbi The ABI type of the contract.
* @template TEventName The name of the event to subscribe to.
* @param {TAbi} abi The ABI of the contract to subscribe to.
* @param {Address} address The address of the contract.
* @param {InferEventName<TAbi, TEventName>} eventName The name of the event.
* @param {(logs: GetFilterLogsReturnType<TAbi>) => void} onLogs Callback to execute when logs are received.
* @param {bigint} [fromBlock] The block number from which to start listening for events.
* @param {number} [pollInterval=1000] The interval in milliseconds at which to poll for new events.
* @returns {Promise<() => void>} A promise that resolves to an unsubscribe function.
* @private
*/
async subscribeMarket<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof marketABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof marketABI>) => void,
private async subscribeToEvents<
// eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents
const TAbi extends Abi | readonly unknown[] = Abi,
TEventName extends string | undefined = undefined,
>(
abi: TAbi,
address: Address,
eventName: InferEventName<TAbi, TEventName>,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onLogs: (logs: any) => void,
fromBlock?: bigint,
pollInterval = 1000,
pollInterval: number = 1000,
): Promise<() => void> {
let blockNumber = await this.publicClient.getBlockNumber();
let isUnsubscribed = false;
let timeoutId: NodeJS.Timeout;

// Use the specified fromBlock or the current block number
// Adjust starting block number if fromBlock is provided and valid
if (fromBlock && fromBlock < blockNumber) {
blockNumber = fromBlock;
}

// Function to fetch and process logs
const getLogs = async () => {
if (isUnsubscribed) return; // Stop if unsubscribed
if (isUnsubscribed) return;

// Create an event filter
const filter = await this.publicClient.createContractEventFilter({
abi: marketABI,
address: this.contracts['market'].address,
eventName,
abi,
address,
fromBlock: blockNumber,
strict: true,
});
eventName: eventName,
} as unknown as FilterOptions);

// Retrieve logs based on the filter
const logs = await this.publicClient.getFilterLogs({ filter });

// Process logs and update the block number
if (logs.length > 0) {
const maxBlockNumber = logs.reduce(
(max, log) => (log.blockNumber > max ? log.blockNumber : max),
BigInt(0),
);
blockNumber = maxBlockNumber + BigInt(1);
onLogs(logs);
// Update the block number to the next after the last log's block
const bn = logs[logs.length - 1].blockNumber;
blockNumber = (bn !== null ? bn : BigInt(0)) + BigInt(1);
}

// Schedule the next call
timeoutId = setTimeout(() => {
getLogs().catch(logger.error);
}, pollInterval);
if (!isUnsubscribed) {
timeoutId = setTimeout(() => {
getLogs().catch(logger.error);
}, pollInterval);
}
};

// Initial call to start the polling process
// Initial call to start polling
getLogs().catch(logger.error);

// Return the unsubscribe function
// Return unsubscribe function
return () => {
isUnsubscribed = true; // Set the flag to stop further polling
clearTimeout(timeoutId); // Clear the timeout to stop scheduled calls
isUnsubscribed = true;
clearTimeout(timeoutId);
};
}

/**
* Subscribes to market contract events.
*
* @template TEventName Type of event name.
* @param {InferEventName<typeof marketABI, TEventName>} eventName The event name to subscribe to.
* @param {(logs: GetFilterLogsReturnType<typeof marketABI>) => void} onLogs Callback for when logs are received.
* @param {bigint} [fromBlock] Starting block number for listening for events.
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
* @returns {Promise<() => void>} Unsubscribe function.
*/
async subscribeMarket<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof marketABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof marketABI>) => void,
fromBlock?: bigint,
pollInterval: number = 1000,
): Promise<() => void> {
return this.subscribeToEvents(
marketABI,
this.contracts['market'].address,
eventName,
onLogs,
fromBlock,
pollInterval,
);
}

/**
* Subscribes to entities contract events.
*
* @template TEventName Type of event name.
* @param {InferEventName<typeof entitiesRegistryABI, TEventName>} eventName The event name to subscribe to.
* @param {(logs: GetFilterLogsReturnType<typeof entitiesRegistryABI>) => void} onLogs Callback for when logs are received.
* @param {bigint} [fromBlock] Starting block number for listening for events.
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
* @returns {Promise<() => void>} Unsubscribe function.
*/
async subscribeEntities<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof entitiesRegistryABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof entitiesRegistryABI>) => void,
fromBlock?: bigint,
pollInterval: number = 1000,
): Promise<() => void> {
return this.subscribeToEvents(
entitiesRegistryABI,
this.contracts['entities'].address,
eventName,
onLogs,
fromBlock,
pollInterval,
);
}

/**
* Subscribes to config contract events.
*
* @template TEventName Type of event name.
* @param {InferEventName<typeof configABI, TEventName>} eventName The event name to subscribe to.
* @param {(logs: GetFilterLogsReturnType<typeof configABI>) => void} onLogs Callback for when logs are received.
* @param {bigint} [fromBlock] Starting block number for listening for events.
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
* @returns {Promise<() => void>} Unsubscribe function.
*/
async subscribeConfig<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof configABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof configABI>) => void,
fromBlock?: bigint,
pollInterval: number = 1000,
): Promise<() => void> {
return this.subscribeToEvents(
configABI,
this.contracts['config'].address,
eventName,
onLogs,
fromBlock,
pollInterval,
);
}
}
1 change: 1 addition & 0 deletions packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^10.1.0",
"@libp2p/interface": "^0.1.3",
"@libp2p/peer-id": "^4.0.6",
"@multiformats/multiaddr": "^12.1.3",
"@windingtree/sdk-constants": "workspace:*",
"@windingtree/sdk-logger": "workspace:*",
Expand Down
10 changes: 9 additions & 1 deletion packages/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { decodeText } from '@windingtree/sdk-utils';
import { CashedMessageEntry, MessagesCache } from './cache.js';
import { createLogger } from '@windingtree/sdk-logger';
import { parse } from 'superjson';
import { peerIdFromBytes } from '@libp2p/peer-id';

const logger = createLogger('CenterSub');

Expand Down Expand Up @@ -179,13 +180,20 @@ export class CenterSub extends GossipSub {
logger.trace('messageTransformer not defined');
return;
}

rpcMsg.from = new Uint8Array(rpcMsg.from as ArrayBufferLike);
rpcMsg.signature = new Uint8Array(rpcMsg.signature as ArrayBufferLike);
rpcMsg.key = new Uint8Array(rpcMsg.key as ArrayBufferLike);
rpcMsg.data = new Uint8Array(rpcMsg.data as ArrayBufferLike);
rpcMsg.seqno = new Uint8Array(rpcMsg.seqno as ArrayBufferLike);

const msgId = await sha256.encode(rpcMsg.data);
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
const msgIdStr = this['msgIdToStrFn'](msgId) as string;
const transformed = this.messageTransformer(rpcMsg.data);
await this.messages.set(
msgIdStr,
rpcMsg.from.toString(),
peerIdFromBytes(rpcMsg.from).toString(),
rpcMsg,
Number(transformed.expire),
Number(transformed.nonce),
Expand Down
Loading
Loading