From e829d68d116e9c86bb34e720a7201a9180f04cab Mon Sep 17 00:00:00 2001 From: Roman Maslennikov Date: Fri, 17 Jan 2025 16:20:04 +0400 Subject: [PATCH 1/9] feat(ethexe): add dispatch origin --- .gitignore | 1 + ethexe/processor/src/handling/events.rs | 22 ++++++++++++++++++++-- ethexe/processor/src/handling/mod.rs | 8 ++++++-- ethexe/processor/src/handling/run.rs | 8 +++++++- ethexe/processor/src/lib.rs | 11 ++++++++--- ethexe/processor/src/tests.rs | 8 ++++---- ethexe/runtime/common/src/journal.rs | 12 ++++++++---- ethexe/runtime/common/src/lib.rs | 1 + ethexe/runtime/common/src/schedule.rs | 7 ++++++- ethexe/runtime/common/src/state.rs | 19 ++++++++++++++++++- 10 files changed, 79 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 00e4c721460..d6d6408ffa8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ node_modules/ target/ target-no-lazy/ +target-rust-analyzer/ target-xwin/ log/ weight-dumps/ diff --git a/ethexe/processor/src/handling/events.rs b/ethexe/processor/src/handling/events.rs index 5d22ee2fc99..f43e1f78768 100644 --- a/ethexe/processor/src/handling/events.rs +++ b/ethexe/processor/src/handling/events.rs @@ -66,6 +66,8 @@ impl ProcessingHandler { return Ok(()); } + let dispatch_origin = self.dispatch_origin; + match event { MirrorRequestEvent::ExecutableBalanceTopUpRequested { value } => { self.update_state(actor_id, |state, _, _| { @@ -81,7 +83,15 @@ impl ProcessingHandler { self.update_state(actor_id, |state, storage, _| -> Result<()> { let is_init = state.requires_init_message(); - let dispatch = Dispatch::new(storage, id, source, payload, value, is_init)?; + let dispatch = Dispatch::new( + storage, + id, + source, + payload, + value, + is_init, + dispatch_origin, + )?; state .queue_hash @@ -120,7 +130,14 @@ impl ProcessingHandler { &ScheduledTask::RemoveFromMailbox((actor_id, source), replied_to), )?; - let reply = Dispatch::new_reply(storage, replied_to, source, payload, value)?; + let reply = Dispatch::new_reply( + storage, + replied_to, + source, + payload, + value, + dispatch_origin, + )?; state .queue_hash @@ -156,6 +173,7 @@ impl ProcessingHandler { PayloadLookup::empty(), 0, SuccessReplyReason::Auto, + dispatch_origin, ); state diff --git a/ethexe/processor/src/handling/mod.rs b/ethexe/processor/src/handling/mod.rs index 3d08e4805a0..b55cd9115c3 100644 --- a/ethexe/processor/src/handling/mod.rs +++ b/ethexe/processor/src/handling/mod.rs @@ -20,7 +20,8 @@ use crate::Processor; use anyhow::{anyhow, Result}; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_runtime_common::{ - state::ProgramState, InBlockTransitions, ScheduleHandler, TransitionController, + state::{Origin, ProgramState}, + InBlockTransitions, ScheduleHandler, TransitionController, }; use gprimitives::{ActorId, CodeId, H256}; @@ -31,6 +32,7 @@ pub struct ProcessingHandler { pub block_hash: H256, pub db: Database, pub transitions: InBlockTransitions, + pub dispatch_origin: Origin, } impl ProcessingHandler { @@ -51,7 +53,7 @@ impl ProcessingHandler { } impl Processor { - pub fn handler(&self, block_hash: H256) -> Result { + pub fn handler(&self, block_hash: H256, dispatch_origin: Origin) -> Result { let header = self .db .block_header(block_hash) @@ -74,6 +76,7 @@ impl Processor { block_hash, db: self.db.clone(), transitions, + dispatch_origin, }) } @@ -112,6 +115,7 @@ impl ProcessingHandler { ); let mut handler = ScheduleHandler { + dispatch_origin: self.dispatch_origin, controller: self.controller(), }; diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index 0ab3c1fbc13..c4b829bee09 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -22,7 +22,9 @@ use crate::{ }; use core_processor::common::JournalNote; use ethexe_db::{CodesStorage, Database}; -use ethexe_runtime_common::{InBlockTransitions, JournalHandler, TransitionController}; +use ethexe_runtime_common::{ + state::Origin, InBlockTransitions, JournalHandler, TransitionController, +}; use gear_core::ids::ProgramId; use gprimitives::H256; use std::collections::BTreeMap; @@ -41,6 +43,7 @@ pub fn run( db: Database, instance_creator: InstanceCreator, in_block_transitions: &mut InBlockTransitions, + dispatch_origin: Origin, ) { tokio::task::block_in_place(|| { let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); @@ -59,6 +62,7 @@ pub fn run( db, instance_creator, in_block_transitions, + dispatch_origin, ) .await }) @@ -72,6 +76,7 @@ async fn run_in_async( db: Database, instance_creator: InstanceCreator, in_block_transitions: &mut InBlockTransitions, + dispatch_origin: Origin, ) { let mut task_senders = vec![]; let mut handles = vec![]; @@ -112,6 +117,7 @@ async fn run_in_async( transitions: in_block_transitions, storage: &db, }, + dispatch_origin, }; core_processor::handle_journal(journal, &mut handler); } diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index aa010dd5386..6935997c52f 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -21,7 +21,7 @@ use anyhow::{anyhow, ensure, Result}; use ethexe_common::events::{BlockRequestEvent, MirrorRequestEvent}; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; -use ethexe_runtime_common::state::Storage; +use ethexe_runtime_common::state::{Origin, Storage}; use gear_core::{ids::prelude::CodeIdExt, message::ReplyInfo}; use gprimitives::{ActorId, CodeId, MessageId, H256}; use handling::{run, ProcessingHandler}; @@ -106,7 +106,9 @@ impl Processor { ) -> Result> { log::debug!("Processing events for {block_hash:?}: {events:#?}"); - let mut handler = self.handler(block_hash)?; + // ATM only eth origin events + let dispatch_origin = Origin::Ethereum; + let mut handler = self.handler(block_hash, dispatch_origin)?; for event in events { match event { @@ -146,6 +148,7 @@ impl Processor { self.db.clone(), self.creator.clone(), &mut handler.transitions, + handler.dispatch_origin, ); } } @@ -165,7 +168,9 @@ impl OverlaidProcessor { ) -> Result { self.0.creator.set_chain_head(block_hash); - let mut handler = self.0.handler(block_hash)?; + // ATM only eth origin events + let dispatch_origin = Origin::Ethereum; + let mut handler = self.0.handler(block_hash, dispatch_origin)?; let state_hash = handler .transitions diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index 441f26ef4ac..b025a2f18bd 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -250,7 +250,7 @@ fn ping_pong() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let mut handler = processor.handler(ch0).unwrap(); + let mut handler = processor.handler(ch0, Default::default()).unwrap(); handler .handle_router_event(RouterRequestEvent::ProgramCreated { actor_id, code_id }) @@ -333,7 +333,7 @@ fn async_and_ping() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let mut handler = processor.handler(ch0).unwrap(); + let mut handler = processor.handler(ch0, Default::default()).unwrap(); handler .handle_router_event(RouterRequestEvent::ProgramCreated { @@ -469,7 +469,7 @@ fn many_waits() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let mut handler = processor.handler(ch0).unwrap(); + let mut handler = processor.handler(ch0, Default::default()).unwrap(); let amount = 10000; for i in 0..amount { @@ -581,7 +581,7 @@ fn many_waits() { assert_eq!(schedule, expected_schedule); } - let mut handler = processor.handler(ch11).unwrap(); + let mut handler = processor.handler(ch11, Default::default()).unwrap(); handler.run_schedule(); processor.process_queue(&mut handler); diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 447fc14407f..b2ce59c9459 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -1,5 +1,5 @@ use crate::{ - state::{ActiveProgram, Dispatch, Program, Storage, ValueWithExpiry, MAILBOX_VALIDITY}, + state::{ActiveProgram, Dispatch, Origin, Program, Storage, ValueWithExpiry, MAILBOX_VALIDITY}, TransitionController, }; use alloc::{collections::BTreeMap, vec::Vec}; @@ -20,6 +20,7 @@ use gprimitives::{ActorId, CodeId, MessageId, ReservationId}; #[derive(derive_more::Deref, derive_more::DerefMut)] pub struct Handler<'a, S: Storage> { pub program_id: ProgramId, + pub dispatch_origin: Origin, #[deref] #[deref_mut] pub controller: TransitionController<'a, S>, @@ -66,6 +67,8 @@ impl Handler<'_, S> { return; } + let dispatch_origin = self.dispatch_origin; + self.update_state(dispatch.source(), |state, storage, transitions| { if let Ok(non_zero_delay) = delay.try_into() { let expiry = transitions.schedule_task( @@ -77,7 +80,7 @@ impl Handler<'_, S> { ); let user_id = dispatch.destination(); - let dispatch = Dispatch::from_stored(storage, dispatch); + let dispatch = Dispatch::from_stored(storage, dispatch, dispatch_origin); state.stash_hash.modify_stash(storage, |stash| { stash.add_to_user(dispatch.id, dispatch, expiry, user_id); @@ -219,7 +222,7 @@ impl JournalHandler for Handler<'_, S> { let dispatch = dispatch.into_stored(); if self.transitions.is_program(&destination) { - let dispatch = Dispatch::from_stored(self.storage, dispatch); + let dispatch = Dispatch::from_stored(self.storage, dispatch, self.dispatch_origin); self.send_dispatch_to_program(message_id, destination, dispatch, delay); } else { @@ -241,6 +244,7 @@ impl JournalHandler for Handler<'_, S> { NonZero::::try_from(duration).expect("must be checked on backend side"); let program_id = self.program_id; + let dispatch_origin = self.dispatch_origin; self.update_state(program_id, |state, storage, transitions| { let expiry = transitions.schedule_task( @@ -248,7 +252,7 @@ impl JournalHandler for Handler<'_, S> { ScheduledTask::WakeMessage(dispatch.destination(), dispatch.id()), ); - let dispatch = Dispatch::from_stored(storage, dispatch); + let dispatch = Dispatch::from_stored(storage, dispatch, dispatch_origin); state.queue_hash.modify_queue(storage, |queue| { let head = queue diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 1fb2f1f483e..054864c7c5c 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -169,6 +169,7 @@ where value, details, context, + .. } = queue.dequeue().unwrap(); let payload = payload.query(ri.storage()).expect("failed to get payload"); diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index bbf64faeb22..bc460c03ec5 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -1,5 +1,5 @@ use crate::{ - state::{Dispatch, PayloadLookup, Storage, ValueWithExpiry, MAILBOX_VALIDITY}, + state::{Dispatch, Origin, PayloadLookup, Storage, ValueWithExpiry, MAILBOX_VALIDITY}, TransitionController, }; use ethexe_common::{ @@ -12,7 +12,10 @@ use gprimitives::{ActorId, CodeId, MessageId, ReservationId}; #[derive(derive_more::Deref, derive_more::DerefMut)] pub struct Handler<'a, S: Storage> { + #[deref_mut] + #[deref] pub controller: TransitionController<'a, S>, + pub dispatch_origin: Origin, } impl TaskHandler for Handler<'_, S> { @@ -21,6 +24,7 @@ impl TaskHandler for Handler<'_, S> { (program_id, user_id): (ProgramId, ActorId), message_id: MessageId, ) -> u64 { + let dispatch_origin = self.dispatch_origin; self.update_state(program_id, |state, storage, transitions| { let ValueWithExpiry { value, .. } = state.mailbox_hash.modify_mailbox(storage, |mailbox| { @@ -43,6 +47,7 @@ impl TaskHandler for Handler<'_, S> { PayloadLookup::empty(), 0, SuccessReplyReason::Auto, + dispatch_origin, ); state diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 3b6cf3036fa..6b82316a2ea 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -502,6 +502,14 @@ impl ProgramState { } } +#[derive(Clone, Copy, Debug, Encode, Decode, PartialEq, Eq, Default)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] +pub enum Origin { + #[default] + Ethereum, + OffChain, +} + #[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] #[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub struct Dispatch { @@ -519,6 +527,8 @@ pub struct Dispatch { pub details: Option, /// Message previous executions context. pub context: Option, + /// Origin of the message. + pub origin: Origin, } impl Dispatch { @@ -529,6 +539,7 @@ impl Dispatch { payload: Vec, value: u128, is_init: bool, + origin: Origin, ) -> Result { let payload = storage.write_payload_raw(payload)?; @@ -546,6 +557,7 @@ impl Dispatch { value, details: None, context: None, + origin, }) } @@ -555,6 +567,7 @@ impl Dispatch { source: ActorId, payload: Vec, value: u128, + origin: Origin, ) -> Result { let payload_hash = storage.write_payload_raw(payload)?; @@ -564,6 +577,7 @@ impl Dispatch { payload_hash, value, SuccessReplyReason::Manual, + origin, )) } @@ -573,6 +587,7 @@ impl Dispatch { payload: PayloadLookup, value: u128, reply_code: impl Into, + origin: Origin, ) -> Self { Self { id: MessageId::generate_reply(reply_to), @@ -582,10 +597,11 @@ impl Dispatch { value, details: Some(ReplyDetails::new(reply_to, reply_code.into()).into()), context: None, + origin, } } - pub fn from_stored(storage: &S, value: StoredDispatch) -> Self { + pub fn from_stored(storage: &S, value: StoredDispatch, origin: Origin) -> Self { let (kind, message, context) = value.into_parts(); let (id, source, _destination, payload, value, details) = message.into_parts(); @@ -601,6 +617,7 @@ impl Dispatch { value, details, context, + origin, } } From 4934ec6c4c53eeb94b50e7eb7bfaed5eb87f753c Mon Sep 17 00:00:00 2001 From: Roman Maslennikov Date: Fri, 17 Jan 2025 16:21:47 +0400 Subject: [PATCH 2/9] Trigger CI From 664be14ae32d66f3b0a97269f062ac4e744235bf Mon Sep 17 00:00:00 2001 From: Roman Maslennikov Date: Sat, 18 Jan 2025 15:17:39 +0400 Subject: [PATCH 3/9] Review fixes --- Cargo.lock | 1 + ethexe/common/Cargo.toml | 4 +++ ethexe/common/src/db.rs | 9 ++++-- ethexe/common/src/gear.rs | 8 +++++ ethexe/common/src/lib.rs | 2 +- ethexe/processor/src/handling/events.rs | 20 ++++++++----- ethexe/processor/src/handling/mod.rs | 8 ++--- ethexe/processor/src/handling/run.rs | 40 ++++++++++++++++--------- ethexe/processor/src/lib.rs | 11 ++----- ethexe/processor/src/tests.rs | 8 ++--- ethexe/runtime/Cargo.toml | 2 ++ ethexe/runtime/common/src/journal.rs | 8 ++--- ethexe/runtime/common/src/lib.rs | 18 +++++++++++ ethexe/runtime/common/src/schedule.rs | 19 +++++++----- ethexe/runtime/common/src/state.rs | 14 ++++----- ethexe/service/src/tests.rs | 10 +++++-- 16 files changed, 116 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f29fabb853d..91c4cb3da77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4711,6 +4711,7 @@ dependencies = [ name = "ethexe-runtime" version = "1.7.0" dependencies = [ + "ethexe-common", "ethexe-runtime-common", "gear-core", "gear-core-processor", diff --git a/ethexe/common/Cargo.toml b/ethexe/common/Cargo.toml index 9e0f167594c..7fed1e8c8a8 100644 --- a/ethexe/common/Cargo.toml +++ b/ethexe/common/Cargo.toml @@ -18,3 +18,7 @@ derive_more.workspace = true hex.workspace = true anyhow.workspace = true serde.workspace = true + +[features] +default = [] +std = [] diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index 2d3a0fcb153..f0d7d7b9988 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -18,7 +18,10 @@ //! ethexe common db types and traits. -use crate::{events::BlockRequestEvent, gear::StateTransition}; +use crate::{ + events::BlockRequestEvent, + gear::{Origin, StateTransition}, +}; use alloc::{ collections::{BTreeMap, BTreeSet, VecDeque}, vec::Vec, @@ -31,13 +34,13 @@ use gprimitives::{MessageId, H256}; use parity_scale_codec::{Decode, Encode}; /// RemoveFromMailbox key; (msgs sources program (mailbox and queue provider), destination user id) -pub type Rfm = (ProgramId, ActorId); +pub type Rfm = (ProgramId, ActorId, Origin); /// SendDispatch key; (msgs destinations program (stash and queue provider), message id) pub type Sd = (ProgramId, MessageId); /// SendUserMessage key; (msgs sources program (mailbox and stash provider)) -pub type Sum = ProgramId; +pub type Sum = (ProgramId, Origin); /// NOTE: generic keys differs to Vara and have been chosen dependent on storage organization of ethexe. pub type ScheduledTask = gear_core::tasks::ScheduledTask; diff --git a/ethexe/common/src/gear.rs b/ethexe/common/src/gear.rs index cdaa36710bc..96598a52a29 100644 --- a/ethexe/common/src/gear.rs +++ b/ethexe/common/src/gear.rs @@ -131,3 +131,11 @@ pub struct ValueClaim { pub destination: ActorId, pub value: u128, } + +#[derive(Clone, Copy, Debug, Encode, Decode, PartialEq, Eq, Default, PartialOrd, Ord)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] +pub enum Origin { + #[default] + Ethereum, + OffChain, +} diff --git a/ethexe/common/src/lib.rs b/ethexe/common/src/lib.rs index c70780b10cf..439174cb7f9 100644 --- a/ethexe/common/src/lib.rs +++ b/ethexe/common/src/lib.rs @@ -18,7 +18,7 @@ //! ethexe common types and traits. -#![no_std] +#![cfg_attr(not(feature = "std"), no_std)] extern crate alloc; diff --git a/ethexe/processor/src/handling/events.rs b/ethexe/processor/src/handling/events.rs index f43e1f78768..e449e476ee0 100644 --- a/ethexe/processor/src/handling/events.rs +++ b/ethexe/processor/src/handling/events.rs @@ -20,7 +20,7 @@ use super::ProcessingHandler; use anyhow::{ensure, Result}; use ethexe_common::{ events::{MirrorRequestEvent, RouterRequestEvent, WVaraRequestEvent}, - gear::ValueClaim, + gear::{Origin, ValueClaim}, }; use ethexe_db::{CodesStorage, ScheduledTask}; use ethexe_runtime_common::state::{Dispatch, PayloadLookup, ValueWithExpiry}; @@ -66,8 +66,6 @@ impl ProcessingHandler { return Ok(()); } - let dispatch_origin = self.dispatch_origin; - match event { MirrorRequestEvent::ExecutableBalanceTopUpRequested { value } => { self.update_state(actor_id, |state, _, _| { @@ -90,7 +88,7 @@ impl ProcessingHandler { payload, value, is_init, - dispatch_origin, + Origin::Ethereum, )?; state @@ -127,7 +125,10 @@ impl ProcessingHandler { transitions.remove_task( expiry, - &ScheduledTask::RemoveFromMailbox((actor_id, source), replied_to), + &ScheduledTask::RemoveFromMailbox( + (actor_id, source, Origin::Ethereum), + replied_to, + ), )?; let reply = Dispatch::new_reply( @@ -136,7 +137,7 @@ impl ProcessingHandler { source, payload, value, - dispatch_origin, + Origin::Ethereum, )?; state @@ -164,7 +165,10 @@ impl ProcessingHandler { transitions.remove_task( expiry, - &ScheduledTask::RemoveFromMailbox((actor_id, source), claimed_id), + &ScheduledTask::RemoveFromMailbox( + (actor_id, source, Origin::Ethereum), + claimed_id, + ), )?; let reply = Dispatch::reply( @@ -173,7 +177,7 @@ impl ProcessingHandler { PayloadLookup::empty(), 0, SuccessReplyReason::Auto, - dispatch_origin, + Origin::Ethereum, ); state diff --git a/ethexe/processor/src/handling/mod.rs b/ethexe/processor/src/handling/mod.rs index b55cd9115c3..3d08e4805a0 100644 --- a/ethexe/processor/src/handling/mod.rs +++ b/ethexe/processor/src/handling/mod.rs @@ -20,8 +20,7 @@ use crate::Processor; use anyhow::{anyhow, Result}; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_runtime_common::{ - state::{Origin, ProgramState}, - InBlockTransitions, ScheduleHandler, TransitionController, + state::ProgramState, InBlockTransitions, ScheduleHandler, TransitionController, }; use gprimitives::{ActorId, CodeId, H256}; @@ -32,7 +31,6 @@ pub struct ProcessingHandler { pub block_hash: H256, pub db: Database, pub transitions: InBlockTransitions, - pub dispatch_origin: Origin, } impl ProcessingHandler { @@ -53,7 +51,7 @@ impl ProcessingHandler { } impl Processor { - pub fn handler(&self, block_hash: H256, dispatch_origin: Origin) -> Result { + pub fn handler(&self, block_hash: H256) -> Result { let header = self .db .block_header(block_hash) @@ -76,7 +74,6 @@ impl Processor { block_hash, db: self.db.clone(), transitions, - dispatch_origin, }) } @@ -115,7 +112,6 @@ impl ProcessingHandler { ); let mut handler = ScheduleHandler { - dispatch_origin: self.dispatch_origin, controller: self.controller(), }; diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index c4b829bee09..ecaa11cee43 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -22,9 +22,7 @@ use crate::{ }; use core_processor::common::JournalNote; use ethexe_db::{CodesStorage, Database}; -use ethexe_runtime_common::{ - state::Origin, InBlockTransitions, JournalHandler, TransitionController, -}; +use ethexe_runtime_common::{InBlockTransitions, JournalHandler, TransitionController}; use gear_core::ids::ProgramId; use gprimitives::H256; use std::collections::BTreeMap; @@ -43,7 +41,6 @@ pub fn run( db: Database, instance_creator: InstanceCreator, in_block_transitions: &mut InBlockTransitions, - dispatch_origin: Origin, ) { tokio::task::block_in_place(|| { let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); @@ -62,7 +59,6 @@ pub fn run( db, instance_creator, in_block_transitions, - dispatch_origin, ) .await }) @@ -76,7 +72,6 @@ async fn run_in_async( db: Database, instance_creator: InstanceCreator, in_block_transitions: &mut InBlockTransitions, - dispatch_origin: Origin, ) { let mut task_senders = vec![]; let mut handles = vec![]; @@ -111,15 +106,32 @@ async fn run_in_async( } for (program_id, journal) in super_journal { - let mut handler = JournalHandler { - program_id, - controller: TransitionController { - transitions: in_block_transitions, - storage: &db, - }, - dispatch_origin, + let controller = TransitionController { + transitions: in_block_transitions, + storage: &db, }; - core_processor::handle_journal(journal, &mut handler); + + let dispatch_origin = controller.access_state(program_id, |state, storage, _| { + let queue = state + .queue_hash + .query(storage) + .expect("failed to query queue"); + + queue.peek().map(|message| message.origin) + }); + + // Absence of `dispatch_origin` means journal is empty, so we can skip journal handling + if let Some(dispatch_origin) = dispatch_origin { + let mut handler = JournalHandler { + program_id, + controller: TransitionController { + transitions: in_block_transitions, + storage: &db, + }, + dispatch_origin, + }; + core_processor::handle_journal(journal, &mut handler); + } } } diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 6935997c52f..aa010dd5386 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -21,7 +21,7 @@ use anyhow::{anyhow, ensure, Result}; use ethexe_common::events::{BlockRequestEvent, MirrorRequestEvent}; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; -use ethexe_runtime_common::state::{Origin, Storage}; +use ethexe_runtime_common::state::Storage; use gear_core::{ids::prelude::CodeIdExt, message::ReplyInfo}; use gprimitives::{ActorId, CodeId, MessageId, H256}; use handling::{run, ProcessingHandler}; @@ -106,9 +106,7 @@ impl Processor { ) -> Result> { log::debug!("Processing events for {block_hash:?}: {events:#?}"); - // ATM only eth origin events - let dispatch_origin = Origin::Ethereum; - let mut handler = self.handler(block_hash, dispatch_origin)?; + let mut handler = self.handler(block_hash)?; for event in events { match event { @@ -148,7 +146,6 @@ impl Processor { self.db.clone(), self.creator.clone(), &mut handler.transitions, - handler.dispatch_origin, ); } } @@ -168,9 +165,7 @@ impl OverlaidProcessor { ) -> Result { self.0.creator.set_chain_head(block_hash); - // ATM only eth origin events - let dispatch_origin = Origin::Ethereum; - let mut handler = self.0.handler(block_hash, dispatch_origin)?; + let mut handler = self.0.handler(block_hash)?; let state_hash = handler .transitions diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index b025a2f18bd..441f26ef4ac 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -250,7 +250,7 @@ fn ping_pong() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let mut handler = processor.handler(ch0, Default::default()).unwrap(); + let mut handler = processor.handler(ch0).unwrap(); handler .handle_router_event(RouterRequestEvent::ProgramCreated { actor_id, code_id }) @@ -333,7 +333,7 @@ fn async_and_ping() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let mut handler = processor.handler(ch0, Default::default()).unwrap(); + let mut handler = processor.handler(ch0).unwrap(); handler .handle_router_event(RouterRequestEvent::ProgramCreated { @@ -469,7 +469,7 @@ fn many_waits() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let mut handler = processor.handler(ch0, Default::default()).unwrap(); + let mut handler = processor.handler(ch0).unwrap(); let amount = 10000; for i in 0..amount { @@ -581,7 +581,7 @@ fn many_waits() { assert_eq!(schedule, expected_schedule); } - let mut handler = processor.handler(ch11, Default::default()).unwrap(); + let mut handler = processor.handler(ch11).unwrap(); handler.run_schedule(); processor.process_queue(&mut handler); diff --git a/ethexe/runtime/Cargo.toml b/ethexe/runtime/Cargo.toml index ef604b591b3..fbe7b948a1f 100644 --- a/ethexe/runtime/Cargo.toml +++ b/ethexe/runtime/Cargo.toml @@ -9,6 +9,7 @@ repository.workspace = true [dependencies] ## Gear deps. +ethexe-common.workspace = true ethexe-runtime-common.workspace = true core-processor.workspace = true gear-core.workspace = true @@ -33,4 +34,5 @@ std = [ "gear-core/std", "gear-lazy-pages-interface/std", "gear-wasm-instrument/std", + "ethexe-common/std" ] diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index b2ce59c9459..b0eea802387 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -1,12 +1,12 @@ use crate::{ - state::{ActiveProgram, Dispatch, Origin, Program, Storage, ValueWithExpiry, MAILBOX_VALIDITY}, + state::{ActiveProgram, Dispatch, Program, Storage, ValueWithExpiry, MAILBOX_VALIDITY}, TransitionController, }; use alloc::{collections::BTreeMap, vec::Vec}; use anyhow::bail; use core::{mem, num::NonZero}; use core_processor::common::{DispatchOutcome, JournalHandler}; -use ethexe_common::db::ScheduledTask; +use ethexe_common::{db::ScheduledTask, gear::Origin}; use gear_core::{ ids::ProgramId, memory::PageBuf, @@ -75,7 +75,7 @@ impl Handler<'_, S> { non_zero_delay, ScheduledTask::SendUserMessage { message_id: dispatch.id(), - to_mailbox: dispatch.source(), + to_mailbox: (dispatch.source(), dispatch_origin), }, ); @@ -89,7 +89,7 @@ impl Handler<'_, S> { let expiry = transitions.schedule_task( MAILBOX_VALIDITY.try_into().expect("infallible"), ScheduledTask::RemoveFromMailbox( - (dispatch.source(), dispatch.destination()), + (dispatch.source(), dispatch.destination(), dispatch_origin), dispatch.id(), ), ); diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 054864c7c5c..871f5a8533e 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -91,6 +91,24 @@ impl TransitionController<'_, S> { res } + + pub fn access_state( + &self, + program_id: ProgramId, + f: impl FnOnce(&ProgramState, &S, &InBlockTransitions) -> T, + ) -> T { + let state_hash = self + .transitions + .state_of(&program_id) + .expect("failed to find program in known states"); + + let state = self + .storage + .read_state(state_hash) + .expect("failed to read state from storage"); + + f(&state, self.storage, self.transitions) + } } pub fn process_next_message( diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index bc460c03ec5..574905a4570 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -1,10 +1,10 @@ use crate::{ - state::{Dispatch, Origin, PayloadLookup, Storage, ValueWithExpiry, MAILBOX_VALIDITY}, + state::{Dispatch, PayloadLookup, Storage, ValueWithExpiry, MAILBOX_VALIDITY}, TransitionController, }; use ethexe_common::{ db::{Rfm, ScheduledTask, Sd, Sum}, - gear::ValueClaim, + gear::{Origin, ValueClaim}, }; use gear_core::{ids::ProgramId, tasks::TaskHandler}; use gear_core_errors::SuccessReplyReason; @@ -15,16 +15,14 @@ pub struct Handler<'a, S: Storage> { #[deref_mut] #[deref] pub controller: TransitionController<'a, S>, - pub dispatch_origin: Origin, } impl TaskHandler for Handler<'_, S> { fn remove_from_mailbox( &mut self, - (program_id, user_id): (ProgramId, ActorId), + (program_id, user_id, dispatch_origin): (ProgramId, ActorId, Origin), message_id: MessageId, ) -> u64 { - let dispatch_origin = self.dispatch_origin; self.update_state(program_id, |state, storage, transitions| { let ValueWithExpiry { value, .. } = state.mailbox_hash.modify_mailbox(storage, |mailbox| { @@ -72,7 +70,11 @@ impl TaskHandler for Handler<'_, S> { 0 } - fn send_user_message(&mut self, stashed_message_id: MessageId, program_id: ProgramId) -> u64 { + fn send_user_message( + &mut self, + stashed_message_id: MessageId, + (program_id, dispatch_origin): (ProgramId, Origin), + ) -> u64 { self.update_state(program_id, |state, storage, transitions| { let (dispatch, user_id) = state .stash_hash @@ -80,7 +82,10 @@ impl TaskHandler for Handler<'_, S> { let expiry = transitions.schedule_task( MAILBOX_VALIDITY.try_into().expect("infallible"), - ScheduledTask::RemoveFromMailbox((program_id, user_id), stashed_message_id), + ScheduledTask::RemoveFromMailbox( + (program_id, user_id, dispatch_origin), + stashed_message_id, + ), ); state.mailbox_hash.modify_mailbox(storage, |mailbox| { diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 6b82316a2ea..11f22f30a0e 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -31,7 +31,7 @@ use core::{ mem, ops::{Index, IndexMut}, }; -use ethexe_common::gear::Message; +use ethexe_common::gear::{Message, Origin}; pub use gear_core::program::ProgramState as InitStatus; use gear_core::{ ids::{prelude::MessageIdExt as _, ProgramId}, @@ -502,14 +502,6 @@ impl ProgramState { } } -#[derive(Clone, Copy, Debug, Encode, Decode, PartialEq, Eq, Default)] -#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] -pub enum Origin { - #[default] - Ethereum, - OffChain, -} - #[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] #[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub struct Dispatch { @@ -672,6 +664,10 @@ impl MessageQueue { self.0.pop_front() } + pub fn peek(&self) -> Option<&Dispatch> { + self.0.front() + } + pub fn store(self, storage: &S) -> MaybeHashOf { MaybeHashOf((!self.0.is_empty()).then(|| storage.write_queue(self))) } diff --git a/ethexe/service/src/tests.rs b/ethexe/service/src/tests.rs index de13c95e0d2..8e945a28de8 100644 --- a/ethexe/service/src/tests.rs +++ b/ethexe/service/src/tests.rs @@ -440,8 +440,14 @@ async fn mailbox() { ( expiry, BTreeSet::from_iter([ - ScheduledTask::RemoveFromMailbox((pid, env.sender_id), mid_expected_message), - ScheduledTask::RemoveFromMailbox((pid, env.sender_id), ping_expected_message), + ScheduledTask::RemoveFromMailbox( + (pid, env.sender_id, Default::default()), + mid_expected_message, + ), + ScheduledTask::RemoveFromMailbox( + (pid, env.sender_id, Default::default()), + ping_expected_message, + ), ]), ), ]); From 1fa5acb6a8421de23a15768c2ca4c4aae1bad6a0 Mon Sep 17 00:00:00 2001 From: Roman Maslennikov Date: Thu, 13 Feb 2025 19:24:56 +0400 Subject: [PATCH 4/9] Review fixes --- ethexe/common/src/db.rs | 4 +- ethexe/runtime/common/src/journal.rs | 8 +- ethexe/runtime/common/src/schedule.rs | 147 +++++++++++++------------- ethexe/runtime/common/src/state.rs | 6 +- 4 files changed, 83 insertions(+), 82 deletions(-) diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index f0d7d7b9988..52764187195 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -33,14 +33,14 @@ use gear_core::{ use gprimitives::{MessageId, H256}; use parity_scale_codec::{Decode, Encode}; -/// RemoveFromMailbox key; (msgs sources program (mailbox and queue provider), destination user id) +/// RemoveFromMailbox key; (msgs sources program (mailbox and queue provider), destination user id, message origin) pub type Rfm = (ProgramId, ActorId, Origin); /// SendDispatch key; (msgs destinations program (stash and queue provider), message id) pub type Sd = (ProgramId, MessageId); /// SendUserMessage key; (msgs sources program (mailbox and stash provider)) -pub type Sum = (ProgramId, Origin); +pub type Sum = ProgramId; /// NOTE: generic keys differs to Vara and have been chosen dependent on storage organization of ethexe. pub type ScheduledTask = gear_core::tasks::ScheduledTask; diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index b0eea802387..d5f745c6fcf 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -75,12 +75,12 @@ impl Handler<'_, S> { non_zero_delay, ScheduledTask::SendUserMessage { message_id: dispatch.id(), - to_mailbox: (dispatch.source(), dispatch_origin), + to_mailbox: dispatch.source(), }, ); let user_id = dispatch.destination(); - let dispatch = Dispatch::from_stored(storage, dispatch, dispatch_origin); + let dispatch = Dispatch::from_core_stored(storage, dispatch, dispatch_origin); state.stash_hash.modify_stash(storage, |stash| { stash.add_to_user(dispatch.id, dispatch, expiry, user_id); @@ -222,7 +222,7 @@ impl JournalHandler for Handler<'_, S> { let dispatch = dispatch.into_stored(); if self.transitions.is_program(&destination) { - let dispatch = Dispatch::from_stored(self.storage, dispatch, self.dispatch_origin); + let dispatch = Dispatch::from_core_stored(self.storage, dispatch, self.dispatch_origin); self.send_dispatch_to_program(message_id, destination, dispatch, delay); } else { @@ -252,7 +252,7 @@ impl JournalHandler for Handler<'_, S> { ScheduledTask::WakeMessage(dispatch.destination(), dispatch.id()), ); - let dispatch = Dispatch::from_stored(storage, dispatch, dispatch_origin); + let dispatch = Dispatch::from_core_stored(storage, dispatch, dispatch_origin); state.queue_hash.modify_queue(storage, |queue| { let head = queue diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index 574905a4570..1fbbe8d3fe1 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -10,10 +10,7 @@ use gear_core::{ids::ProgramId, tasks::TaskHandler}; use gear_core_errors::SuccessReplyReason; use gprimitives::{ActorId, CodeId, MessageId, ReservationId}; -#[derive(derive_more::Deref, derive_more::DerefMut)] pub struct Handler<'a, S: Storage> { - #[deref_mut] - #[deref] pub controller: TransitionController<'a, S>, } @@ -23,81 +20,80 @@ impl TaskHandler for Handler<'_, S> { (program_id, user_id, dispatch_origin): (ProgramId, ActorId, Origin), message_id: MessageId, ) -> u64 { - self.update_state(program_id, |state, storage, transitions| { - let ValueWithExpiry { value, .. } = - state.mailbox_hash.modify_mailbox(storage, |mailbox| { - mailbox - .remove(user_id, message_id) - .expect("failed to find message in mailbox") + self.controller + .update_state(program_id, |state, storage, transitions| { + let ValueWithExpiry { value, .. } = + state.mailbox_hash.modify_mailbox(storage, |mailbox| { + mailbox + .remove(user_id, message_id) + .expect("failed to find message in mailbox") + }); + + transitions.modify_transition(program_id, |transition| { + transition.claims.push(ValueClaim { + message_id, + destination: user_id, + value, + }) }); - transitions.modify_transition(program_id, |transition| { - transition.claims.push(ValueClaim { + let reply = Dispatch::reply( message_id, - destination: user_id, - value, - }) + user_id, + PayloadLookup::empty(), + 0, + SuccessReplyReason::Auto, + dispatch_origin, + ); + + state + .queue_hash + .modify_queue(storage, |queue| queue.queue(reply)); }); - let reply = Dispatch::reply( - message_id, - user_id, - PayloadLookup::empty(), - 0, - SuccessReplyReason::Auto, - dispatch_origin, - ); - - state - .queue_hash - .modify_queue(storage, |queue| queue.queue(reply)); - }); - 0 } fn send_dispatch(&mut self, (program_id, message_id): (ProgramId, MessageId)) -> u64 { - self.update_state(program_id, |state, storage, _| { - state.queue_hash.modify_queue(storage, |queue| { - let dispatch = state - .stash_hash - .modify_stash(storage, |stash| stash.remove_to_program(&message_id)); - - queue.queue(dispatch); + self.controller + .update_state(program_id, |state, storage, _| { + state.queue_hash.modify_queue(storage, |queue| { + let dispatch = state + .stash_hash + .modify_stash(storage, |stash| stash.remove_to_program(&message_id)); + + queue.queue(dispatch); + }); }); - }); 0 } - fn send_user_message( - &mut self, - stashed_message_id: MessageId, - (program_id, dispatch_origin): (ProgramId, Origin), - ) -> u64 { - self.update_state(program_id, |state, storage, transitions| { - let (dispatch, user_id) = state - .stash_hash - .modify_stash(storage, |stash| stash.remove_to_user(&stashed_message_id)); - - let expiry = transitions.schedule_task( - MAILBOX_VALIDITY.try_into().expect("infallible"), - ScheduledTask::RemoveFromMailbox( - (program_id, user_id, dispatch_origin), - stashed_message_id, - ), - ); - - state.mailbox_hash.modify_mailbox(storage, |mailbox| { - mailbox.add(user_id, stashed_message_id, dispatch.value, expiry); - }); + fn send_user_message(&mut self, stashed_message_id: MessageId, program_id: ProgramId) -> u64 { + self.controller + .update_state(program_id, |state, storage, transitions| { + let (dispatch, user_id) = state + .stash_hash + .modify_stash(storage, |stash| stash.remove_to_user(&stashed_message_id)); - transitions.modify_transition(program_id, |transition| { - transition - .messages - .push(dispatch.into_message(storage, user_id)) - }) - }); + let expiry = transitions.schedule_task( + MAILBOX_VALIDITY.try_into().expect("infallible"), + ScheduledTask::RemoveFromMailbox( + (program_id, user_id, dispatch.origin), + stashed_message_id, + ), + ); + + state.mailbox_hash.modify_mailbox(storage, |mailbox| { + mailbox.add(user_id, stashed_message_id, dispatch.value, expiry); + }); + + transitions.modify_transition(program_id, |transition| { + transition + .messages + .push(dispatch.into_message(storage, user_id)) + }) + }); 0 } @@ -106,19 +102,20 @@ impl TaskHandler for Handler<'_, S> { fn wake_message(&mut self, program_id: ProgramId, message_id: MessageId) -> u64 { log::trace!("Running scheduled task wake message {message_id} to {program_id}"); - self.update_state(program_id, |state, storage, _| { - let ValueWithExpiry { - value: dispatch, .. - } = state.waitlist_hash.modify_waitlist(storage, |waitlist| { - waitlist - .wake(&message_id) - .expect("failed to find message in waitlist") - }); + self.controller + .update_state(program_id, |state, storage, _| { + let ValueWithExpiry { + value: dispatch, .. + } = state.waitlist_hash.modify_waitlist(storage, |waitlist| { + waitlist + .wake(&message_id) + .expect("failed to find message in waitlist") + }); - state.queue_hash.modify_queue(storage, |queue| { - queue.queue(dispatch); - }) - }); + state.queue_hash.modify_queue(storage, |queue| { + queue.queue(dispatch); + }) + }); 0 } diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 261cc81150a..96797ed644a 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -593,7 +593,11 @@ impl Dispatch { } } - pub fn from_stored(storage: &S, value: StoredDispatch, origin: Origin) -> Self { + pub fn from_core_stored( + storage: &S, + value: StoredDispatch, + origin: Origin, + ) -> Self { let (kind, message, context) = value.into_parts(); let (id, source, _destination, payload, value, details) = message.into_parts(); From a6660e839fb7864f404fd640fb7045cb0ce4e3ed Mon Sep 17 00:00:00 2001 From: Roman Maslennikov Date: Thu, 13 Feb 2025 20:09:00 +0400 Subject: [PATCH 5/9] Review fixes II --- ethexe/processor/src/handling/run.rs | 48 +++++++++--------------- ethexe/processor/src/host/mod.rs | 7 ++-- ethexe/runtime/common/src/lib.rs | 55 ++++++++++++++++++++++------ ethexe/runtime/src/wasm/api/mod.rs | 6 +-- ethexe/runtime/src/wasm/api/run.rs | 12 ++++-- 5 files changed, 76 insertions(+), 52 deletions(-) diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index ecaa11cee43..f06c2ed1cfe 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -21,6 +21,7 @@ use crate::{ ProcessorConfig, }; use core_processor::common::JournalNote; +use ethexe_common::gear::Origin; use ethexe_db::{CodesStorage, Database}; use ethexe_runtime_common::{InBlockTransitions, JournalHandler, TransitionController}; use gear_core::ids::ProgramId; @@ -32,7 +33,7 @@ enum Task { Run { program_id: ProgramId, state_hash: H256, - result_sender: oneshot::Sender>, + result_sender: oneshot::Sender<(Vec, Option)>, }, } @@ -98,40 +99,27 @@ async fn run_in_async( let mut super_journal = vec![]; for (program_id, receiver) in result_receivers.into_iter() { - let journal = receiver.await.unwrap(); + let (journal, dispatch_origin) = receiver.await.unwrap(); if !journal.is_empty() { + super_journal.push(( + program_id, + dispatch_origin.expect("origin should be set for non-empty journal"), + journal, + )); no_more_to_do = false; } - super_journal.push((program_id, journal)); } - for (program_id, journal) in super_journal { - let controller = TransitionController { - transitions: in_block_transitions, - storage: &db, + for (program_id, dispatch_origin, journal) in super_journal { + let mut handler = JournalHandler { + program_id, + controller: TransitionController { + transitions: in_block_transitions, + storage: &db, + }, + dispatch_origin, }; - - let dispatch_origin = controller.access_state(program_id, |state, storage, _| { - let queue = state - .queue_hash - .query(storage) - .expect("failed to query queue"); - - queue.peek().map(|message| message.origin) - }); - - // Absence of `dispatch_origin` means journal is empty, so we can skip journal handling - if let Some(dispatch_origin) = dispatch_origin { - let mut handler = JournalHandler { - program_id, - controller: TransitionController { - transitions: in_block_transitions, - storage: &db, - }, - dispatch_origin, - }; - core_processor::handle_journal(journal, &mut handler); - } + core_processor::handle_journal(journal, &mut handler); } } @@ -186,7 +174,7 @@ async fn one_batch( from_index: usize, task_senders: &[mpsc::Sender], in_block_transitions: &mut InBlockTransitions, -) -> BTreeMap>> { +) -> BTreeMap, Option)>> { let mut result_receivers = BTreeMap::new(); for (sender, (program_id, state_hash)) in task_senders diff --git a/ethexe/processor/src/host/mod.rs b/ethexe/processor/src/host/mod.rs index 7b6bcfbd0b1..b75fb94e073 100644 --- a/ethexe/processor/src/host/mod.rs +++ b/ethexe/processor/src/host/mod.rs @@ -18,6 +18,7 @@ use anyhow::{anyhow, Result}; use core_processor::common::JournalNote; +use ethexe_common::gear::Origin; use gear_core::{code::InstrumentedCode, ids::ProgramId}; use gprimitives::{CodeId, H256}; use parity_scale_codec::{Decode, Encode}; @@ -130,7 +131,7 @@ impl InstanceWrapper { original_code_id: CodeId, state_hash: H256, maybe_instrumented_code: Option, - ) -> Result> { + ) -> Result<(Vec, Option)> { let chain_head = self.chain_head.expect("chain head must be set before run"); threads::set(db, chain_head, state_hash); @@ -142,7 +143,7 @@ impl InstanceWrapper { ); // Pieces of resulting journal. Hack to avoid single allocation limit. - let ptr_lens: Vec = self.call("run", arg.encode())?; + let (ptr_lens, origin): (Vec, Option) = self.call("run", arg.encode())?; let mut journal = Vec::new(); @@ -151,7 +152,7 @@ impl InstanceWrapper { journal.extend(journal_chunk); } - Ok(journal) + Ok((journal, origin)) } fn call(&mut self, name: &'static str, input: impl AsRef<[u8]>) -> Result { diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 871f5a8533e..9e4a1d65478 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -28,6 +28,7 @@ use core_processor::{ configs::{BlockConfig, SyscallName}, ContextChargedForCode, ContextChargedForInstrumentation, Ext, ProcessExecutionContext, }; +use ethexe_common::gear::Origin; use gear_core::{ code::{InstrumentedCode, MAX_WASM_PAGES_AMOUNT}, ids::ProgramId, @@ -117,7 +118,7 @@ pub fn process_next_message( instrumented_code: Option, code_id: CodeId, ri: &RI, -) -> Vec +) -> (Vec, Option) where S: Storage, RI: RuntimeInterface, @@ -134,7 +135,7 @@ where }); if queue.is_empty() { - return Vec::new(); + return (Vec::new(), None); } // TODO: must be set by some runtime configuration @@ -179,6 +180,36 @@ where reserve_for: 0, }; + let dispatch = queue.dequeue().unwrap(); + let origin = dispatch.origin; + + let journal = process_dispatch( + dispatch, + &block_config, + program_id, + program_state, + instrumented_code, + code_id, + ri, + ); + + (journal, origin.into()) +} + +fn process_dispatch( + dispatch: Dispatch, + block_config: &BlockConfig, + program_id: ProgramId, + program_state: ProgramState, + instrumented_code: Option, + code_id: CodeId, + ri: &RI, +) -> Vec +where + S: Storage, + RI: RuntimeInterface, + >::LazyPages: Send, +{ let Dispatch { id: dispatch_id, kind, @@ -188,7 +219,7 @@ where details, context, .. - } = queue.dequeue().unwrap(); + } = dispatch; let payload = payload.query(ri.storage()).expect("failed to get payload"); @@ -203,7 +234,7 @@ where let dispatch = IncomingDispatch::new(kind, incoming_message, context); let context = match core_processor::precharge_for_program( - &block_config, + block_config, 1_000_000_000_000, dispatch, program_id, @@ -244,7 +275,7 @@ where }); let context = match core_processor::precharge_for_allocations( - &block_config, + block_config, context, allocations.tree_len(), ) { @@ -263,16 +294,16 @@ where memory_infix: active_state.memory_infix, }; - let context = - match core_processor::precharge_for_code_length(&block_config, context, actor_data) { - Ok(context) => context, - Err(journal) => return journal, - }; + let context = match core_processor::precharge_for_code_length(block_config, context, actor_data) + { + Ok(context) => context, + Err(journal) => return journal, + }; let context = ContextChargedForCode::from(context); let context = ContextChargedForInstrumentation::from(context); let context = match core_processor::precharge_for_module_instantiation( - &block_config, + block_config, context, code.instantiated_section_sizes(), ) { @@ -286,6 +317,6 @@ where ri.init_lazy_pages(); - core_processor::process::>(&block_config, execution_context, random_data) + core_processor::process::>(block_config, execution_context, random_data) .unwrap_or_else(|err| unreachable!("{err}")) } diff --git a/ethexe/runtime/src/wasm/api/mod.rs b/ethexe/runtime/src/wasm/api/mod.rs index 174b2df83f5..99dd9abc5f0 100644 --- a/ethexe/runtime/src/wasm/api/mod.rs +++ b/ethexe/runtime/src/wasm/api/mod.rs @@ -46,7 +46,7 @@ fn _run(arg_ptr: i32, arg_len: i32) -> i64 { let (program_id, original_code_id, state_root, maybe_instrumented_code) = Decode::decode(&mut get_slice(arg_ptr, arg_len)).unwrap(); - let journal = run::run( + let (journal, origin) = run::run( program_id, original_code_id, state_root, @@ -56,9 +56,9 @@ fn _run(arg_ptr: i32, arg_len: i32) -> i64 { let chunks = journal.encoded_size() / 32 * 1024 * 1024 + 1; // never zero let chunk_size = (journal.len() / chunks).max(1); // never zero - let res: Vec<_> = journal.chunks(chunk_size).map(return_val).collect(); + let chunked_journal: Vec<_> = journal.chunks(chunk_size).map(return_val).collect(); - return_val(res) + return_val((chunked_journal, origin)) } fn get_vec(ptr: i32, len: i32) -> Vec { diff --git a/ethexe/runtime/src/wasm/api/run.rs b/ethexe/runtime/src/wasm/api/run.rs index d8ee7cafb8a..b65ab273e25 100644 --- a/ethexe/runtime/src/wasm/api/run.rs +++ b/ethexe/runtime/src/wasm/api/run.rs @@ -22,6 +22,7 @@ use crate::wasm::{ }; use alloc::vec::Vec; use core_processor::{common::JournalNote, configs::BlockInfo}; +use ethexe_common::gear::Origin; use ethexe_runtime_common::{process_next_message, state::Storage, RuntimeInterface}; use gear_core::{code::InstrumentedCode, ids::ProgramId}; use gprimitives::{CodeId, H256}; @@ -31,7 +32,7 @@ pub fn run( original_code_id: CodeId, state_root: H256, maybe_instrumented_code: Option, -) -> Vec { +) -> (Vec, Option) { log::debug!("You're calling 'run(..)'"); let block_info = BlockInfo { @@ -46,7 +47,7 @@ pub fn run( let program_state = ri.storage().read_state(state_root).unwrap(); - let journal = process_next_message( + let (journal, origin) = process_next_message( program_id, program_state, maybe_instrumented_code, @@ -54,11 +55,14 @@ pub fn run( &ri, ); - log::debug!("Done creating journal: {} notes", journal.len()); + log::debug!( + "Done creating journal: {} notes, origin {origin:?}", + journal.len() + ); for note in &journal { log::debug!("{note:?}"); } - journal + (journal, origin) } From cfeabe71a9943cdee20c31740e9bba70a31fc8ed Mon Sep 17 00:00:00 2001 From: Roman Maslennikov Date: Thu, 13 Feb 2025 20:13:42 +0400 Subject: [PATCH 6/9] Remove derive_more::deref --- ethexe/runtime/common/src/journal.rs | 352 ++++++++++++++------------- 1 file changed, 181 insertions(+), 171 deletions(-) diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index d5f745c6fcf..ec48d4c6385 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -17,12 +17,9 @@ use gear_core::{ use gear_core_errors::SignalCode; use gprimitives::{ActorId, CodeId, MessageId, ReservationId}; -#[derive(derive_more::Deref, derive_more::DerefMut)] pub struct Handler<'a, S: Storage> { pub program_id: ProgramId, pub dispatch_origin: Origin, - #[deref] - #[deref_mut] pub controller: TransitionController<'a, S>, } @@ -34,22 +31,23 @@ impl Handler<'_, S> { dispatch: Dispatch, delay: u32, ) { - self.update_state(destination, |state, storage, transitions| { - if let Ok(non_zero_delay) = delay.try_into() { - let expiry = transitions.schedule_task( - non_zero_delay, - ScheduledTask::SendDispatch((destination, dispatch.id)), - ); + self.controller + .update_state(destination, |state, storage, transitions| { + if let Ok(non_zero_delay) = delay.try_into() { + let expiry = transitions.schedule_task( + non_zero_delay, + ScheduledTask::SendDispatch((destination, dispatch.id)), + ); - state.stash_hash.modify_stash(storage, |stash| { - stash.add_to_program(dispatch.id, dispatch, expiry); - }) - } else { - state - .queue_hash - .modify_queue(storage, |queue| queue.queue(dispatch)); - } - }) + state.stash_hash.modify_stash(storage, |stash| { + stash.add_to_program(dispatch.id, dispatch, expiry); + }) + } else { + state + .queue_hash + .modify_queue(storage, |queue| queue.queue(dispatch)); + } + }) } fn send_dispatch_to_user( @@ -59,7 +57,8 @@ impl Handler<'_, S> { delay: u32, ) { if dispatch.is_reply() { - self.transitions + self.controller + .transitions .modify_transition(dispatch.source(), |transition| { transition.messages.push(dispatch.into_parts().1.into()) }); @@ -69,45 +68,46 @@ impl Handler<'_, S> { let dispatch_origin = self.dispatch_origin; - self.update_state(dispatch.source(), |state, storage, transitions| { - if let Ok(non_zero_delay) = delay.try_into() { - let expiry = transitions.schedule_task( - non_zero_delay, - ScheduledTask::SendUserMessage { - message_id: dispatch.id(), - to_mailbox: dispatch.source(), - }, - ); - - let user_id = dispatch.destination(); - let dispatch = Dispatch::from_core_stored(storage, dispatch, dispatch_origin); - - state.stash_hash.modify_stash(storage, |stash| { - stash.add_to_user(dispatch.id, dispatch, expiry, user_id); - }); - } else { - let expiry = transitions.schedule_task( - MAILBOX_VALIDITY.try_into().expect("infallible"), - ScheduledTask::RemoveFromMailbox( - (dispatch.source(), dispatch.destination(), dispatch_origin), - dispatch.id(), - ), - ); + self.controller + .update_state(dispatch.source(), |state, storage, transitions| { + if let Ok(non_zero_delay) = delay.try_into() { + let expiry = transitions.schedule_task( + non_zero_delay, + ScheduledTask::SendUserMessage { + message_id: dispatch.id(), + to_mailbox: dispatch.source(), + }, + ); - state.mailbox_hash.modify_mailbox(storage, |mailbox| { - mailbox.add( - dispatch.destination(), - dispatch.id(), - dispatch.value(), - expiry, + let user_id = dispatch.destination(); + let dispatch = Dispatch::from_core_stored(storage, dispatch, dispatch_origin); + + state.stash_hash.modify_stash(storage, |stash| { + stash.add_to_user(dispatch.id, dispatch, expiry, user_id); + }); + } else { + let expiry = transitions.schedule_task( + MAILBOX_VALIDITY.try_into().expect("infallible"), + ScheduledTask::RemoveFromMailbox( + (dispatch.source(), dispatch.destination(), dispatch_origin), + dispatch.id(), + ), ); - }); - transitions.modify_transition(dispatch.source(), |transition| { - transition.messages.push(dispatch.into_parts().1.into()) - }); - } - }); + state.mailbox_hash.modify_mailbox(storage, |mailbox| { + mailbox.add( + dispatch.destination(), + dispatch.id(), + dispatch.value(), + expiry, + ); + }); + + transitions.modify_transition(dispatch.source(), |transition| { + transition.messages.push(dispatch.into_parts().1.into()) + }); + } + }); } } @@ -126,21 +126,22 @@ impl JournalHandler for Handler<'_, S> { DispatchOutcome::InitSuccess { program_id } => { log::trace!("Dispatch {message_id} successfully initialized program {program_id}"); - self.update_state(program_id, |state, _, _| { - match &mut state.program { - Program::Active(ActiveProgram { initialized, .. }) if *initialized => { - bail!("an attempt to initialize already initialized program") - } - &mut Program::Active(ActiveProgram { - ref mut initialized, - .. - }) => *initialized = true, - _ => bail!("an attempt to dispatch init message for inactive program"), - }; - - Ok(()) - }) - .expect("failed to update state"); + self.controller + .update_state(program_id, |state, _, _| { + match &mut state.program { + Program::Active(ActiveProgram { initialized, .. }) if *initialized => { + bail!("an attempt to initialize already initialized program") + } + &mut Program::Active(ActiveProgram { + ref mut initialized, + .. + }) => *initialized = true, + _ => bail!("an attempt to dispatch init message for inactive program"), + }; + + Ok(()) + }) + .expect("failed to update state"); } DispatchOutcome::InitFailure { @@ -150,7 +151,7 @@ impl JournalHandler for Handler<'_, S> { } => { log::trace!("Dispatch {message_id} failed init of program {program_id}: {reason}"); - self.update_state(program_id, |state, _, _| { + self.controller.update_state(program_id, |state, _, _| { state.program = Program::Terminated(origin) }); } @@ -173,38 +174,42 @@ impl JournalHandler for Handler<'_, S> { fn exit_dispatch(&mut self, id_exited: ProgramId, value_destination: ProgramId) { // TODO (breathx): handle rest of value cases; exec balance into value_to_receive. - let balance = self.update_state(id_exited, |state, _, transitions| { - state.program = Program::Exited(value_destination); + let balance = self + .controller + .update_state(id_exited, |state, _, transitions| { + state.program = Program::Exited(value_destination); - transitions.modify_transition(id_exited, |transition| { - transition.inheritor = value_destination - }); + transitions.modify_transition(id_exited, |transition| { + transition.inheritor = value_destination + }); - mem::replace(&mut state.balance, 0) - }); + mem::replace(&mut state.balance, 0) + }); - if self.transitions.is_program(&value_destination) { - self.update_state(value_destination, |state, _, _| { - state.balance += balance; - }) + if self.controller.transitions.is_program(&value_destination) { + self.controller + .update_state(value_destination, |state, _, _| { + state.balance += balance; + }) } } fn message_consumed(&mut self, message_id: MessageId) { let program_id = self.program_id; - self.update_state(program_id, |state, storage, _| { - state.queue_hash.modify_queue(storage, |queue| { - let head = queue - .dequeue() - .expect("an attempt to consume message from empty queue"); + self.controller + .update_state(program_id, |state, storage, _| { + state.queue_hash.modify_queue(storage, |queue| { + let head = queue + .dequeue() + .expect("an attempt to consume message from empty queue"); - assert_eq!( - head.id, message_id, - "queue head doesn't match processed message" - ); - }); - }) + assert_eq!( + head.id, message_id, + "queue head doesn't match processed message" + ); + }); + }) } fn send_dispatch( @@ -221,8 +226,9 @@ impl JournalHandler for Handler<'_, S> { let destination = dispatch.destination(); let dispatch = dispatch.into_stored(); - if self.transitions.is_program(&destination) { - let dispatch = Dispatch::from_core_stored(self.storage, dispatch, self.dispatch_origin); + if self.controller.transitions.is_program(&destination) { + let dispatch = + Dispatch::from_core_stored(self.controller.storage, dispatch, self.dispatch_origin); self.send_dispatch_to_program(message_id, destination, dispatch, delay); } else { @@ -246,29 +252,30 @@ impl JournalHandler for Handler<'_, S> { let program_id = self.program_id; let dispatch_origin = self.dispatch_origin; - self.update_state(program_id, |state, storage, transitions| { - let expiry = transitions.schedule_task( - in_blocks, - ScheduledTask::WakeMessage(dispatch.destination(), dispatch.id()), - ); + self.controller + .update_state(program_id, |state, storage, transitions| { + let expiry = transitions.schedule_task( + in_blocks, + ScheduledTask::WakeMessage(dispatch.destination(), dispatch.id()), + ); - let dispatch = Dispatch::from_core_stored(storage, dispatch, dispatch_origin); + let dispatch = Dispatch::from_core_stored(storage, dispatch, dispatch_origin); - state.queue_hash.modify_queue(storage, |queue| { - let head = queue - .dequeue() - .expect("an attempt to wait message from empty queue"); + state.queue_hash.modify_queue(storage, |queue| { + let head = queue + .dequeue() + .expect("an attempt to wait message from empty queue"); - assert_eq!( - head.id, dispatch.id, - "queue head doesn't match processed message" - ); - }); + assert_eq!( + head.id, dispatch.id, + "queue head doesn't match processed message" + ); + }); - state.waitlist_hash.modify_waitlist(storage, |waitlist| { - waitlist.wait(dispatch.id, dispatch, expiry); + state.waitlist_hash.modify_waitlist(storage, |waitlist| { + waitlist.wait(dispatch.id, dispatch, expiry); + }); }); - }); } // TODO (breathx): deprecate delayed wakes? @@ -285,28 +292,29 @@ impl JournalHandler for Handler<'_, S> { log::trace!("Dispatch {message_id} tries to wake {awakening_id}"); - self.update_state(program_id, |state, storage, transitions| { - let Some(ValueWithExpiry { - value: dispatch, - expiry, - }) = state - .waitlist_hash - .modify_waitlist(storage, |waitlist| waitlist.wake(&awakening_id)) - else { - return; - }; + self.controller + .update_state(program_id, |state, storage, transitions| { + let Some(ValueWithExpiry { + value: dispatch, + expiry, + }) = state + .waitlist_hash + .modify_waitlist(storage, |waitlist| waitlist.wake(&awakening_id)) + else { + return; + }; - state - .queue_hash - .modify_queue(storage, |queue| queue.queue(dispatch)); + state + .queue_hash + .modify_queue(storage, |queue| queue.queue(dispatch)); - transitions - .remove_task( - expiry, - &ScheduledTask::WakeMessage(program_id, awakening_id), - ) - .expect("failed to remove scheduled task"); - }); + transitions + .remove_task( + expiry, + &ScheduledTask::WakeMessage(program_id, awakening_id), + ) + .expect("failed to remove scheduled task"); + }); } fn update_pages_data( @@ -318,21 +326,22 @@ impl JournalHandler for Handler<'_, S> { return; } - self.update_state(program_id, |state, storage, _| { - let Program::Active(ActiveProgram { - ref mut pages_hash, .. - }) = state.program - else { - bail!("an attempt to update pages data of inactive program"); - }; - - pages_hash.modify_pages(storage, |pages| { - pages.update_and_store_regions(storage, storage.write_pages_data(pages_data)); - }); + self.controller + .update_state(program_id, |state, storage, _| { + let Program::Active(ActiveProgram { + ref mut pages_hash, .. + }) = state.program + else { + bail!("an attempt to update pages data of inactive program"); + }; + + pages_hash.modify_pages(storage, |pages| { + pages.update_and_store_regions(storage, storage.write_pages_data(pages_data)); + }); - Ok(()) - }) - .expect("failed to update state"); + Ok(()) + }) + .expect("failed to update state"); } fn update_allocations( @@ -340,39 +349,40 @@ impl JournalHandler for Handler<'_, S> { program_id: ProgramId, new_allocations: IntervalsTree, ) { - self.update_state(program_id, |state, storage, _| { - let Program::Active(ActiveProgram { - allocations_hash, - pages_hash, - .. - }) = &mut state.program - else { - bail!("an attempt to update allocations of inactive program"); - }; - - allocations_hash.modify_allocations(storage, |allocations| { - let removed_pages = allocations.update(new_allocations); - - if !removed_pages.is_empty() { - pages_hash.modify_pages(storage, |pages| { - pages.remove_and_store_regions(storage, &removed_pages); - }) - } - }); + self.controller + .update_state(program_id, |state, storage, _| { + let Program::Active(ActiveProgram { + allocations_hash, + pages_hash, + .. + }) = &mut state.program + else { + bail!("an attempt to update allocations of inactive program"); + }; + + allocations_hash.modify_allocations(storage, |allocations| { + let removed_pages = allocations.update(new_allocations); + + if !removed_pages.is_empty() { + pages_hash.modify_pages(storage, |pages| { + pages.remove_and_store_regions(storage, &removed_pages); + }) + } + }); - Ok(()) - }) - .expect("failed to update state"); + Ok(()) + }) + .expect("failed to update state"); } fn send_value(&mut self, from: ProgramId, to: Option, value: u128) { // TODO (breathx): implement rest of cases. if let Some(to) = to { - if self.transitions.state_of(&from).is_some() { + if self.controller.transitions.state_of(&from).is_some() { return; } - self.update_state(to, |state, _, transitions| { + self.controller.update_state(to, |state, _, transitions| { state.balance += value; transitions From 80de7339c73aefa51da5eba9eaa8c4e6ff5e19de Mon Sep 17 00:00:00 2001 From: Roman Maslennikov Date: Thu, 13 Feb 2025 20:16:03 +0400 Subject: [PATCH 7/9] Review fixes III --- ethexe/runtime/common/src/lib.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 9e4a1d65478..ced2cf48f7d 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -92,24 +92,6 @@ impl TransitionController<'_, S> { res } - - pub fn access_state( - &self, - program_id: ProgramId, - f: impl FnOnce(&ProgramState, &S, &InBlockTransitions) -> T, - ) -> T { - let state_hash = self - .transitions - .state_of(&program_id) - .expect("failed to find program in known states"); - - let state = self - .storage - .read_state(state_hash) - .expect("failed to read state from storage"); - - f(&state, self.storage, self.transitions) - } } pub fn process_next_message( From 16a5d5f5e4d8ab55e7665154d719cab7ca9ade5e Mon Sep 17 00:00:00 2001 From: Roman Maslennikov Date: Thu, 13 Feb 2025 20:35:37 +0400 Subject: [PATCH 8/9] default std feat --- Cargo.lock | 1 - ethexe/common/Cargo.toml | 12 ++++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f6fc0a27c0..0a0f5359b16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4600,7 +4600,6 @@ name = "ethexe-common" version = "1.7.1" dependencies = [ "anyhow", - "derive_more 0.99.18", "gear-core", "gprimitives", "hex", diff --git a/ethexe/common/Cargo.toml b/ethexe/common/Cargo.toml index dd5ba1e55c5..81fcf14a0ae 100644 --- a/ethexe/common/Cargo.toml +++ b/ethexe/common/Cargo.toml @@ -9,13 +9,17 @@ repository.workspace = true [dependencies] gear-core.workspace = true -gprimitives = { workspace = true, features = ["serde"] } +gprimitives.workspace = true parity-scale-codec.workspace = true -derive_more.workspace = true hex.workspace = true anyhow.workspace = true -serde.workspace = true +serde = { workspace = true, optional = true } roast-secp256k1-evm.workspace = true [features] -std = ["gear-core/std", "gprimitives/serde"] +default = ["std"] +std = [ + "gear-core/std", + "gprimitives/serde", + "serde" +] From 814c384e3458692588dcad12f18e4179ce87d9e5 Mon Sep 17 00:00:00 2001 From: Roman Maslennikov Date: Fri, 14 Feb 2025 15:19:15 +0400 Subject: [PATCH 9/9] Remove origin param from `remove_from_mailbox` --- ethexe/common/src/db.rs | 9 +++------ ethexe/processor/src/handling/events.rs | 10 ++-------- ethexe/runtime/common/src/journal.rs | 2 +- ethexe/runtime/common/src/schedule.rs | 10 ++++------ ethexe/service/src/tests.rs | 10 ++-------- 5 files changed, 12 insertions(+), 29 deletions(-) diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index 57376113a27..2f68ce9e3c8 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -18,10 +18,7 @@ //! ethexe common db types and traits. -use crate::{ - events::BlockRequestEvent, - gear::{Origin, StateTransition}, -}; +use crate::{events::BlockRequestEvent, gear::StateTransition}; use alloc::{ collections::{BTreeMap, BTreeSet, VecDeque}, vec::Vec, @@ -33,8 +30,8 @@ use gear_core::{ use gprimitives::{MessageId, H256}; use parity_scale_codec::{Decode, Encode}; -/// RemoveFromMailbox key; (msgs sources program (mailbox and queue provider), destination user id, message origin) -pub type Rfm = (ProgramId, ActorId, Origin); +/// RemoveFromMailbox key; (msgs sources program (mailbox and queue provider), destination user id) +pub type Rfm = (ProgramId, ActorId); /// SendDispatch key; (msgs destinations program (stash and queue provider), message id) pub type Sd = (ProgramId, MessageId); diff --git a/ethexe/processor/src/handling/events.rs b/ethexe/processor/src/handling/events.rs index c0be44d2377..ae65f0953d9 100644 --- a/ethexe/processor/src/handling/events.rs +++ b/ethexe/processor/src/handling/events.rs @@ -125,10 +125,7 @@ impl ProcessingHandler { transitions.remove_task( expiry, - &ScheduledTask::RemoveFromMailbox( - (actor_id, source, Origin::Ethereum), - replied_to, - ), + &ScheduledTask::RemoveFromMailbox((actor_id, source), replied_to), )?; let reply = Dispatch::new_reply( @@ -165,10 +162,7 @@ impl ProcessingHandler { transitions.remove_task( expiry, - &ScheduledTask::RemoveFromMailbox( - (actor_id, source, Origin::Ethereum), - claimed_id, - ), + &ScheduledTask::RemoveFromMailbox((actor_id, source), claimed_id), )?; let reply = Dispatch::reply( diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index ec48d4c6385..2b4b3a4141a 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -89,7 +89,7 @@ impl Handler<'_, S> { let expiry = transitions.schedule_task( MAILBOX_VALIDITY.try_into().expect("infallible"), ScheduledTask::RemoveFromMailbox( - (dispatch.source(), dispatch.destination(), dispatch_origin), + (dispatch.source(), dispatch.destination()), dispatch.id(), ), ); diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index 1fbbe8d3fe1..81939fc6e8d 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -17,7 +17,7 @@ pub struct Handler<'a, S: Storage> { impl TaskHandler for Handler<'_, S> { fn remove_from_mailbox( &mut self, - (program_id, user_id, dispatch_origin): (ProgramId, ActorId, Origin), + (program_id, user_id): (ProgramId, ActorId), message_id: MessageId, ) -> u64 { self.controller @@ -43,7 +43,8 @@ impl TaskHandler for Handler<'_, S> { PayloadLookup::empty(), 0, SuccessReplyReason::Auto, - dispatch_origin, + // TODO(rmasl): use the actual origin (https://github.com/gear-tech/gear/pull/4460) + Origin::Ethereum, ); state @@ -78,10 +79,7 @@ impl TaskHandler for Handler<'_, S> { let expiry = transitions.schedule_task( MAILBOX_VALIDITY.try_into().expect("infallible"), - ScheduledTask::RemoveFromMailbox( - (program_id, user_id, dispatch.origin), - stashed_message_id, - ), + ScheduledTask::RemoveFromMailbox((program_id, user_id), stashed_message_id), ); state.mailbox_hash.modify_mailbox(storage, |mailbox| { diff --git a/ethexe/service/src/tests.rs b/ethexe/service/src/tests.rs index edb33d0e256..60a6c9eb334 100644 --- a/ethexe/service/src/tests.rs +++ b/ethexe/service/src/tests.rs @@ -466,14 +466,8 @@ async fn mailbox() { ( expiry, BTreeSet::from_iter([ - ScheduledTask::RemoveFromMailbox( - (pid, env.sender_id, Default::default()), - mid_expected_message, - ), - ScheduledTask::RemoveFromMailbox( - (pid, env.sender_id, Default::default()), - ping_expected_message, - ), + ScheduledTask::RemoveFromMailbox((pid, env.sender_id), mid_expected_message), + ScheduledTask::RemoveFromMailbox((pid, env.sender_id), ping_expected_message), ]), ), ]);