Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp reporter (2/2): handle json rpc down and server restart/crash #1616

Merged
merged 13 commits into from
Jun 17, 2024
12 changes: 4 additions & 8 deletions core/.eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
"plugin:@typescript-eslint/recommended",
"plugin:prettier/recommended"
],
"plugins": [
"prettier",
"@typescript-eslint"
],
"plugins": ["prettier", "@typescript-eslint"],
"rules": {
"no-throw-literal": "error",
"prettier/prettier": "error",
Expand All @@ -23,11 +20,10 @@
"varsIgnorePattern": "^_",
"caughtErrorsIgnorePattern": "^_"
}
]
],
"no-constant-condition": "warn"
},
"ignorePatterns": [
"src/cli/orakl-cli/dist/**"
],
"ignorePatterns": ["src/cli/orakl-cli/dist/**"],
"parserOptions": {
"sourceType": "module",
"ecmaVersion": 2022,
Expand Down
2 changes: 1 addition & 1 deletion core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,5 @@ export enum OraklErrorCode {
FailedInsertUnprocessedBlock,
FailedDeleteUnprocessedBlock,
NonceNotFound,
FailedToGetWalletTransactionCount
TxNonceExpired
}
22 changes: 20 additions & 2 deletions core/src/reporter/reporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export function reporter(state: State, logger: Logger) {
let delegatorOkay = true
const NUM_TRANSACTION_TRIALS = 3
const txParams = { to, payload, gasLimit, logger, nonce }
let localNonce = nonce

for (let i = 0; i < NUM_TRANSACTION_TRIALS; ++i) {
if (state.delegatedFee && delegatorOkay) {
Expand All @@ -36,15 +37,25 @@ export function reporter(state: State, logger: Logger) {
break
} catch (e) {
delegatorOkay = false
if (e.code === OraklErrorCode.TxNonceExpired) {
localNonce = await state.getAndIncrementNonce(to)
}
}
} else if (state.delegatedFee) {
try {
await sendTransactionCaver({ ...txParams, wallet: wallet as CaverWallet })
break
} catch (e) {
if (![OraklErrorCode.CaverTxTransactionFailed].includes(e.code)) {
if (
![OraklErrorCode.CaverTxTransactionFailed, OraklErrorCode.TxNonceExpired].includes(
e.code
)
) {
throw e
}
if (e.code === OraklErrorCode.TxNonceExpired) {
localNonce = await state.getAndIncrementNonce(to)
}
}
} else {
try {
Expand All @@ -55,15 +66,22 @@ export function reporter(state: State, logger: Logger) {
![
OraklErrorCode.TxNotMined,
OraklErrorCode.TxProcessingResponseError,
OraklErrorCode.TxMissingResponseError
OraklErrorCode.TxMissingResponseError,
OraklErrorCode.TxNonceExpired
].includes(e.code)
) {
throw e
}

if (e.code === OraklErrorCode.TxNonceExpired) {
localNonce = await state.getAndIncrementNonce(to)
}

logger.info(`Retrying transaction. Trial number: ${i}`)
}
}

txParams.nonce = localNonce
}
}

Expand Down
43 changes: 2 additions & 41 deletions core/src/reporter/state.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { NonceManager } from '@ethersproject/experimental'
import { Mutex } from 'async-mutex'
import { Logger } from 'pino'
import type { RedisClientType } from 'redis'
Expand All @@ -7,7 +6,7 @@ import { OraklError, OraklErrorCode } from '../errors'
import { IReporterConfig } from '../types'
import { isAddressValid } from '../utils'
import { Wallet } from './types'
import { buildCaverWallet, buildWallet, CaverWallet, isPrivateKeyAddressPairValid } from './utils'
import { buildCaverWallet, buildWallet, isPrivateKeyAddressPairValid } from './utils'

const FILE_NAME = import.meta.url

Expand Down Expand Up @@ -277,52 +276,14 @@ export class State {
}

/**
* Get nonce for oracleAddress. If nonce is not found, raise an error.
* This function implements a mutex to ensure it cannot be called concurrently.
*
* @param {string} oracleAddress
* @return {number} nonce
* @exception {OraklErrorCode.WalletNotActive} raise when wallet is not active
* @exception {OraklErrorCode.FailedToGetWalletTransactionCount} raise when failed to get wallet transaction count
*/
async getAndIncrementNonce(oracleAddress: string): Promise<number> {
return await this.mutex.runExclusive(async () => {
const wallet = this.wallets[oracleAddress]
if (!wallet) {
const msg = `Wallet for oracle ${oracleAddress} is not active`
this.logger.error({ name: 'getAndIncrementNonce', file: FILE_NAME }, msg)
throw new OraklError(OraklErrorCode.WalletNotActive, msg)
}

let remoteNonce: number
try {
if (this.delegatedFee) {
const caverWallet = wallet as CaverWallet
remoteNonce = Number(
await caverWallet.caver.rpc.klay.getTransactionCount(caverWallet.address)
)
} else {
remoteNonce = await (wallet as NonceManager).getTransactionCount()
}
} catch (error) {
const msg = `Failed to get nonce for wallet`
this.logger.error({ name: 'getAndIncrementNonce', file: FILE_NAME }, msg)
throw new OraklError(OraklErrorCode.FailedToGetWalletTransactionCount, msg)
}

const localNonce = this.nonces[oracleAddress]

let nonce: number
if (!localNonce || remoteNonce > localNonce) {
this.logger.warn(
{ name: 'getAndIncrementNonce', file: FILE_NAME },
`Nonce value discrepancy. Remote: ${remoteNonce}, Local: ${localNonce}. Updating local nonce to remote nonce.`
)
nonce = remoteNonce
} else {
nonce = localNonce
}

const nonce = this.nonces[oracleAddress]
this.nonces[oracleAddress] = nonce + 1

return nonce
Expand Down
7 changes: 7 additions & 0 deletions core/src/reporter/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ export async function sendTransaction({
} else if (e.code == 'UNPREDICTABLE_GAS_LIMIT') {
msg = 'TxCannotEstimateGasError'
error = new OraklError(OraklErrorCode.TxCannotEstimateGasError, msg, e.value)
} else if (e.code == 'NONCE_EXPIRED') {
msg = 'TxNonceExpired'
error = new OraklError(OraklErrorCode.TxNonceExpired, msg)
} else {
error = e
}
Expand Down Expand Up @@ -264,3 +267,7 @@ export function isPrivateKeyAddressPairValid(sk: string, addr: string): boolean
return false
}
}

export async function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
6 changes: 2 additions & 4 deletions core/src/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export const MAX_DATA_STALENESS = 5_000
// BullMQ
export const REMOVE_ON_COMPLETE = 500
export const REMOVE_ON_FAIL = 1_000
export const CONCURRENCY = 12
export const CONCURRENCY = 50
export const DATA_FEED_REPORTER_CONCURRENCY =
Number(process.env.DATA_FEED_REPORTER_CONCURRENCY) || 15

Expand Down Expand Up @@ -235,11 +235,9 @@ export const WORKER_JOB_SETTINGS = {

export const NONCE_MANAGER_JOB_SETTINGS = {
removeOnComplete: REMOVE_ON_COMPLETE,
// FIXME Should not be removed until resolved, however, for now in
// testnet, we can safely keep this settings.
removeOnFail: REMOVE_ON_FAIL,
attempts: 10,
backoff: 1_000
backoff: 500
}

export function getObservedBlockRedisKey(contractAddress: string) {
Expand Down
25 changes: 20 additions & 5 deletions core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,35 @@ export function pad32Bytes(data) {
}

let slackSentTime = new Date().getTime()
let errMsg = null
let errMsg: string | null = null

async function sendToSlack(error) {
export async function sendToSlack(e: Error) {
if (SLACK_WEBHOOK_URL) {
const e = error[0]
const webhook = new IncomingWebhook(SLACK_WEBHOOK_URL)
let errorObj = {}
if (Array.isArray(e)) {
errorObj = {
message: e[0]?.message,
stack: e[0]?.stack,
name: e[0]?.name
}
} else {
errorObj = {
message: e.message,
stack: e.stack,
name: e.name
}
}
const text = ` :fire: _An error has occurred at_ \`${os.hostname()}\`\n \`\`\`${JSON.stringify(
e
errorObj
)} \`\`\`\n>*System information*\n>*memory*: ${os.freemem()}/${os.totalmem()}\n>*machine*: ${os.machine()}\n>*platform*: ${os.platform()}\n>*upTime*: ${os.uptime()}\n>*version*: ${os.version()}
`

try {
if (e && e.message && errMsg === e.message) {
// if the same error message is sent to slack before
if (e?.message && errMsg === e.message) {
const now = new Date().getTime()
// if it's over 1 min since the last message was sent
Intizar-T marked this conversation as resolved.
Show resolved Hide resolved
if (slackSentTime + 60_000 < now) {
await webhook.send({ text })
slackSentTime = now
Expand Down
2 changes: 1 addition & 1 deletion core/test/caver-js.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe('Test Caver-js', function () {
BigNumber.from(beforeBalanceOfAccount).sub(BigNumber.from(amount)).sub(txFee)
)
).toBe(true)
})
}, 60_000)
} else {
test('Send signed tx with is ethers on local', async function () {
const provider = new ethers.providers.JsonRpcProvider('http://127.0.0.1:8545')
Expand Down
101 changes: 1 addition & 100 deletions core/test/nonce-manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import { NonceManager } from '@ethersproject/experimental'
import { jest } from '@jest/globals'
import { Mutex } from 'async-mutex'
import { createClient, RedisClientType } from 'redis'
import { OraklError, OraklErrorCode } from '../src/errors'
import { buildMockLogger } from '../src/logger'
import { State } from '../src/reporter/state'
import { CaverWallet } from '../src/reporter/utils'

describe('nonce-manager', () => {
const PROVIDER_URL = process.env.GITHUB_ACTIONS
Expand Down Expand Up @@ -52,36 +48,6 @@ describe('nonce-manager', () => {
})
})

test('wallet not active', async () => {
try {
state.mutex = new Mutex()
state.wallets = {}
await state.getAndIncrementNonce(ORACLE_ADDRESS)
} catch (error) {
expect(error.code).toBe(OraklErrorCode.WalletNotActive)
}
})

test('cannot get transaction count', async () => {
// override state.getTransactionCount() to throw error
await state.refresh()
const wallet = state.wallets[ORACLE_ADDRESS] as NonceManager
jest.spyOn(wallet, 'getTransactionCount').mockImplementation(async () => {
throw new OraklError(OraklErrorCode.FailedToGetWalletTransactionCount)
})
try {
await state.getAndIncrementNonce(ORACLE_ADDRESS)
} catch (error) {
expect(error.code).toBe(OraklErrorCode.FailedToGetWalletTransactionCount)
}

await delegatedState.refresh()
const caverWallet = delegatedState.wallets[ORACLE_ADDRESS] as CaverWallet
jest.spyOn(caverWallet.caver.rpc.klay, 'getTransactionCount').mockImplementation(async () => {
throw new OraklError(OraklErrorCode.FailedToGetWalletTransactionCount)
})
})

test('increments nonce after func is called', async () => {
for (const currState of [state, delegatedState]) {
await currState.refresh()
Expand All @@ -91,27 +57,10 @@ describe('nonce-manager', () => {
}
})

test('check delegatedFee handling', async () => {
// check that nonce is updated correctly when delegatedFee is true & false
await state.refresh()
const wallet = state.wallets[ORACLE_ADDRESS] as NonceManager
const currNonce = await wallet.getTransactionCount()
const returnedNonce = await state.getAndIncrementNonce(ORACLE_ADDRESS)
expect(returnedNonce).toBe(currNonce)

await delegatedState.refresh()
const caverWallet = delegatedState.wallets[ORACLE_ADDRESS] as CaverWallet
const currCaverNonce = Number(
await caverWallet.caver.rpc.klay.getTransactionCount(caverWallet.address)
)
const returnedCaverNonce = await delegatedState.getAndIncrementNonce(ORACLE_ADDRESS)
expect(returnedCaverNonce).toBe(currCaverNonce)
})

test('concurrent nonce calls', async () => {
// send multiple concurrent calls to getAndIncrementNonce()
// check that all nonces are unique and increment by 1
const CONCURRENT_CALLS = 10
const CONCURRENT_CALLS = 50

for (const currState of [state, delegatedState]) {
await currState.refresh()
Expand All @@ -123,53 +72,5 @@ describe('nonce-manager', () => {
expect(new Set(nonces).size).toBe(CONCURRENT_CALLS)
expect(Math.max(...nonces) - Math.min(...nonces)).toBe(CONCURRENT_CALLS - 1)
}
}, 60_000)

test('localNonce is smaller than walletNonce', async () => {
// when walletNonce is greater than localNonce,
// localNonce should be updated to walletNonce
for (const currState of [state, delegatedState]) {
await currState.refresh()
currState.nonces[ORACLE_ADDRESS] = 0
const nonce = await currState.getAndIncrementNonce(ORACLE_ADDRESS)

if (!currState.delegatedFee) {
const wallet = currState.wallets[ORACLE_ADDRESS] as NonceManager
const walletNonce = await wallet.getTransactionCount()
expect(nonce).toBe(walletNonce)
} else {
const caverWallet = currState.wallets[ORACLE_ADDRESS] as CaverWallet
const walletNonce = Number(
await caverWallet.caver.rpc.klay.getTransactionCount(caverWallet.address)
)
expect(nonce).toBe(walletNonce)
}
}
})

test('localNonce is greater than walletNonce', async () => {
// when localNonce is smaller than walletNonce,
// nothing should happen, localNonce should be returned and incremented
for (const currState of [state, delegatedState]) {
await currState.refresh()

if (!currState.delegatedFee) {
const wallet = currState.wallets[ORACLE_ADDRESS] as NonceManager
const walletNonce = await wallet.getTransactionCount()

currState.nonces[ORACLE_ADDRESS] = walletNonce + 1
const nonce = await currState.getAndIncrementNonce(ORACLE_ADDRESS)
expect(nonce).not.toBe(walletNonce)
} else {
const caverWallet = currState.wallets[ORACLE_ADDRESS] as CaverWallet
const walletNonce = Number(
await caverWallet.caver.rpc.klay.getTransactionCount(caverWallet.address)
)

currState.nonces[ORACLE_ADDRESS] = walletNonce + 1
const nonce = await currState.getAndIncrementNonce(ORACLE_ADDRESS)
expect(nonce).not.toBe(walletNonce)
}
}
})
})
Loading