diff --git a/MIGRATION.md b/MIGRATION.md index 143d288bde..bc88afa9bf 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -1,3 +1,10 @@ +# 0.15.0 -> 0.16.0 +- `EventManager` is refactored to avoid calling function from `Fuzzer`, thus we do not evaluate testcases in `EventManager` anymore. + - Now we have `EventReceiver` in `events` module, and `EventProessor` in `fuzzer` module. + - `EventReceiver` is responsible for receiving testcases and delegates its evaluation to `EventProcessor`. + - `EventProcessor` is responsible for evaluating the testcases passed by the `EventReceiver`. + - Since we don't evaluate testcases in the `EventManager` anymore. `on_fire` and `post_exec` have been deleted from `EventManagerHook`. + - Similarly `pre_exec` has been renamed to `pre_receive`. # 0.14.1 -> 0.15.0 - `MmapShMem::new` and `MmapShMemProvider::new_shmem_with_id` now take `AsRef` instead of a byte array for the filename/id. diff --git a/fuzzers/baby/baby_fuzzer_custom_executor/src/main.rs b/fuzzers/baby/baby_fuzzer_custom_executor/src/main.rs index 3b3abdbfb8..d099e2f10b 100644 --- a/fuzzers/baby/baby_fuzzer_custom_executor/src/main.rs +++ b/fuzzers/baby/baby_fuzzer_custom_executor/src/main.rs @@ -7,7 +7,7 @@ use libafl::monitors::tui::TuiMonitor; #[cfg(not(feature = "tui"))] use libafl::monitors::SimpleMonitor; use libafl::{ - corpus::{Corpus, InMemoryCorpus, OnDiskCorpus}, + corpus::{InMemoryCorpus, OnDiskCorpus}, events::SimpleEventManager, executors::{Executor, ExitKind, WithObservers}, feedback_and_fast, diff --git a/fuzzers/baby/baby_fuzzer_unicode/src/main.rs b/fuzzers/baby/baby_fuzzer_unicode/src/main.rs index 5e7b5eed80..9a9a1e823a 100644 --- a/fuzzers/baby/baby_fuzzer_unicode/src/main.rs +++ b/fuzzers/baby/baby_fuzzer_unicode/src/main.rs @@ -123,7 +123,7 @@ pub fn main() { &mut state, &mut executor, &mut mgr, - BytesInput::new(vec![b'a']), + &BytesInput::new(vec![b'a']), ) .unwrap(); diff --git a/fuzzers/forkserver/libafl-fuzz/src/fuzzer.rs b/fuzzers/forkserver/libafl-fuzz/src/fuzzer.rs index fdf200fd9b..8c8b8328a3 100644 --- a/fuzzers/forkserver/libafl-fuzz/src/fuzzer.rs +++ b/fuzzers/forkserver/libafl-fuzz/src/fuzzer.rs @@ -78,7 +78,6 @@ pub type LibaflFuzzState = #[cfg(not(feature = "fuzzbench"))] type LibaflFuzzManager = CentralizedEventManager< LlmpRestartingEventManager<(), BytesInput, LibaflFuzzState, StdShMem, StdShMemProvider>, - (), BytesInput, LibaflFuzzState, StdShMem, diff --git a/fuzzers/forkserver/libafl-fuzz/src/hooks.rs b/fuzzers/forkserver/libafl-fuzz/src/hooks.rs index 672c420363..2418fb5158 100644 --- a/fuzzers/forkserver/libafl-fuzz/src/hooks.rs +++ b/fuzzers/forkserver/libafl-fuzz/src/hooks.rs @@ -14,7 +14,7 @@ impl EventManagerHook for LibAflFuzzEventHook where S: Stoppable, { - fn pre_exec( + fn pre_receive( &mut self, state: &mut S, _client_id: ClientId, @@ -26,7 +26,4 @@ where } Ok(true) } - fn post_exec(&mut self, _state: &mut S, _client_id: ClientId) -> Result { - Ok(true) - } } diff --git a/fuzzers/inprocess/libfuzzer_libpng_centralized/src/lib.rs b/fuzzers/inprocess/libfuzzer_libpng_centralized/src/lib.rs index f4508a1bb0..8eb53dafe8 100644 --- a/fuzzers/inprocess/libfuzzer_libpng_centralized/src/lib.rs +++ b/fuzzers/inprocess/libfuzzer_libpng_centralized/src/lib.rs @@ -141,7 +141,7 @@ pub extern "C" fn libafl_main() { let mut secondary_run_client = |state: Option<_>, - mut mgr: CentralizedEventManager<_, _, _, _, _, _>, + mut mgr: CentralizedEventManager<_, _, _, _, _>, _client_description: ClientDescription| { // Create an observation channel using the coverage map let edges_observer = diff --git a/fuzzers/inprocess/sqlite_centralized_multi_machine/src/lib.rs b/fuzzers/inprocess/sqlite_centralized_multi_machine/src/lib.rs index 096f06e5ad..7dc2719bc7 100644 --- a/fuzzers/inprocess/sqlite_centralized_multi_machine/src/lib.rs +++ b/fuzzers/inprocess/sqlite_centralized_multi_machine/src/lib.rs @@ -157,7 +157,7 @@ pub extern "C" fn libafl_main() { let mut secondary_run_client = |state: Option<_>, - mut mgr: CentralizedEventManager<_, _, _, _, _, _>, + mut mgr: CentralizedEventManager<_, _, _, _, _>, _client_description: ClientDescription| { // Create an observation channel using the coverage map let edges_observer = diff --git a/fuzzers/structure_aware/baby_fuzzer_grimoire/src/main.rs b/fuzzers/structure_aware/baby_fuzzer_grimoire/src/main.rs index 2fc08297e1..8129da78a3 100644 --- a/fuzzers/structure_aware/baby_fuzzer_grimoire/src/main.rs +++ b/fuzzers/structure_aware/baby_fuzzer_grimoire/src/main.rs @@ -164,7 +164,7 @@ pub fn main() { for input in initial_inputs { fuzzer - .evaluate_input(&mut state, &mut executor, &mut mgr, input) + .evaluate_input(&mut state, &mut executor, &mut mgr, &input) .unwrap(); } diff --git a/fuzzers/structure_aware/baby_fuzzer_multi/src/main.rs b/fuzzers/structure_aware/baby_fuzzer_multi/src/main.rs index 5842936ff7..0dd16eaa4c 100644 --- a/fuzzers/structure_aware/baby_fuzzer_multi/src/main.rs +++ b/fuzzers/structure_aware/baby_fuzzer_multi/src/main.rs @@ -151,7 +151,7 @@ pub fn main() { ]); fuzzer - .evaluate_input(&mut state, &mut executor, &mut mgr, initial) + .evaluate_input(&mut state, &mut executor, &mut mgr, &initial) .unwrap(); // Setup a mutational stage with a basic bytes mutator diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 0515f1c6d7..781a17d3ae 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -22,21 +22,19 @@ use libafl_bolts::{ tuples::{Handle, MatchNameRef}, ClientId, }; -use serde::{de::DeserializeOwned, Serialize}; +use serde::Serialize; -use super::AwaitRestartSafe; +use super::{AwaitRestartSafe, RecordSerializationTime}; #[cfg(feature = "llmp_compression")] use crate::events::llmp::COMPRESS_THRESHOLD; use crate::{ common::HasMetadata, events::{ serialize_observers_adaptive, std_maybe_report_progress, std_report_progress, - AdaptiveSerializer, CanSerializeObserver, Event, EventConfig, EventFirer, - EventManagerHooksTuple, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, - LogSeverity, ProgressReporter, SendExiting, + AdaptiveSerializer, CanSerializeObserver, Event, EventConfig, EventFirer, EventManagerId, + EventReceiver, EventRestarter, HasEventManagerId, LogSeverity, ProgressReporter, + SendExiting, }, - executors::HasObservers, - fuzzer::{EvaluatorObservers, ExecutionProcessor}, inputs::Input, observers::TimeObserver, state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable}, @@ -47,19 +45,18 @@ pub(crate) const _LLMP_TAG_TO_MAIN: Tag = Tag(0x3453453); /// A wrapper manager to implement a main-secondary architecture with another broker #[derive(Debug)] -pub struct CentralizedEventManager { +pub struct CentralizedEventManager { inner: EM, /// The centralized LLMP client for inter process communication client: LlmpClient, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor, time_ref: Option>, - hooks: EMH, is_main: bool, phantom: PhantomData<(I, S)>, } -impl CentralizedEventManager<(), (), (), (), (), ()> { +impl CentralizedEventManager<(), (), (), (), ()> { /// Creates a builder for [`CentralizedEventManager`] #[must_use] pub fn builder() -> CentralizedEventManagerBuilder { @@ -93,20 +90,17 @@ impl CentralizedEventManagerBuilder { } /// Creates a new [`CentralizedEventManager`]. - #[expect(clippy::type_complexity)] - pub fn build_from_client( + pub fn build_from_client( self, inner: EM, - hooks: EMH, client: LlmpClient, time_obs: Option>, - ) -> Result, Error> + ) -> Result, Error> where SP: ShMemProvider, { Ok(CentralizedEventManager { inner, - hooks, client, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), @@ -120,59 +114,66 @@ impl CentralizedEventManagerBuilder { /// /// If the port is not yet bound, it will act as a broker; otherwise, it /// will act as a client. - pub fn build_on_port( + pub fn build_on_port( self, inner: EM, - hooks: EMH, shmem_provider: SP, port: u16, time_obs: Option>, - ) -> Result, Error> + ) -> Result, Error> where SHM: ShMem, SP: ShMemProvider, { let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; - Self::build_from_client(self, inner, hooks, client, time_obs) + Self::build_from_client(self, inner, client, time_obs) } /// If a client respawns, it may reuse the existing connection, previously /// stored by [`LlmpClient::to_env()`]. - pub fn build_existing_client_from_env( + pub fn build_existing_client_from_env( self, inner: EM, - hooks: EMH, shmem_provider: SP, env_name: &str, time_obs: Option>, - ) -> Result, Error> + ) -> Result, Error> where SHM: ShMem, SP: ShMemProvider, { let client = LlmpClient::on_existing_from_env(shmem_provider, env_name)?; - Self::build_from_client(self, inner, hooks, client, time_obs) + Self::build_from_client(self, inner, client, time_obs) } /// Create an existing client from description - pub fn existing_client_from_description( + pub fn existing_client_from_description( self, inner: EM, - hooks: EMH, shmem_provider: SP, description: &LlmpClientDescription, time_obs: Option>, - ) -> Result, Error> + ) -> Result, Error> where SHM: ShMem, SP: ShMemProvider, { let client = LlmpClient::existing_client_from_description(shmem_provider, description)?; - Self::build_from_client(self, inner, hooks, client, time_obs) + Self::build_from_client(self, inner, client, time_obs) } } -impl AdaptiveSerializer for CentralizedEventManager +impl RecordSerializationTime for CentralizedEventManager +where + EM: RecordSerializationTime, +{ + /// Set the deserialization time (mut) + fn set_deserialization_time(&mut self, dur: Duration) { + self.inner.set_deserialization_time(dur); + } +} + +impl AdaptiveSerializer for CentralizedEventManager where EM: AdaptiveSerializer, { @@ -207,10 +208,9 @@ where } } -impl EventFirer for CentralizedEventManager +impl EventFirer for CentralizedEventManager where EM: HasEventManagerId + EventFirer, - EMH: EventManagerHooksTuple, S: Stoppable, I: Input, SHM: ShMem, @@ -265,7 +265,7 @@ where } } -impl EventRestarter for CentralizedEventManager +impl EventRestarter for CentralizedEventManager where EM: EventRestarter, SHM: ShMem, @@ -279,8 +279,7 @@ where } } -impl CanSerializeObserver - for CentralizedEventManager +impl CanSerializeObserver for CentralizedEventManager where EM: AdaptiveSerializer, OT: MatchNameRef + Serialize, @@ -295,7 +294,7 @@ where } } -impl SendExiting for CentralizedEventManager +impl SendExiting for CentralizedEventManager where EM: SendExiting, SHM: ShMem, @@ -305,9 +304,14 @@ where self.client.sender_mut().send_exiting()?; self.inner.send_exiting() } + + fn on_shutdown(&mut self) -> Result<(), Error> { + self.inner.on_shutdown()?; + self.client.sender_mut().send_exiting() + } } -impl AwaitRestartSafe for CentralizedEventManager +impl AwaitRestartSafe for CentralizedEventManager where SHM: ShMem, EM: AwaitRestartSafe, @@ -319,40 +323,33 @@ where } } -impl EventProcessor - for CentralizedEventManager +impl EventReceiver for CentralizedEventManager where - E: HasObservers, - E::Observers: DeserializeOwned, - EM: EventProcessor + HasEventManagerId + EventFirer, - EMH: EventManagerHooksTuple, + EM: EventReceiver + HasEventManagerId + EventFirer, I: Input, S: Stoppable, SHM: ShMem, SP: ShMemProvider, - Z: ExecutionProcessor + EvaluatorObservers, { - fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { + fn try_receive(&mut self, state: &mut S) -> Result, bool)>, Error> { if self.is_main { // main node - self.receive_from_secondary(fuzzer, state, executor) + self.receive_from_secondary(state) // self.inner.process(fuzzer, state, executor) } else { // The main node does not process incoming events from the broker ATM - self.inner.process(fuzzer, state, executor) + self.inner.try_receive(state) } } - fn on_shutdown(&mut self) -> Result<(), Error> { - self.inner.on_shutdown()?; - self.client.sender_mut().send_exiting() + fn on_interesting(&mut self, state: &mut S, event: Event) -> Result<(), Error> { + self.inner.fire(state, event) } } -impl ProgressReporter for CentralizedEventManager +impl ProgressReporter for CentralizedEventManager where EM: EventFirer + HasEventManagerId, - EMH: EventManagerHooksTuple, I: Input, S: HasExecutions + HasMetadata + HasLastReportTime + Stoppable + MaybeHasClientPerfMonitor, SHM: ShMem, @@ -371,7 +368,7 @@ where } } -impl HasEventManagerId for CentralizedEventManager +impl HasEventManagerId for CentralizedEventManager where EM: HasEventManagerId, { @@ -380,7 +377,7 @@ where } } -impl CentralizedEventManager +impl CentralizedEventManager where SHM: ShMem, SP: ShMemProvider, @@ -402,10 +399,9 @@ where } } -impl CentralizedEventManager +impl CentralizedEventManager where EM: HasEventManagerId + EventFirer, - EMH: EventManagerHooksTuple, I: Input, S: Stoppable, SHM: ShMem, @@ -438,20 +434,9 @@ where Ok(()) } - fn receive_from_secondary( - &mut self, - fuzzer: &mut Z, - state: &mut S, - executor: &mut E, - ) -> Result - where - E: HasObservers, - E::Observers: DeserializeOwned, - Z: ExecutionProcessor + EvaluatorObservers, - { + fn receive_from_secondary(&mut self, state: &mut S) -> Result, bool)>, Error> { // TODO: Get around local event copy by moving handle_in_client let self_id = self.client.sender().id(); - let mut count = 0; while let Some((client_id, tag, _flags, msg)) = self.client.recv_buf_with_flags()? { assert!( tag == _LLMP_TAG_TO_MAIN, @@ -474,116 +459,43 @@ where }; let event: Event = postcard::from_bytes(event_bytes)?; log::debug!("Processor received message {}", event.name_detailed()); - self.handle_in_main(fuzzer, executor, state, client_id, event)?; - count += 1; - } - Ok(count) - } - // Handle arriving events in the main node - fn handle_in_main( - &mut self, - fuzzer: &mut Z, - executor: &mut E, - state: &mut S, - client_id: ClientId, - event: Event, - ) -> Result<(), Error> - where - E: HasObservers, - E::Observers: DeserializeOwned, - Z: ExecutionProcessor + EvaluatorObservers, - { - log::debug!("handle_in_main!"); - - let event_name = event.name_detailed(); - - match event { - Event::NewTestcase { - input, - client_config, - exit_kind, - corpus_size, - observers_buf, - time, - forward_id, - #[cfg(feature = "multi_machine")] - node_id, - } => { - log::debug!( - "Received {} from {client_id:?} ({client_config:?}, forward {forward_id:?})", - event_name - ); - - let res = - if client_config.match_with(&self.configuration()) && observers_buf.is_some() { - let observers: E::Observers = - postcard::from_bytes(observers_buf.as_ref().unwrap())?; - log::debug!( - "[{}] Running fuzzer with event {}", - process::id(), - event_name - ); - fuzzer.evaluate_execution( - state, - self, - input.clone(), - &observers, - &exit_kind, - false, - )? - } else { - log::debug!( - "[{}] Running fuzzer with event {}", - process::id(), - event_name - ); - fuzzer.evaluate_input_with_observers( - state, - executor, - self, - input.clone(), - false, - )? - }; - - if let Some(item) = res.1 { - let event = Event::NewTestcase { - input, - client_config, - exit_kind, - corpus_size, - observers_buf, - time, - forward_id, - #[cfg(feature = "multi_machine")] - node_id, - }; - - self.hooks.on_fire_all(state, client_id, &event)?; + let event_name = event.name_detailed(); + + match event { + Event::NewTestcase { + client_config, + ref observers_buf, + forward_id, + .. + } => { + log::debug!( + "Received {} from {client_id:?} ({client_config:?}, forward {forward_id:?})", + event_name + ); log::debug!( - "[{}] Adding received Testcase {} as item #{item}...", + "[{}] Running fuzzer with event {}", process::id(), event_name ); - self.inner.fire(state, event)?; - } else { - log::debug!("[{}] {} was discarded...)", process::id(), event_name); + if client_config.match_with(&self.configuration()) && observers_buf.is_some() { + return Ok(Some((event, true))); + } + return Ok(Some((event, false))); + } + Event::Stop => { + state.request_stop(); + } + _ => { + return Err(Error::illegal_state(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))); } - } - Event::Stop => { - state.request_stop(); - } - _ => { - return Err(Error::unknown(format!( - "Received illegal message that message should not have arrived: {:?}.", - event.name() - ))); } } - - Ok(()) + Ok(None) } } diff --git a/libafl/src/events/events_hooks/mod.rs b/libafl/src/events/events_hooks/mod.rs index e7736212e5..742a05dbbb 100644 --- a/libafl/src/events/events_hooks/mod.rs +++ b/libafl/src/events/events_hooks/mod.rs @@ -1,4 +1,4 @@ -//! Hooks for event managers, especifically these are used to hook before `handle_in_client`. +//! Hooks for event managers, especifically these are used to hook before `try_receive`. //! //! This will allow user to define pre/post-processing code when the event manager receives any message from //! other clients @@ -6,59 +6,32 @@ use libafl_bolts::ClientId; use crate::{events::Event, Error}; -/// The `broker_hooks` that are run before and after the event manager calls `handle_in_client` +/// The `broker_hooks` that are run before and after the event manager calls `try_receive` pub trait EventManagerHook { - /// The hook that runs before `handle_in_client` + /// The hook that runs before `try_receive` /// Return false if you want to cancel the subsequent event handling - fn pre_exec( + fn pre_receive( &mut self, state: &mut S, client_id: ClientId, event: &Event, ) -> Result; - - /// Triggered when the even manager decides to fire the event after processing - fn on_fire( - &mut self, - _state: &mut S, - _client_id: ClientId, - _event: &Event, - ) -> Result<(), Error> { - Ok(()) - } - - /// The hook that runs after `handle_in_client` - /// Return false if you want to cancel the subsequent event handling - fn post_exec(&mut self, _state: &mut S, _client_id: ClientId) -> Result { - Ok(true) - } } -/// The tuples contains `broker_hooks` to be executed for `handle_in_client` +/// The tuples contains `broker_hooks` to be executed for `try_receive` pub trait EventManagerHooksTuple { - /// The hook that runs before `handle_in_client` - fn pre_exec_all( + /// The hook that runs before `try_receive` + fn pre_receive_all( &mut self, state: &mut S, client_id: ClientId, event: &Event, ) -> Result; - - /// Ran when the Event Manager decides to accept an event and propagates it - fn on_fire_all( - &mut self, - state: &mut S, - client_id: ClientId, - event: &Event, - ) -> Result<(), Error>; - - /// The hook that runs after `handle_in_client` - fn post_exec_all(&mut self, state: &mut S, client_id: ClientId) -> Result; } impl EventManagerHooksTuple for () { - /// The hook that runs before `handle_in_client` - fn pre_exec_all( + /// The hook that runs before `try_receive` + fn pre_receive_all( &mut self, _state: &mut S, _client_id: ClientId, @@ -66,20 +39,6 @@ impl EventManagerHooksTuple for () { ) -> Result { Ok(true) } - - fn on_fire_all( - &mut self, - _state: &mut S, - _client_id: ClientId, - _event: &Event, - ) -> Result<(), Error> { - Ok(()) - } - - /// The hook that runs after `handle_in_client` - fn post_exec_all(&mut self, _state: &mut S, _client_id: ClientId) -> Result { - Ok(true) - } } impl EventManagerHooksTuple for (Head, Tail) @@ -87,32 +46,15 @@ where Head: EventManagerHook, Tail: EventManagerHooksTuple, { - /// The hook that runs before `handle_in_client` - fn pre_exec_all( + /// The hook that runs before `try_receive` + fn pre_receive_all( &mut self, state: &mut S, client_id: ClientId, event: &Event, ) -> Result { - let first = self.0.pre_exec(state, client_id, event)?; - let second = self.1.pre_exec_all(state, client_id, event)?; - Ok(first & second) - } - - fn on_fire_all( - &mut self, - state: &mut S, - client_id: ClientId, - event: &Event, - ) -> Result<(), Error> { - self.0.on_fire(state, client_id, event)?; - self.1.on_fire_all(state, client_id, event) - } - - /// The hook that runs after `handle_in_client` - fn post_exec_all(&mut self, state: &mut S, client_id: ClientId) -> Result { - let first = self.0.post_exec(state, client_id)?; - let second = self.1.post_exec_all(state, client_id)?; + let first = self.0.pre_receive(state, client_id, event)?; + let second = self.1.pre_receive_all(state, client_id, event)?; Ok(first & second) } } diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index 3aecf7a484..4bb3238b46 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -637,7 +637,6 @@ where Option, CentralizedEventManager< StdCentralizedInnerMgr, - (), I, S, SP::ShMem, @@ -649,7 +648,6 @@ where Option, CentralizedEventManager< StdCentralizedInnerMgr, - (), I, S, SP::ShMem, @@ -696,13 +694,13 @@ where I: Input + Send + Sync + 'static, CF: FnOnce( Option, - CentralizedEventManager, + CentralizedEventManager, ClientDescription, ) -> Result<(), Error>, EMB: FnOnce(&Self, ClientDescription) -> Result<(Option, EM), Error>, MF: FnOnce( Option, - CentralizedEventManager, // No broker_hooks for centralized EM + CentralizedEventManager, // No broker_hooks for centralized EM ClientDescription, ) -> Result<(), Error>, { @@ -788,7 +786,6 @@ where let c_mgr = centralized_event_manager_builder.build_on_port( mgr, // tuple_list!(multi_machine_event_manager_hook.take().unwrap()), - tuple_list!(), self.shmem_provider.clone(), self.centralized_broker_port, self.time_obs.clone(), @@ -815,7 +812,6 @@ where let c_mgr = centralized_builder.build_on_port( mgr, - tuple_list!(), self.shmem_provider.clone(), self.centralized_broker_port, self.time_obs.clone(), diff --git a/libafl/src/events/llmp/mod.rs b/libafl/src/events/llmp/mod.rs index cee3ce1cd1..8e9d714cae 100644 --- a/libafl/src/events/llmp/mod.rs +++ b/libafl/src/events/llmp/mod.rs @@ -276,7 +276,7 @@ where state, executor, manager, - converter.convert(input)?, + &converter.convert(input)?, false, )?; @@ -298,7 +298,7 @@ where state, executor, manager, - converter.convert(input)?, + &converter.convert(input)?, false, )?; diff --git a/libafl/src/events/llmp/restarting.rs b/libafl/src/events/llmp/restarting.rs index 6694c7a616..0a05f9bbe3 100644 --- a/libafl/src/events/llmp/restarting.rs +++ b/libafl/src/events/llmp/restarting.rs @@ -37,7 +37,6 @@ use libafl_bolts::{ shmem::{ShMem, ShMemProvider, StdShMem, StdShMemProvider}, staterestore::StateRestorer, tuples::{tuple_list, Handle, MatchNameRef}, - ClientId, }; #[cfg(feature = "std")] use libafl_bolts::{ @@ -56,12 +55,11 @@ use crate::{ events::{ launcher::ClientDescription, serialize_observers_adaptive, std_maybe_report_progress, std_report_progress, AdaptiveSerializer, AwaitRestartSafe, CanSerializeObserver, Event, - EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor, - EventRestarter, HasEventManagerId, LlmpShouldSaveState, ProgressReporter, SendExiting, - StdLlmpEventHook, LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER, + EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventReceiver, + EventRestarter, HasEventManagerId, LlmpShouldSaveState, ProgressReporter, + RecordSerializationTime, SendExiting, StdLlmpEventHook, LLMP_TAG_EVENT_TO_BOTH, + _LLMP_TAG_EVENT_TO_BROKER, }, - executors::HasObservers, - fuzzer::{EvaluatorObservers, ExecutionProcessor}, inputs::Input, monitors::Monitor, observers::TimeObserver, @@ -104,6 +102,15 @@ pub struct LlmpRestartingEventManager { phantom: PhantomData<(I, S)>, } +impl RecordSerializationTime for LlmpRestartingEventManager +where + SHM: ShMem, +{ + fn set_deserialization_time(&mut self, dur: Duration) { + self.deserialization_time = dur; + } +} + impl AdaptiveSerializer for LlmpRestartingEventManager where SHM: ShMem, @@ -286,6 +293,10 @@ where // This way, the broker can clean up the pages, and eventually exit. self.llmp.sender_mut().send_exiting() } + + fn on_shutdown(&mut self) -> Result<(), Error> { + self.send_exiting() + } } impl AwaitRestartSafe for LlmpRestartingEventManager @@ -300,67 +311,94 @@ where } } -impl EventProcessor - for LlmpRestartingEventManager +impl EventReceiver for LlmpRestartingEventManager where - E: HasObservers, - E::Observers: DeserializeOwned, EMH: EventManagerHooksTuple, I: DeserializeOwned + Input, S: HasImported + HasCurrentTestcase + HasSolutions + Stoppable + Serialize, SHM: ShMem, SP: ShMemProvider, - Z: ExecutionProcessor + EvaluatorObservers, { - fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { - let res = { - // TODO: Get around local event copy by moving handle_in_client - let self_id = self.llmp.sender().id(); - let mut count = 0; - while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? { - assert_ne!( - tag, _LLMP_TAG_EVENT_TO_BROKER, - "EVENT_TO_BROKER parcel should not have arrived in the client!" - ); - - if client_id == self_id { - continue; - } + fn try_receive(&mut self, state: &mut S) -> Result, bool)>, Error> { + // TODO: Get around local event copy by moving handle_in_client + let self_id = self.llmp.sender().id(); + while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? { + assert_ne!( + tag, _LLMP_TAG_EVENT_TO_BROKER, + "EVENT_TO_BROKER parcel should not have arrived in the client!" + ); + + if client_id == self_id { + continue; + } - #[cfg(not(feature = "llmp_compression"))] - let event_bytes = msg; - #[cfg(feature = "llmp_compression")] - let compressed; - #[cfg(feature = "llmp_compression")] - let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { - compressed = self.compressor.decompress(msg)?; - &compressed - } else { - msg - }; + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = self.compressor.decompress(msg)?; + &compressed + } else { + msg + }; + + let event: Event = postcard::from_bytes(event_bytes)?; + log::debug!("Received event in normal llmp {}", event.name_detailed()); + + // If the message comes from another machine, do not + // consider other events than new testcase. + if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) { + continue; + } - let event: Event = postcard::from_bytes(event_bytes)?; - log::debug!("Received event in normal llmp {}", event.name_detailed()); + log::trace!("Got event in client: {} from {client_id:?}", event.name()); + if !self.hooks.pre_receive_all(state, client_id, &event)? { + continue; + } + let evt_name = event.name_detailed(); + match event { + Event::NewTestcase { + client_config, + ref observers_buf, + #[cfg(feature = "std")] + forward_id, + .. + } => { + #[cfg(feature = "std")] + log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id()); + + if client_config.match_with(&self.configuration) && observers_buf.is_some() { + return Ok(Some((event, true))); + } - // If the message comes from another machine, do not - // consider other events than new testcase. - if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) { - continue; + return Ok(Some((event, false))); } - self.handle_in_client(fuzzer, executor, state, client_id, event)?; - count += 1; + #[cfg(feature = "share_objectives")] + Event::Objective { .. } => { + #[cfg(feature = "std")] + log::debug!("[{}] Received new Objective", std::process::id()); + + return Ok(Some((event, false))); + } + Event::Stop => { + state.request_stop(); + } + _ => { + return Err(Error::unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))); + } } - count - }; - if self.staterestorer.is_some() { - self.intermediate_save()?; } - Ok(res) + Ok(None) } - fn on_shutdown(&mut self) -> Result<(), Error> { - self.send_exiting() + fn on_interesting(&mut self, _state: &mut S, _event_vec: Event) -> Result<(), Error> { + Ok(()) } } @@ -563,91 +601,6 @@ where Ok(()) } - // Handle arriving events in the client - fn handle_in_client( - &mut self, - fuzzer: &mut Z, - executor: &mut E, - state: &mut S, - client_id: ClientId, - event: Event, - ) -> Result<(), Error> - where - S: HasImported + Stoppable, - EMH: EventManagerHooksTuple, - I: Input, - E: HasObservers, - E::Observers: DeserializeOwned, - Z: ExecutionProcessor + EvaluatorObservers, - { - log::trace!("Got event in client: {} from {client_id:?}", event.name()); - if !self.hooks.pre_exec_all(state, client_id, &event)? { - return Ok(()); - } - let evt_name = event.name_detailed(); - match event { - Event::NewTestcase { - input, - client_config, - exit_kind, - observers_buf, - #[cfg(feature = "std")] - forward_id, - .. - } => { - #[cfg(feature = "std")] - log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id()); - - let res = if client_config.match_with(&self.configuration) - && observers_buf.is_some() - { - let start = current_time(); - let observers: E::Observers = - postcard::from_bytes(observers_buf.as_ref().unwrap())?; - { - self.deserialization_time = current_time() - start; - } - fuzzer.evaluate_execution(state, self, input, &observers, &exit_kind, false)? - } else { - fuzzer.evaluate_input_with_observers(state, executor, self, input, false)? - }; - if let Some(item) = res.1 { - *state.imported_mut() += 1; - log::debug!("Added received Testcase {evt_name} as item #{item}"); - } else { - log::debug!("Testcase {evt_name} was discarded"); - } - } - - #[cfg(feature = "share_objectives")] - Event::Objective { input, .. } => { - #[cfg(feature = "std")] - log::debug!("[{}] Received new Objective", std::process::id()); - - let res = - fuzzer.evaluate_input_with_observers(state, executor, self, input, false)?; - if let Some(item) = res.1 { - *state.imported_mut() += 1; - log::debug!("Added received Objective {evt_name} as item #{item}"); - } else { - log::debug!("Objective {evt_name} was discarded"); - } - } - Event::Stop => { - state.request_stop(); - } - _ => { - return Err(Error::unknown(format!( - "Received illegal message that message should not have arrived: {:?}.", - event.name() - ))); - } - } - - self.hooks.post_exec_all(state, client_id)?; - Ok(()) - } - /// Calling this function will tell the llmp broker that this client is exiting /// This should be called from the restarter not from the actual fuzzer client /// This function serves the same roll as the `LlmpClient.send_exiting()` diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index f2cd5806de..4c01ff3daa 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -585,6 +585,10 @@ pub trait SendExiting { /// Send information that this client is exiting. /// No need to restart us any longer, and no need to print an error, either. fn send_exiting(&mut self) -> Result<(), Error>; + + /// Shutdown gracefully; typically without saving state. + /// This is usually called from `fuzz_loop`. + fn on_shutdown(&mut self) -> Result<(), Error>; } /// Wait until it's safe to restart @@ -593,14 +597,15 @@ pub trait AwaitRestartSafe { fn await_restart_safe(&mut self); } -/// [`EventProcessor`] process all the incoming messages -pub trait EventProcessor { +/// [`EventReceiver`] process all the incoming messages +pub trait EventReceiver { /// Lookup for incoming events and process them. - /// Return the number of processes events or an error - fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result; + /// Return the event, if any, that needs to be evaluated + fn try_receive(&mut self, state: &mut S) -> Result, bool)>, Error>; - /// Shutdown gracefully; typically without saving state. - fn on_shutdown(&mut self) -> Result<(), Error>; + /// Run the post processing routine after the fuzzer deemed this event as interesting + /// For example, in centralized manager you wanna send this an event. + fn on_interesting(&mut self, state: &mut S, event: Event) -> Result<(), Error>; } /// The id of this `EventManager`. /// For multi processed `EventManagers`, @@ -623,6 +628,8 @@ impl NopEventManager { } } +impl RecordSerializationTime for NopEventManager {} + impl EventFirer for NopEventManager { fn should_send(&self) -> bool { true @@ -648,6 +655,10 @@ impl SendExiting for NopEventManager { fn send_exiting(&mut self) -> Result<(), Error> { Ok(()) } + + fn on_shutdown(&mut self) -> Result<(), Error> { + Ok(()) + } } impl AwaitRestartSafe for NopEventManager { @@ -655,17 +666,12 @@ impl AwaitRestartSafe for NopEventManager { fn await_restart_safe(&mut self) {} } -impl EventProcessor for NopEventManager { - fn process( - &mut self, - _fuzzer: &mut Z, - _state: &mut S, - _executor: &mut E, - ) -> Result { - Ok(0) +impl EventReceiver for NopEventManager { + fn try_receive(&mut self, _state: &mut S) -> Result, bool)>, Error> { + Ok(None) } - fn on_shutdown(&mut self) -> Result<(), Error> { + fn on_interesting(&mut self, _state: &mut S, _event_vec: Event) -> Result<(), Error> { Ok(()) } } @@ -707,6 +713,8 @@ pub struct MonitorTypedEventManager { phantom: PhantomData, } +impl RecordSerializationTime for MonitorTypedEventManager {} + impl MonitorTypedEventManager { /// Creates a new `EventManager` that wraps another manager, but captures a `monitor` type as well. #[must_use] @@ -774,6 +782,10 @@ where fn send_exiting(&mut self) -> Result<(), Error> { self.inner.send_exiting() } + + fn on_shutdown(&mut self) -> Result<(), Error> { + self.inner.on_shutdown() + } } impl AwaitRestartSafe for MonitorTypedEventManager @@ -786,17 +798,16 @@ where } } -impl EventProcessor for MonitorTypedEventManager +impl EventReceiver for MonitorTypedEventManager where - EM: EventProcessor, + EM: EventReceiver, { #[inline] - fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { - self.inner.process(fuzzer, state, executor) + fn try_receive(&mut self, state: &mut S) -> Result, bool)>, Error> { + self.inner.try_receive(state) } - - fn on_shutdown(&mut self) -> Result<(), Error> { - self.inner.on_shutdown() + fn on_interesting(&mut self, _state: &mut S, _event_vec: Event) -> Result<(), Error> { + Ok(()) } } @@ -829,6 +840,12 @@ where } } +/// Record the deserialization time for this event manager +pub trait RecordSerializationTime { + /// Set the deserialization time (mut) + fn set_deserialization_time(&mut self, _dur: Duration) {} +} + /// Collected stats to decide if observers must be serialized or not pub trait AdaptiveSerializer { /// Expose the collected observers serialization time diff --git a/libafl/src/events/simple.rs b/libafl/src/events/simple.rs index 1422187209..665b60f67c 100644 --- a/libafl/src/events/simple.rs +++ b/libafl/src/events/simple.rs @@ -22,13 +22,13 @@ use libafl_bolts::{ use serde::de::DeserializeOwned; use serde::Serialize; -use super::{std_on_restart, AwaitRestartSafe, ProgressReporter}; +use super::{std_on_restart, AwaitRestartSafe, ProgressReporter, RecordSerializationTime}; #[cfg(all(unix, feature = "std", not(miri)))] use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::{ events::{ std_maybe_report_progress, std_report_progress, BrokerEventResult, CanSerializeObserver, - Event, EventFirer, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, + Event, EventFirer, EventManagerId, EventReceiver, EventRestarter, HasEventManagerId, SendExiting, }, monitors::Monitor, @@ -71,6 +71,8 @@ where } } +impl RecordSerializationTime for SimpleEventManager {} + impl EventFirer for SimpleEventManager where I: Debug, @@ -94,6 +96,10 @@ impl SendExiting for SimpleEventManager { fn send_exiting(&mut self) -> Result<(), Error> { Ok(()) } + + fn on_shutdown(&mut self) -> Result<(), Error> { + self.send_exiting() + } } impl AwaitRestartSafe for SimpleEventManager { @@ -109,27 +115,29 @@ where } } -impl EventProcessor for SimpleEventManager +impl EventReceiver for SimpleEventManager where I: Debug, MT: Monitor, S: Stoppable, { - fn process( - &mut self, - _fuzzer: &mut Z, - state: &mut S, - _executor: &mut E, - ) -> Result { - let count = self.events.len(); + fn try_receive(&mut self, state: &mut S) -> Result, bool)>, Error> { while let Some(event) = self.events.pop() { - self.handle_in_client(state, &event)?; + match event { + Event::Stop => { + state.request_stop(); + } + _ => { + return Err(Error::unknown(format!( + "Received illegal message that message should not have arrived: {event:?}." + ))) + } + } } - Ok(count) + Ok(None) } - - fn on_shutdown(&mut self) -> Result<(), Error> { - self.send_exiting() + fn on_interesting(&mut self, _state: &mut S, _event_vec: Event) -> Result<(), Error> { + Ok(()) } } @@ -263,20 +271,6 @@ where Event::Stop => Ok(BrokerEventResult::Forward), } } - - // Handle arriving events in the client - #[allow(clippy::unused_self)] - fn handle_in_client(&mut self, state: &mut S, event: &Event) -> Result<(), Error> { - match event { - Event::Stop => { - state.request_stop(); - Ok(()) - } - _ => Err(Error::unknown(format!( - "Received illegal message that message should not have arrived: {event:?}." - ))), - } - } } /// Provides a `builder` which can be used to build a [`SimpleRestartingEventManager`]. @@ -293,6 +287,12 @@ pub struct SimpleRestartingEventManager { staterestorer: StateRestorer, } +#[cfg(feature = "std")] +impl RecordSerializationTime + for SimpleRestartingEventManager +{ +} + #[cfg(feature = "std")] impl EventFirer for SimpleRestartingEventManager where @@ -352,6 +352,10 @@ where self.staterestorer.send_exiting(); Ok(()) } + + fn on_shutdown(&mut self) -> Result<(), Error> { + self.send_exiting() + } } #[cfg(feature = "std")] @@ -362,8 +366,7 @@ impl AwaitRestartSafe for SimpleRestartingEventManager EventProcessor - for SimpleRestartingEventManager +impl EventReceiver for SimpleRestartingEventManager where I: Debug, MT: Monitor, @@ -371,12 +374,12 @@ where SHM: ShMem, SP: ShMemProvider, { - fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { - self.inner.process(fuzzer, state, executor) + fn try_receive(&mut self, state: &mut S) -> Result, bool)>, Error> { + self.inner.try_receive(state) } - fn on_shutdown(&mut self) -> Result<(), Error> { - self.send_exiting() + fn on_interesting(&mut self, _state: &mut S, _event_vec: Event) -> Result<(), Error> { + Ok(()) } } diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs index 85576f7ca5..c038ccbef3 100644 --- a/libafl/src/events/tcp.rs +++ b/libafl/src/events/tcp.rs @@ -30,7 +30,7 @@ use libafl_bolts::{ tuples::tuple_list, ClientId, }; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, sync::{broadcast, broadcast::error::RecvError, mpsc}, @@ -44,13 +44,10 @@ use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::{ events::{ std_on_restart, BrokerEventResult, Event, EventConfig, EventFirer, EventManagerHooksTuple, - EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, ProgressReporter, + EventManagerId, EventReceiver, EventRestarter, HasEventManagerId, ProgressReporter, }, - executors::{Executor, HasObservers}, - fuzzer::{EvaluatorObservers, ExecutionProcessor}, inputs::Input, monitors::Monitor, - observers::ObserversTuple, stages::HasCurrentStageId, state::{ HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions, @@ -569,75 +566,6 @@ where pub fn to_env(&self, env_name: &str) { env::set_var(env_name, format!("{}", self.client_id.0)); } - - // Handle arriving events in the client - fn handle_in_client( - &mut self, - fuzzer: &mut Z, - executor: &mut E, - state: &mut S, - client_id: ClientId, - event: Event, - ) -> Result<(), Error> - where - E: Executor + HasObservers, - E::Observers: Serialize + ObserversTuple, - for<'a> E::Observers: Deserialize<'a>, - Z: ExecutionProcessor + EvaluatorObservers, - { - if !self.hooks.pre_exec_all(state, client_id, &event)? { - return Ok(()); - } - match event { - Event::NewTestcase { - input, - client_config, - exit_kind, - observers_buf, - forward_id, - .. - } => { - log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})"); - - let _res = if client_config.match_with(&self.configuration) - && observers_buf.is_some() - { - let observers: E::Observers = - postcard::from_bytes(observers_buf.as_ref().unwrap())?; - fuzzer.evaluate_execution(state, self, input, &observers, &exit_kind, false)? - } else { - fuzzer.evaluate_input_with_observers(state, executor, self, input, false)? - }; - if let Some(item) = _res.1 { - *state.imported_mut() += 1; - log::info!("Added received Testcase as item #{item}"); - } - } - - #[cfg(feature = "share_objectives")] - Event::Objective { input, .. } => { - log::info!("Received new Objective"); - - let res = - fuzzer.evaluate_input_with_observers(state, executor, self, input, false)?; - if let Some(item) = res.1 { - *state.imported_mut() += 1; - log::info!("Added received Testcase as item #{item}"); - } - } - Event::Stop => { - state.request_stop(); - } - _ => { - return Err(Error::unknown(format!( - "Received illegal message that message should not have arrived: {:?}.", - event.name() - ))) - } - } - self.hooks.post_exec_all(state, client_id)?; - Ok(()) - } } impl TcpEventManager { @@ -693,11 +621,8 @@ where } } -impl EventProcessor for TcpEventManager +impl EventReceiver for TcpEventManager where - E: HasObservers + Executor, - E::Observers: Serialize + ObserversTuple, - for<'a> E::Observers: Deserialize<'a>, EMH: EventManagerHooksTuple, S: HasExecutions + HasMetadata @@ -706,16 +631,12 @@ where + HasCurrentTestcase + Stoppable, I: DeserializeOwned, - Z: ExecutionProcessor + EvaluatorObservers, { - fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { + fn try_receive(&mut self, state: &mut S) -> Result, bool)>, Error> { // TODO: Get around local event copy by moving handle_in_client let self_id = self.client_id; let mut len_buf = [0_u8; 4]; - let mut count = 0; - self.tcp.set_nonblocking(true).expect("set to non-blocking"); - // read all pending messages loop { match self.tcp.read_exact(&mut len_buf) { @@ -743,11 +664,41 @@ where // make decompressed vec and slice compatible let event = postcard::from_bytes(buf)?; - self.handle_in_client(fuzzer, executor, state, other_client_id, event)?; - count += 1; + if !self.hooks.pre_receive_all(state, other_client_id, &event)? { + continue; + } + match event { + Event::NewTestcase { + client_config, + ref observers_buf, + forward_id, + .. + } => { + log::info!("Received new Testcase from {other_client_id:?} ({client_config:?}, forward {forward_id:?})"); + if client_config.match_with(&self.configuration) + && observers_buf.is_some() + { + return Ok(Some((event, true))); + } + return Ok(Some((event, false))); + } + #[cfg(feature = "share_objectives")] + Event::Objective { .. } => { + log::info!("Received new Objective"); + return Ok(Some((event, false))); + } + Event::Stop => { + state.request_stop(); + } + _ => { + return Err(Error::unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))) + } + } } } - Err(e) if e.kind() == ErrorKind::WouldBlock => { // no new data on the socket break; @@ -758,12 +709,11 @@ where } } self.tcp.set_nonblocking(false).expect("set to blocking"); - - Ok(count) + Ok(None) } - fn on_shutdown(&mut self) -> Result<(), Error> { - self.send_exiting() + fn on_interesting(&mut self, _state: &mut S, _event: Event) -> Result<(), Error> { + Ok(()) } } @@ -782,6 +732,10 @@ impl SendExiting for TcpEventManager { //self.tcp.sender.send_exiting() Ok(()) } + + fn on_shutdown(&mut self) -> Result<(), Error> { + self.send_exiting() + } } impl ProgressReporter for TcpEventManager @@ -870,6 +824,10 @@ where // This way, the broker can clean up the pages, and eventually exit. self.tcp_mgr.send_exiting() } + + fn on_shutdown(&mut self) -> Result<(), Error> { + self.send_exiting() + } } impl AwaitRestartSafe for TcpRestartingEventManager @@ -908,12 +866,8 @@ where } } -impl EventProcessor - for TcpRestartingEventManager +impl EventReceiver for TcpRestartingEventManager where - E: HasObservers + Executor, I, S, Z>, - for<'a> E::Observers: Deserialize<'a>, - E::Observers: ObserversTuple + Serialize, EMH: EventManagerHooksTuple, I: DeserializeOwned, S: HasExecutions @@ -924,15 +878,13 @@ where + Stoppable, SHM: ShMem, SP: ShMemProvider, - Z: ExecutionProcessor, I, E::Observers, S> - + EvaluatorObservers, I, S>, { - fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { - self.tcp_mgr.process(fuzzer, state, executor) + fn try_receive(&mut self, state: &mut S) -> Result, bool)>, Error> { + self.tcp_mgr.try_receive(state) } - fn on_shutdown(&mut self) -> Result<(), Error> { - self.send_exiting() + fn on_interesting(&mut self, state: &mut S, event: Event) -> Result<(), Error> { + self.tcp_mgr.on_interesting(state, event) } } diff --git a/libafl/src/fuzzer/mod.rs b/libafl/src/fuzzer/mod.rs index 25cf48f63b..ab5a6b837b 100644 --- a/libafl/src/fuzzer/mod.rs +++ b/libafl/src/fuzzer/mod.rs @@ -8,14 +8,15 @@ use std::hash::Hash; #[cfg(feature = "std")] use fastbloom::BloomFilter; use libafl_bolts::{current_time, tuples::MatchName}; -use serde::Serialize; +use serde::{de::DeserializeOwned, Serialize}; #[cfg(feature = "introspection")] use crate::monitors::PerfFeature; use crate::{ corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, Testcase}, events::{ - CanSerializeObserver, Event, EventConfig, EventFirer, EventProcessor, ProgressReporter, + CanSerializeObserver, Event, EventConfig, EventFirer, EventReceiver, ProgressReporter, + RecordSerializationTime, SendExiting, }, executors::{Executor, ExitKind, HasObservers}, feedbacks::Feedback, @@ -26,8 +27,8 @@ use crate::{ stages::{HasCurrentStageId, StagesTuple}, start_timer, state::{ - HasCorpus, HasCurrentTestcase, HasExecutions, HasLastFoundTime, HasLastReportTime, - HasSolutions, MaybeHasClientPerfMonitor, Stoppable, + HasCorpus, HasCurrentTestcase, HasExecutions, HasImported, HasLastFoundTime, + HasLastReportTime, HasSolutions, MaybeHasClientPerfMonitor, Stoppable, }, Error, HasMetadata, }; @@ -98,7 +99,7 @@ pub trait ExecutionProcessor { &mut self, state: &mut S, manager: &mut EM, - input: I, + input: &I, exec_res: &ExecuteInputResult, observers: &OT, exit_kind: &ExitKind, @@ -109,7 +110,7 @@ pub trait ExecutionProcessor { &mut self, state: &mut S, manager: &mut EM, - input: I, + input: &I, exec_res: &ExecuteInputResult, obs_buf: Option>, exit_kind: &ExitKind, @@ -120,7 +121,7 @@ pub trait ExecutionProcessor { &mut self, state: &mut S, manager: &mut EM, - input: I, + input: &I, observers: &OT, exit_kind: &ExitKind, send_events: bool, @@ -137,11 +138,24 @@ pub trait EvaluatorObservers { state: &mut S, executor: &mut E, manager: &mut EM, - input: I, + input: &I, send_events: bool, ) -> Result<(ExecuteInputResult, Option), Error>; } +/// Receives and event from event manager and then evaluates it +pub trait EventProcessor { + /// Asks event manager to see if there's any event to evaluate + /// If there is any, then evaluates it. + /// After, run the post processing routines, for example, re-sending the events to the other + fn process_events( + &mut self, + state: &mut S, + executor: &mut E, + manager: &mut EM, + ) -> Result<(), Error>; +} + /// Evaluate an input modifying the state of the fuzzer pub trait Evaluator { /// Runs the input if it was (likely) not previously run and triggers observers and feedback and adds the input to the previously executed list @@ -151,7 +165,7 @@ pub trait Evaluator { state: &mut S, executor: &mut E, manager: &mut EM, - input: I, + input: &I, ) -> Result<(ExecuteInputResult, Option), Error>; /// Runs the input and triggers observers and feedback, @@ -161,7 +175,7 @@ pub trait Evaluator { state: &mut S, executor: &mut E, manager: &mut EM, - input: I, + input: &I, ) -> Result<(ExecuteInputResult, Option), Error>; /// Runs the input and triggers observers and feedback. @@ -401,7 +415,7 @@ where &mut self, state: &mut S, manager: &mut EM, - input: I, + input: &I, exec_res: &ExecuteInputResult, observers: &OT, exit_kind: &ExitKind, @@ -431,7 +445,7 @@ where &mut self, state: &mut S, manager: &mut EM, - input: I, + input: &I, exec_res: &ExecuteInputResult, observers_buf: Option>, exit_kind: &ExitKind, @@ -443,7 +457,7 @@ where manager.fire( state, Event::NewTestcase { - input, + input: input.clone(), observers_buf, exit_kind: *exit_kind, corpus_size: state.corpus().count(), @@ -462,7 +476,7 @@ where state, Event::Objective { #[cfg(feature = "share_objectives")] - input, + input: input.clone(), objective_size: state.solutions().count(), time: current_time(), @@ -479,13 +493,13 @@ where &mut self, state: &mut S, manager: &mut EM, - input: I, + input: &I, observers: &OT, exit_kind: &ExitKind, send_events: bool, ) -> Result<(ExecuteInputResult, Option), Error> { - let exec_res = self.check_results(state, manager, &input, observers, exit_kind)?; - let corpus_id = self.process_execution(state, manager, &input, &exec_res, observers)?; + let exec_res = self.check_results(state, manager, input, observers, exit_kind)?; + let corpus_id = self.process_execution(state, manager, input, &exec_res, observers)?; if send_events { self.serialize_and_dispatch(state, manager, input, &exec_res, observers, exit_kind)?; } @@ -519,13 +533,13 @@ where state: &mut S, executor: &mut E, manager: &mut EM, - input: I, + input: &I, send_events: bool, ) -> Result<(ExecuteInputResult, Option), Error> { - let exit_kind = self.execute_input(state, executor, manager, &input)?; + let exit_kind = self.execute_input(state, executor, manager, input)?; let observers = executor.observers(); - self.scheduler.on_evaluation(state, &input, &*observers)?; + self.scheduler.on_evaluation(state, input, &*observers)?; self.evaluate_execution(state, manager, input, &*observers, &exit_kind, send_events) } @@ -593,9 +607,9 @@ where state: &mut S, executor: &mut E, manager: &mut EM, - input: I, + input: &I, ) -> Result<(ExecuteInputResult, Option), Error> { - if self.input_filter.should_execute(&input) { + if self.input_filter.should_execute(input) { self.evaluate_input(state, executor, manager, input) } else { Ok((ExecuteInputResult::None, None)) @@ -609,7 +623,7 @@ where state: &mut S, executor: &mut E, manager: &mut EM, - input: I, + input: &I, ) -> Result<(ExecuteInputResult, Option), Error> { self.evaluate_input_with_observers(state, executor, manager, input, true) } @@ -724,14 +738,107 @@ where } } +impl EventProcessor for StdFuzzer +where + CS: Scheduler, + E: HasObservers + Executor, + E::Observers: DeserializeOwned + Serialize + ObserversTuple, + EM: EventReceiver + + RecordSerializationTime + + CanSerializeObserver + + EventFirer, + F: Feedback, + I: Input, + OF: Feedback, + S: HasCorpus + + HasSolutions + + HasExecutions + + HasLastFoundTime + + MaybeHasClientPerfMonitor + + HasCurrentCorpusId + + HasImported, +{ + fn process_events( + &mut self, + state: &mut S, + executor: &mut E, + manager: &mut EM, + ) -> Result<(), Error> { + // todo make this into a trait + // Execute the manager + while let Some((event, with_observers)) = manager.try_receive(state)? { + // at this point event is either newtestcase or objectives + let res = if with_observers { + match event { + Event::NewTestcase { + ref input, + ref observers_buf, + exit_kind, + .. + } => { + let start = current_time(); + let observers: E::Observers = + postcard::from_bytes(observers_buf.as_ref().unwrap())?; + { + let dur = current_time() - start; + manager.set_deserialization_time(dur); + } + let res = self.evaluate_execution( + state, manager, input, &observers, &exit_kind, false, + )?; + res.1 + } + _ => None, + } + } else { + match event { + Event::NewTestcase { ref input, .. } => { + let res = self.evaluate_input_with_observers( + state, executor, manager, input, false, + )?; + res.1 + } + #[cfg(feature = "share_objectives")] + Event::Objective { ref input, .. } => { + let res = self.evaluate_input_with_observers( + state, executor, manager, input, false, + )?; + res.1 + } + _ => None, + } + }; + if let Some(item) = res { + *state.imported_mut() += 1; + log::debug!("Added received input as item #{item}"); + + // for centralize + manager.on_interesting(state, event)?; + } else { + log::debug!("Received input was discarded"); + } + } + Ok(()) + } +} + impl Fuzzer for StdFuzzer where CS: Scheduler, - EM: ProgressReporter + EventProcessor, + E: HasObservers + Executor, + E::Observers: DeserializeOwned + Serialize + ObserversTuple, + EM: CanSerializeObserver + EventFirer + RecordSerializationTime, + I: Input, + F: Feedback, + OF: Feedback, + EM: ProgressReporter + SendExiting + EventReceiver, S: HasExecutions + HasMetadata + HasCorpus + + HasSolutions + HasLastReportTime + + HasLastFoundTime + + HasImported + HasTestcase + HasCurrentCorpusId + HasCurrentStageId @@ -774,8 +881,7 @@ where #[cfg(feature = "introspection")] state.introspection_monitor_mut().start_timer(); - // Execute the manager - manager.process(self, state, executor)?; + self.process_events(state, executor, manager)?; // Mark the elapsed time for the manager #[cfg(feature = "introspection")] @@ -951,7 +1057,7 @@ impl Default for NopFuzzer { impl Fuzzer for NopFuzzer where - EM: ProgressReporter + EventProcessor, + EM: ProgressReporter, ST: StagesTuple, S: HasMetadata + HasExecutions + HasLastReportTime + HasCurrentStageId, { @@ -1026,22 +1132,22 @@ mod tests { .unwrap(); let input = BytesInput::new(vec![1, 2, 3]); assert!(fuzzer - .evaluate_input(&mut state, &mut executor, &mut manager, input.clone()) + .evaluate_input(&mut state, &mut executor, &mut manager, &input) .is_ok()); assert_eq!(1, *execution_count.borrow()); // evaluate_input does not add it to the filter assert!(fuzzer - .evaluate_filtered(&mut state, &mut executor, &mut manager, input.clone()) + .evaluate_filtered(&mut state, &mut executor, &mut manager, &input) .is_ok()); assert_eq!(2, *execution_count.borrow()); // at to the filter assert!(fuzzer - .evaluate_filtered(&mut state, &mut executor, &mut manager, input.clone()) + .evaluate_filtered(&mut state, &mut executor, &mut manager, &input) .is_ok()); assert_eq!(2, *execution_count.borrow()); // the harness is not called assert!(fuzzer - .evaluate_input(&mut state, &mut executor, &mut manager, input.clone()) + .evaluate_input(&mut state, &mut executor, &mut manager, &input) .is_ok()); assert_eq!(3, *execution_count.borrow()); // evaluate_input ignores filters } diff --git a/libafl/src/stages/concolic.rs b/libafl/src/stages/concolic.rs index 3dde906f09..b1a04caa09 100644 --- a/libafl/src/stages/concolic.rs +++ b/libafl/src/stages/concolic.rs @@ -413,7 +413,7 @@ where for (index, new_byte) in mutation { input_copy.mutator_bytes_mut()[index] = new_byte; } - fuzzer.evaluate_filtered(state, executor, manager, input_copy)?; + fuzzer.evaluate_filtered(state, executor, manager, &input_copy)?; } } Ok(()) diff --git a/libafl/src/stages/generation.rs b/libafl/src/stages/generation.rs index bdcf3c656f..faf56656f4 100644 --- a/libafl/src/stages/generation.rs +++ b/libafl/src/stages/generation.rs @@ -37,7 +37,7 @@ where manager: &mut EM, ) -> Result<(), Error> { let input = self.0.generate(state)?; - fuzzer.evaluate_filtered(state, executor, manager, input)?; + fuzzer.evaluate_filtered(state, executor, manager, &input)?; Ok(()) } diff --git a/libafl/src/stages/mod.rs b/libafl/src/stages/mod.rs index 208b8f941b..9451a26d5f 100644 --- a/libafl/src/stages/mod.rs +++ b/libafl/src/stages/mod.rs @@ -48,7 +48,7 @@ pub use verify_timeouts::{TimeoutsToVerify, VerifyTimeoutsStage}; use crate::{ corpus::{CorpusId, HasCurrentCorpusId}, - events::EventProcessor, + events::SendExiting, state::{HasExecutions, Stoppable}, Error, HasNamedMetadata, }; @@ -161,7 +161,7 @@ where Head: Stage, Tail: StagesTuple + HasConstLen, S: HasCurrentStageId + Stoppable, - EM: EventProcessor, + EM: SendExiting, { /// Performs all stages in the tuple, /// Checks after every stage if state wants to stop @@ -248,7 +248,7 @@ impl IntoVec>> for Vec StagesTuple for Vec>> where - EM: EventProcessor, + EM: SendExiting, S: HasCurrentStageId + Stoppable, { /// Performs all stages in the `Vec` diff --git a/libafl/src/stages/mutational.rs b/libafl/src/stages/mutational.rs index 79c9f99a08..a0249e7a0f 100644 --- a/libafl/src/stages/mutational.rs +++ b/libafl/src/stages/mutational.rs @@ -279,7 +279,7 @@ where let (untransformed, post) = input.try_transform_into(state)?; let (_, corpus_id) = - fuzzer.evaluate_filtered(state, executor, manager, untransformed)?; + fuzzer.evaluate_filtered(state, executor, manager, &untransformed)?; start_timer!(state); self.mutator_mut().post_exec(state, corpus_id)?; @@ -345,7 +345,7 @@ where for new_input in generated { let (untransformed, post) = new_input.try_transform_into(state)?; let (_, corpus_id) = - fuzzer.evaluate_filtered(state, executor, manager, untransformed)?; + fuzzer.evaluate_filtered(state, executor, manager, &untransformed)?; self.mutator.multi_post_exec(state, corpus_id)?; post.post_exec(state, corpus_id)?; } diff --git a/libafl/src/stages/power.rs b/libafl/src/stages/power.rs index ea1ba45d77..7e7315ce12 100644 --- a/libafl/src/stages/power.rs +++ b/libafl/src/stages/power.rs @@ -176,7 +176,7 @@ where let (untransformed, post) = input.try_transform_into(state)?; let (_, corpus_id) = - fuzzer.evaluate_filtered(state, executor, manager, untransformed)?; + fuzzer.evaluate_filtered(state, executor, manager, &untransformed)?; start_timer!(state); self.mutator_mut().post_exec(state, corpus_id)?; diff --git a/libafl/src/stages/push/mutational.rs b/libafl/src/stages/push/mutational.rs index 3796bb2270..1418c9681e 100644 --- a/libafl/src/stages/push/mutational.rs +++ b/libafl/src/stages/push/mutational.rs @@ -161,7 +161,7 @@ where ) -> Result<(), Error> { // todo: is_interesting, etc. - fuzzer.evaluate_execution(state, event_mgr, last_input, observers, &exit_kind, true)?; + fuzzer.evaluate_execution(state, event_mgr, &last_input, observers, &exit_kind, true)?; start_timer!(state); self.mutator.post_exec(state, self.current_corpus_id)?; diff --git a/libafl/src/stages/sync.rs b/libafl/src/stages/sync.rs index 72fcd5527e..2961394583 100644 --- a/libafl/src/stages/sync.rs +++ b/libafl/src/stages/sync.rs @@ -148,7 +148,7 @@ where .left_to_sync .retain(|p| p != &path); log::debug!("Syncing and evaluating {:?}", path); - fuzzer.evaluate_input(state, executor, manager, input)?; + fuzzer.evaluate_input(state, executor, manager, &input)?; } #[cfg(feature = "introspection")] diff --git a/libafl/src/stages/tmin.rs b/libafl/src/stages/tmin.rs index 5ee86acb1f..b0756f32d2 100644 --- a/libafl/src/stages/tmin.rs +++ b/libafl/src/stages/tmin.rs @@ -238,7 +238,7 @@ where let (_, corpus_id) = fuzzer.evaluate_execution( state, manager, - input.clone(), + &input, &*observers, &exit_kind, false, diff --git a/libafl/src/stages/tuneable.rs b/libafl/src/stages/tuneable.rs index b72fbf412f..4620c35ca4 100644 --- a/libafl/src/stages/tuneable.rs +++ b/libafl/src/stages/tuneable.rs @@ -445,7 +445,7 @@ where } let (untransformed, post) = input.try_transform_into(state)?; - let (_, corpus_id) = fuzzer.evaluate_filtered(state, executor, manager, untransformed)?; + let (_, corpus_id) = fuzzer.evaluate_filtered(state, executor, manager, &untransformed)?; start_timer!(state); self.mutator_mut().post_exec(state, corpus_id)?; diff --git a/libafl/src/stages/verify_timeouts.rs b/libafl/src/stages/verify_timeouts.rs index 281e479b22..668df1f3f9 100644 --- a/libafl/src/stages/verify_timeouts.rs +++ b/libafl/src/stages/verify_timeouts.rs @@ -103,7 +103,7 @@ where executor.set_timeout(self.doubled_timeout); *self.capture_timeouts.borrow_mut() = false; while let Some(input) = timeouts.pop() { - fuzzer.evaluate_input(state, executor, manager, input)?; + fuzzer.evaluate_input(state, executor, manager, &input)?; } executor.set_timeout(self.original_timeout); *self.capture_timeouts.borrow_mut() = true; diff --git a/libafl/src/state/mod.rs b/libafl/src/state/mod.rs index 0f61bae171..2ff25744cd 100644 --- a/libafl/src/state/mod.rs +++ b/libafl/src/state/mod.rs @@ -694,7 +694,7 @@ where let _: CorpusId = fuzzer.add_input(self, executor, manager, input)?; Ok(ExecuteInputResult::Corpus) } else { - let (res, _) = fuzzer.evaluate_input(self, executor, manager, input.clone())?; + let (res, _) = fuzzer.evaluate_input(self, executor, manager, &input)?; if res == ExecuteInputResult::None { fuzzer.add_disabled_input(self, input)?; log::warn!("input {:?} was not interesting, adding as disabled.", &path); @@ -1027,7 +1027,7 @@ where let _: CorpusId = fuzzer.add_input(self, executor, manager, input)?; added += 1; } else { - let (res, _) = fuzzer.evaluate_input(self, executor, manager, input)?; + let (res, _) = fuzzer.evaluate_input(self, executor, manager, &input)?; if res != ExecuteInputResult::None { added += 1; } diff --git a/libafl_libfuzzer/runtime/src/fuzz.rs b/libafl_libfuzzer/runtime/src/fuzz.rs index 9d698afb87..8be943cf81 100644 --- a/libafl_libfuzzer/runtime/src/fuzz.rs +++ b/libafl_libfuzzer/runtime/src/fuzz.rs @@ -13,7 +13,7 @@ use std::{ use libafl::{ corpus::Corpus, events::{ - launcher::Launcher, EventConfig, EventProcessor, ProgressReporter, SimpleEventManager, + launcher::Launcher, EventConfig, EventReceiver, ProgressReporter, SimpleEventManager, SimpleRestartingEventManager, }, executors::ExitKind, @@ -68,7 +68,7 @@ where + HasLastReportTime + HasCurrentStageId + Stoppable, - EM: ProgressReporter + EventProcessor, + EM: ProgressReporter + EventReceiver, ST: StagesTuple, { if let Some(solution) = state.solutions().last() { diff --git a/libafl_libfuzzer/runtime/src/report.rs b/libafl_libfuzzer/runtime/src/report.rs index edf6e7c531..88068d6f33 100644 --- a/libafl_libfuzzer/runtime/src/report.rs +++ b/libafl_libfuzzer/runtime/src/report.rs @@ -1,7 +1,7 @@ use std::ffi::c_int; use libafl::{ - events::{EventProcessor, ProgressReporter, SimpleEventManager}, + events::{EventReceiver, ProgressReporter, SimpleEventManager}, executors::HasObservers, feedbacks::MapFeedbackMetadata, monitors::SimpleMonitor, @@ -30,7 +30,7 @@ where + HasCurrentStageId + Stoppable, E: HasObservers, - EM: ProgressReporter + EventProcessor, + EM: ProgressReporter + EventReceiver, ST: StagesTuple, { let meta = state