Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe): add dispatch origin #4448

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
node_modules/
target/
target-no-lazy/
target-rust-analyzer/
target-xwin/
log/
weight-dumps/
Expand Down
22 changes: 20 additions & 2 deletions ethexe/processor/src/handling/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ impl ProcessingHandler {
return Ok(());
}

let dispatch_origin = self.dispatch_origin;
ByteNacked marked this conversation as resolved.
Show resolved Hide resolved

match event {
MirrorRequestEvent::ExecutableBalanceTopUpRequested { value } => {
self.update_state(actor_id, |state, _, _| {
Expand All @@ -81,7 +83,15 @@ impl ProcessingHandler {
self.update_state(actor_id, |state, storage, _| -> Result<()> {
let is_init = state.requires_init_message();

let dispatch = Dispatch::new(storage, id, source, payload, value, is_init)?;
let dispatch = Dispatch::new(
storage,
id,
source,
payload,
value,
is_init,
dispatch_origin,
)?;

state
.queue_hash
Expand Down Expand Up @@ -120,7 +130,14 @@ impl ProcessingHandler {
&ScheduledTask::RemoveFromMailbox((actor_id, source), replied_to),
)?;

let reply = Dispatch::new_reply(storage, replied_to, source, payload, value)?;
let reply = Dispatch::new_reply(
storage,
replied_to,
source,
payload,
value,
dispatch_origin,
)?;

state
.queue_hash
Expand Down Expand Up @@ -156,6 +173,7 @@ impl ProcessingHandler {
PayloadLookup::empty(),
0,
SuccessReplyReason::Auto,
dispatch_origin,
);

state
Expand Down
8 changes: 6 additions & 2 deletions ethexe/processor/src/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::Processor;
use anyhow::{anyhow, Result};
use ethexe_db::{BlockMetaStorage, CodesStorage, Database};
use ethexe_runtime_common::{
state::ProgramState, InBlockTransitions, ScheduleHandler, TransitionController,
state::{Origin, ProgramState},
InBlockTransitions, ScheduleHandler, TransitionController,
};
use gprimitives::{ActorId, CodeId, H256};

Expand All @@ -31,6 +32,7 @@ pub struct ProcessingHandler {
pub block_hash: H256,
pub db: Database,
pub transitions: InBlockTransitions,
pub dispatch_origin: Origin,
ByteNacked marked this conversation as resolved.
Show resolved Hide resolved
}

impl ProcessingHandler {
Expand All @@ -51,7 +53,7 @@ impl ProcessingHandler {
}

impl Processor {
pub fn handler(&self, block_hash: H256) -> Result<ProcessingHandler> {
pub fn handler(&self, block_hash: H256, dispatch_origin: Origin) -> Result<ProcessingHandler> {
let header = self
.db
.block_header(block_hash)
Expand All @@ -74,6 +76,7 @@ impl Processor {
block_hash,
db: self.db.clone(),
transitions,
dispatch_origin,
})
}

Expand Down Expand Up @@ -112,6 +115,7 @@ impl ProcessingHandler {
);

let mut handler = ScheduleHandler {
dispatch_origin: self.dispatch_origin,
controller: self.controller(),
};

Expand Down
8 changes: 7 additions & 1 deletion ethexe/processor/src/handling/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use crate::{
};
use core_processor::common::JournalNote;
use ethexe_db::{CodesStorage, Database};
use ethexe_runtime_common::{InBlockTransitions, JournalHandler, TransitionController};
use ethexe_runtime_common::{
state::Origin, InBlockTransitions, JournalHandler, TransitionController,
};
use gear_core::ids::ProgramId;
use gprimitives::H256;
use std::collections::BTreeMap;
Expand All @@ -41,6 +43,7 @@ pub fn run(
db: Database,
instance_creator: InstanceCreator,
in_block_transitions: &mut InBlockTransitions,
dispatch_origin: Origin,
ByteNacked marked this conversation as resolved.
Show resolved Hide resolved
) {
tokio::task::block_in_place(|| {
let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
Expand All @@ -59,6 +62,7 @@ pub fn run(
db,
instance_creator,
in_block_transitions,
dispatch_origin,
)
.await
})
Expand All @@ -72,6 +76,7 @@ async fn run_in_async(
db: Database,
instance_creator: InstanceCreator,
in_block_transitions: &mut InBlockTransitions,
dispatch_origin: Origin,
) {
let mut task_senders = vec![];
let mut handles = vec![];
Expand Down Expand Up @@ -112,6 +117,7 @@ async fn run_in_async(
transitions: in_block_transitions,
storage: &db,
},
dispatch_origin,
};
core_processor::handle_journal(journal, &mut handler);
}
Expand Down
11 changes: 8 additions & 3 deletions ethexe/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use anyhow::{anyhow, ensure, Result};
use ethexe_common::events::{BlockRequestEvent, MirrorRequestEvent};
use ethexe_db::{BlockMetaStorage, CodesStorage, Database};
use ethexe_runtime_common::state::Storage;
use ethexe_runtime_common::state::{Origin, Storage};
use gear_core::{ids::prelude::CodeIdExt, message::ReplyInfo};
use gprimitives::{ActorId, CodeId, MessageId, H256};
use handling::{run, ProcessingHandler};
Expand Down Expand Up @@ -106,7 +106,9 @@ impl Processor {
) -> Result<Vec<LocalOutcome>> {
log::debug!("Processing events for {block_hash:?}: {events:#?}");

let mut handler = self.handler(block_hash)?;
// ATM only eth origin events
let dispatch_origin = Origin::Ethereum;
let mut handler = self.handler(block_hash, dispatch_origin)?;

for event in events {
match event {
Expand Down Expand Up @@ -146,6 +148,7 @@ impl Processor {
self.db.clone(),
self.creator.clone(),
&mut handler.transitions,
handler.dispatch_origin,
);
}
}
Expand All @@ -165,7 +168,9 @@ impl OverlaidProcessor {
) -> Result<ReplyInfo> {
self.0.creator.set_chain_head(block_hash);

let mut handler = self.0.handler(block_hash)?;
// ATM only eth origin events
let dispatch_origin = Origin::Ethereum;
let mut handler = self.0.handler(block_hash, dispatch_origin)?;

let state_hash = handler
.transitions
Expand Down
8 changes: 4 additions & 4 deletions ethexe/processor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ fn ping_pong() {
.expect("failed to call runtime api")
.expect("code failed verification or instrumentation");

let mut handler = processor.handler(ch0).unwrap();
let mut handler = processor.handler(ch0, Default::default()).unwrap();

handler
.handle_router_event(RouterRequestEvent::ProgramCreated { actor_id, code_id })
Expand Down Expand Up @@ -333,7 +333,7 @@ fn async_and_ping() {
.expect("failed to call runtime api")
.expect("code failed verification or instrumentation");

let mut handler = processor.handler(ch0).unwrap();
let mut handler = processor.handler(ch0, Default::default()).unwrap();

handler
.handle_router_event(RouterRequestEvent::ProgramCreated {
Expand Down Expand Up @@ -469,7 +469,7 @@ fn many_waits() {
.expect("failed to call runtime api")
.expect("code failed verification or instrumentation");

let mut handler = processor.handler(ch0).unwrap();
let mut handler = processor.handler(ch0, Default::default()).unwrap();

let amount = 10000;
for i in 0..amount {
Expand Down Expand Up @@ -581,7 +581,7 @@ fn many_waits() {
assert_eq!(schedule, expected_schedule);
}

let mut handler = processor.handler(ch11).unwrap();
let mut handler = processor.handler(ch11, Default::default()).unwrap();
handler.run_schedule();
processor.process_queue(&mut handler);

Expand Down
12 changes: 8 additions & 4 deletions ethexe/runtime/common/src/journal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
state::{ActiveProgram, Dispatch, Program, Storage, ValueWithExpiry, MAILBOX_VALIDITY},
state::{ActiveProgram, Dispatch, Origin, Program, Storage, ValueWithExpiry, MAILBOX_VALIDITY},
TransitionController,
};
use alloc::{collections::BTreeMap, vec::Vec};
Expand All @@ -20,6 +20,7 @@ use gprimitives::{ActorId, CodeId, MessageId, ReservationId};
#[derive(derive_more::Deref, derive_more::DerefMut)]
pub struct Handler<'a, S: Storage> {
pub program_id: ProgramId,
pub dispatch_origin: Origin,
ByteNacked marked this conversation as resolved.
Show resolved Hide resolved
#[deref]
#[deref_mut]
pub controller: TransitionController<'a, S>,
Expand Down Expand Up @@ -66,6 +67,8 @@ impl<S: Storage> Handler<'_, S> {
return;
}

let dispatch_origin = self.dispatch_origin;

self.update_state(dispatch.source(), |state, storage, transitions| {
if let Ok(non_zero_delay) = delay.try_into() {
let expiry = transitions.schedule_task(
Expand All @@ -77,7 +80,7 @@ impl<S: Storage> Handler<'_, S> {
);

let user_id = dispatch.destination();
let dispatch = Dispatch::from_stored(storage, dispatch);
let dispatch = Dispatch::from_stored(storage, dispatch, dispatch_origin);
ByteNacked marked this conversation as resolved.
Show resolved Hide resolved

state.stash_hash.modify_stash(storage, |stash| {
stash.add_to_user(dispatch.id, dispatch, expiry, user_id);
Expand Down Expand Up @@ -219,7 +222,7 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
let dispatch = dispatch.into_stored();

if self.transitions.is_program(&destination) {
let dispatch = Dispatch::from_stored(self.storage, dispatch);
let dispatch = Dispatch::from_stored(self.storage, dispatch, self.dispatch_origin);

self.send_dispatch_to_program(message_id, destination, dispatch, delay);
} else {
Expand All @@ -241,14 +244,15 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
NonZero::<u32>::try_from(duration).expect("must be checked on backend side");

let program_id = self.program_id;
let dispatch_origin = self.dispatch_origin;

self.update_state(program_id, |state, storage, transitions| {
let expiry = transitions.schedule_task(
in_blocks,
ScheduledTask::WakeMessage(dispatch.destination(), dispatch.id()),
);

let dispatch = Dispatch::from_stored(storage, dispatch);
let dispatch = Dispatch::from_stored(storage, dispatch, dispatch_origin);

state.queue_hash.modify_queue(storage, |queue| {
let head = queue
Expand Down
1 change: 1 addition & 0 deletions ethexe/runtime/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ where
value,
details,
context,
..
ByteNacked marked this conversation as resolved.
Show resolved Hide resolved
} = queue.dequeue().unwrap();

let payload = payload.query(ri.storage()).expect("failed to get payload");
Expand Down
7 changes: 6 additions & 1 deletion ethexe/runtime/common/src/schedule.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
state::{Dispatch, PayloadLookup, Storage, ValueWithExpiry, MAILBOX_VALIDITY},
state::{Dispatch, Origin, PayloadLookup, Storage, ValueWithExpiry, MAILBOX_VALIDITY},
TransitionController,
};
use ethexe_common::{
Expand All @@ -12,7 +12,10 @@ use gprimitives::{ActorId, CodeId, MessageId, ReservationId};

#[derive(derive_more::Deref, derive_more::DerefMut)]
pub struct Handler<'a, S: Storage> {
#[deref_mut]
ByteNacked marked this conversation as resolved.
Show resolved Hide resolved
#[deref]
pub controller: TransitionController<'a, S>,
pub dispatch_origin: Origin,
}

impl<S: Storage> TaskHandler<Rfm, Sd, Sum> for Handler<'_, S> {
Expand All @@ -21,6 +24,7 @@ impl<S: Storage> TaskHandler<Rfm, Sd, Sum> for Handler<'_, S> {
(program_id, user_id): (ProgramId, ActorId),
message_id: MessageId,
) -> u64 {
let dispatch_origin = self.dispatch_origin;
self.update_state(program_id, |state, storage, transitions| {
let ValueWithExpiry { value, .. } =
state.mailbox_hash.modify_mailbox(storage, |mailbox| {
Expand All @@ -43,6 +47,7 @@ impl<S: Storage> TaskHandler<Rfm, Sd, Sum> for Handler<'_, S> {
PayloadLookup::empty(),
0,
SuccessReplyReason::Auto,
dispatch_origin,
);

state
Expand Down
Loading
Loading