Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scale execution #358

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ test/spec-tests/bundler-spec-tests
test/build
.DS_Store
Alto-Bruno/
temp.ts
.nx/cache
.vscode/
.sentryclirc
7 changes: 3 additions & 4 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/cli/alto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
debugOptions,
gasEstimationOptions,
logOptions,
mempoolOptions,
rpcOptions,
serverOptions
} from "./config"
Expand Down Expand Up @@ -50,6 +51,8 @@ export function getAltoCli(): yargs.Argv {
})
.options(bundlerOptions)
.group(Object.keys(bundlerOptions), "Options:")
.options(mempoolOptions)
.group(Object.keys(mempoolOptions), "Mempool Options:")
.options(compatibilityOptions)
.group(Object.keys(compatibilityOptions), "Compatibility Options:")
.options(serverOptions)
Expand Down
25 changes: 17 additions & 8 deletions src/cli/config/bundler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ export const bundlerArgsSchema = z.object({
.transform(([slow, standard, fast]) => ({ slow, standard, fast })),
"gas-price-refresh-interval": z.number().int().min(0),

"mempool-max-parallel-ops": z.number().int().min(0).default(10),
"mempool-max-queued-ops": z.number().int().min(0).default(0),
"enforce-unique-senders-per-bundle": z.boolean().default(true),
"max-gas-per-bundle": z
.string()
.transform((val) => BigInt(val))
Expand Down Expand Up @@ -170,11 +167,19 @@ export const rpcArgsSchema = z.object({
"code-override-support": z.boolean().optional().default(false)
})

export const bundleCopmressionArgsSchema = z.object({
export const bundleCompressionArgsSchema = z.object({
"bundle-bulker-address": addressSchema.optional(),
"per-op-inflator-address": addressSchema.optional()
})

export const mempoolArgsSchema = z.object({
"redis-mempool-url": z.string().optional(),
"redis-mempool-concurrency": z.number().int().min(0).default(10),
"mempool-max-parallel-ops": z.number().int().min(0).default(10),
"mempool-max-queued-ops": z.number().int().min(0).default(0),
"enforce-unique-senders-per-bundle": z.boolean().default(true)
})

export const logArgsSchema = z.object({
"redis-queue-endpoint": z.string().optional(),
"redis-event-manager-queue-name": z.string().optional(),
Expand Down Expand Up @@ -224,9 +229,9 @@ export type IServerArgsInput = z.input<typeof serverArgsSchema>
export type IRpcArgs = z.infer<typeof rpcArgsSchema>
export type IRpcArgsInput = z.input<typeof rpcArgsSchema>

export type IBundleCompressionArgs = z.infer<typeof bundleCopmressionArgsSchema>
export type IBundleCompressionArgs = z.infer<typeof bundleCompressionArgsSchema>
export type IBundleCompressionArgsInput = z.input<
typeof bundleCopmressionArgsSchema
typeof bundleCompressionArgsSchema
>

export type ILogArgs = z.infer<typeof logArgsSchema>
Expand All @@ -238,15 +243,19 @@ export type IDebugArgsInput = z.input<typeof debugArgsSchema>
export type IGasEstimationArgs = z.infer<typeof gasEstimationArgsSchema>
export type IGasEstimationArgsInput = z.input<typeof gasEstimationArgsSchema>

export type IMempoolArgs = z.infer<typeof mempoolArgsSchema>
export type IMempoolArgsInput = z.input<typeof mempoolArgsSchema>

export const optionArgsSchema = z.object({
...bundlerArgsSchema.shape,
...compatibilityArgsSchema.shape,
...logArgsSchema.shape,
...serverArgsSchema.shape,
...rpcArgsSchema.shape,
...bundleCopmressionArgsSchema.shape,
...bundleCompressionArgsSchema.shape,
...debugArgsSchema.shape,
...gasEstimationArgsSchema.shape
...gasEstimationArgsSchema.shape,
...mempoolArgsSchema.shape
})

export type IOptions = z.infer<typeof optionArgsSchema>
Expand Down
58 changes: 37 additions & 21 deletions src/cli/config/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
IDebugArgsInput,
IGasEstimationArgsInput,
ILogArgsInput,
IMempoolArgsInput,
IOptionsInput,
IRpcArgsInput,
IServerArgsInput
Expand Down Expand Up @@ -141,27 +142,6 @@ export const bundlerOptions: CliCommandOptions<IBundlerArgsInput> = {
require: false,
default: 0
},
"mempool-max-parallel-ops": {
description:
"Maximum amount of parallel user ops to keep in the meempool (same sender, different nonce keys)",
type: "number",
require: false,
default: 10
},
"mempool-max-queued-ops": {
description:
"Maximum amount of sequential user ops to keep in the mempool (same sender and nonce key, different nonce values)",
type: "number",
require: false,
default: 0
},
"enforce-unique-senders-per-bundle": {
description:
"Include user ops with the same sender in the single bundle",
type: "boolean",
require: false,
default: true
},
"max-gas-per-bundle": {
description: "Maximum amount of gas per bundle",
type: "string",
Expand Down Expand Up @@ -479,6 +459,42 @@ export const debugOptions: CliCommandOptions<IDebugArgsInput> = {
}
}

export const mempoolOptions: CliCommandOptions<IMempoolArgsInput> = {
"redis-mempool-url": {
description:
"Redis connection URL (required if redis-mempool is enabled)",
type: "string",
require: false
},
"redis-mempool-concurrency": {
description: "Number of concurrent jobs to process",
type: "number",
require: false,
default: 10
},
"mempool-max-parallel-ops": {
description:
"Maximum amount of parallel user ops to keep in the mempool (same sender, different nonce keys)",
type: "number",
require: false,
default: 10
},
"mempool-max-queued-ops": {
description:
"Maximum amount of sequential user ops to keep in the mempool (same sender and nonce key, different nonce values)",
type: "number",
require: false,
default: 0
},
"enforce-unique-senders-per-bundle": {
description:
"Include user ops with the same sender in the single bundle",
type: "boolean",
require: false,
default: true
}
}

export const bundlerCommand: CliCommand<IOptionsInput> = {
command: "$0",
describe: "Starts the bundler",
Expand Down
7 changes: 3 additions & 4 deletions src/cli/handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { SenderManager } from "@alto/executor"
import { createSenderManager } from "@alto/executor"
import { GasPriceManager } from "@alto/handlers"
import {
createMetrics,
Expand Down Expand Up @@ -148,10 +148,9 @@ export async function bundlerHandler(args_: IOptionsInput): Promise<void> {

await preFlightChecks(config)

const senderManager = new SenderManager({
const senderManager = await createSenderManager({
config,
metrics,
gasPriceManager
metrics
})

const utilityWalletAddress = config.utilityPrivateKey?.address
Expand Down
56 changes: 38 additions & 18 deletions src/cli/setupServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
} from "@alto/handlers"
import {
type InterfaceReputationManager,
MemoryMempool,
Mempool,
Monitor,
NullReputationManager,
ReputationManager
Expand All @@ -22,7 +22,9 @@ import {
import type { InterfaceValidator } from "@alto/types"
import type { Metrics } from "@alto/utils"
import type { Registry } from "prom-client"
import type { AltoConfig } from "../createConfig"
import type { AltoConfig } from "@alto/config"
import { type Store, createMemoryStore, createRedisStore } from "@alto/store"
import { validateAndRefillWallets } from "../executor/validateAndRefillWallets"

const getReputationManager = (
config: AltoConfig
Expand Down Expand Up @@ -77,14 +79,28 @@ const getMempool = ({
validator: InterfaceValidator
metrics: Metrics
eventManager: EventManager
}): MemoryMempool => {
return new MemoryMempool({
}): Mempool => {
let store: Store

if (config.redisMempoolUrl) {
store = createRedisStore({
config,
metrics
})
} else {
store = createMemoryStore({
config,
metrics
})
}

return new Mempool({
config,
monitor,
reputationManager,
validator,
metrics,
eventManager
eventManager,
store
})
}

Expand Down Expand Up @@ -155,7 +171,7 @@ const getExecutorManager = ({
}: {
config: AltoConfig
executor: Executor
mempool: MemoryMempool
mempool: Mempool
monitor: Monitor
reputationManager: InterfaceReputationManager
metrics: Metrics
Expand All @@ -180,7 +196,7 @@ const getNonceQueuer = ({
eventManager
}: {
config: AltoConfig
mempool: MemoryMempool
mempool: Mempool
eventManager: EventManager
}) => {
return new NonceQueuer({
Expand All @@ -206,7 +222,7 @@ const getRpcHandler = ({
}: {
config: AltoConfig
validator: InterfaceValidator
mempool: MemoryMempool
mempool: Mempool
executor: Executor
monitor: Monitor
nonceQueuer: NonceQueuer
Expand Down Expand Up @@ -281,10 +297,10 @@ export const setupServer = async ({
})

if (config.refillingWallets) {
await senderManager.validateAndRefillWallets()
await validateAndRefillWallets({ config, gasPriceManager, metrics })

setInterval(async () => {
await senderManager.validateAndRefillWallets()
await validateAndRefillWallets({ config, gasPriceManager, metrics })
}, config.executorRefillInterval * 1000)
}

Expand Down Expand Up @@ -350,7 +366,7 @@ export const setupServer = async ({
)

rootLogger.info(
`Initialized ${senderManager.wallets.length} executor wallets`
`Initialized ${senderManager.getAllWallets().length} executor wallets`
)

const server = getServer({
Expand All @@ -368,17 +384,21 @@ export const setupServer = async ({
await server.stop()
rootLogger.info("server stopped")

const outstanding = mempool.dumpOutstanding().length
const submitted = mempool.dumpSubmittedOps().length
const processing = mempool.dumpProcessing().length
const submitted = (await mempool.dumpSubmittedOps()).length
const processing = (await mempool.dumpProcessing()).length
rootLogger.info(
{ outstanding, submitted, processing },
{ submitted, processing },
"dumping mempool before shutdown"
)

for (const wallet of senderManager.getWalletsInUse()) {
console.log("pushing wallet back to pool", wallet.address)
senderManager.pushWallet(wallet)
}

process.exit(0)
}

process.on("SIGINT", gracefulShutdown)
process.on("SIGTERM", gracefulShutdown)
process.once("SIGINT", gracefulShutdown)
process.once("SIGTERM", gracefulShutdown)
}
11 changes: 4 additions & 7 deletions src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import {
isTransactionUnderpricedError
} from "./utils"
import type { SendTransactionErrorType } from "viem"
import type { AltoConfig } from "../createConfig"
import type { AltoConfig } from "@alto/config"

export interface GasEstimateResult {
preverificationGas: bigint
Expand Down Expand Up @@ -137,10 +137,7 @@ export class Executor {
}

markWalletProcessed(executor: Account) {
if (!this.senderManager.availableWallets.includes(executor)) {
this.senderManager.pushWallet(executor)
}
return Promise.resolve()
this.senderManager.pushWallet(executor)
}

async replaceTransaction(
Expand Down Expand Up @@ -482,9 +479,9 @@ export class Executor {
}

async flushStuckTransactions(): Promise<void> {
const allWallets = new Set(this.senderManager.wallets)
const allWallets = new Set(this.senderManager.getAllWallets())

const utilityWallet = this.senderManager.utilityAccount
const utilityWallet = this.config.utilityPrivateKey
if (utilityWallet) {
allWallets.add(utilityWallet)
}
Expand Down
Loading
Loading