Skip to content

Commit

Permalink
[json rpc] query events based on transaction digest
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Nov 26, 2024
1 parent b023ef8 commit 63b5075
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 127 deletions.
25 changes: 25 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5384,6 +5384,31 @@ impl TransactionKeyValueStoreTrait for AuthorityState {
.map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
.collect())
}

#[instrument(skip(self))]
async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
if digests.is_empty() {
return Ok(vec![]);
}
let events_digests: Vec<_> = self
.get_transaction_cache_reader()
.multi_get_executed_effects(digests)?
.into_iter()
.flat_map(|t| t.map(|t| t.events_digest().cloned()))
.collect();
let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
let mut events = self
.get_transaction_cache_reader()
.multi_get_events(&non_empty_events)?
.into_iter();
Ok(events_digests
.into_iter()
.flat_map(|ev| ev.map(|_| events.next()?))
.collect())
}
}

#[cfg(msim)]
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-json-rpc/src/coin_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ mod tests {
use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
use sui_types::coin::TreasuryCap;
use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEventsDigest};
use sui_types::effects::TransactionEffects;
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::error::{SuiError, SuiResult};
use sui_types::gas_coin::GAS;
use sui_types::id::UID;
Expand Down Expand Up @@ -479,6 +479,8 @@ mod tests {
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;

async fn multi_get_events_by_tx_digests(&self,digests: &[TransactionDigest]) -> SuiResult<Vec<Option<TransactionEvents>>>;
}
}

Expand Down
202 changes: 76 additions & 126 deletions crates/sui-json-rpc/src/read_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use sui_storage::key_value_store::TransactionKeyValueStore;
use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest};
use sui_types::collection_types::VecMap;
use sui_types::crypto::AggregateAuthoritySignature;
use sui_types::digests::TransactionEventsDigest;
use sui_types::display::DisplayVersionUpdatedEvent;
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
use sui_types::error::{SuiError, SuiObjectResponseError};
Expand Down Expand Up @@ -320,93 +319,57 @@ impl ReadApi {

if opts.show_events {
trace!("getting events");
let events_digests_list = temp_response
.values()
.filter_map(|cache_entry| match &cache_entry.effects {
Some(eff) => eff.events_digest().cloned(),
None => None,
})
.collect::<Vec<TransactionEventsDigest>>();
// filter out empty events digest, as they do not have to be read from the DB
let empty_events_digest = TransactionEvents::default().digest();
let events_digests_list = events_digests_list
.into_iter()
.filter(|d| d != &empty_events_digest)
.collect::<Vec<_>>();

let mut events_digest_to_events = if events_digests_list.is_empty() {
HashMap::new()
} else {
// fetch events from the DB with retry, retry each 0.5s for 3s
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(3)),
multiplier: 1.0,
..ExponentialBackoff::default()
};
let events = retry(backoff, || async {
match self
.transaction_kv_store
.multi_get_events(&events_digests_list)
.await
{
// Only return Ok when all the queried transaction events are found, otherwise retry
// until timeout, then return Err.
Ok(events) if !events.contains(&None) => Ok(events),
Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
"Events not found, transaction execution may be incomplete.".into(),
))),
Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
"Failed to call multi_get_events: {e:?}"
)))),
}
})
.await
.map_err(|e| {
Error::UnexpectedError(format!(
"Retrieving events with retry failed for events digests {events_digests_list:?}: {e:?}"
))
})?
.into_iter();

// construct a hashmap of events digests -> events for fast lookup
let events_map = events_digests_list
.into_iter()
.zip(events)
.collect::<HashMap<_, _>>();
// Double check that all events are `Some` and their digests match the key
for (events_digest, events) in events_map.iter() {
if let Some(events) = events {
if &events.digest() != events_digest {
return Err(Error::UnexpectedError(format!(
"Events digest {events_digest:?} does not match the key {:?}",
events.digest()
)));
}
} else {
return Err(Error::UnexpectedError(format!(
"Events of digest {events_digest:?} is None, but it should not be"
)));
let mut non_empty_digests = vec![];
for cache_entry in temp_response.values() {
if let Some(effects) = &cache_entry.effects {
if effects.events_digest().is_some() {
non_empty_digests.push(cache_entry.digest);
}
}
events_map
}
// fetch events from the DB with retry, retry each 0.5s for 3s
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(3)),
multiplier: 1.0,
..ExponentialBackoff::default()
};
events_digest_to_events.insert(empty_events_digest, Some(TransactionEvents::default()));
let mut events = retry(backoff, || async {
match self
.transaction_kv_store
.multi_get_events_by_tx_digests(&non_empty_digests)
.await
{
// Only return Ok when all the queried transaction events are found, otherwise retry
// until timeout, then return Err.
Ok(events) if !events.contains(&None) => Ok(events),
Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
"Events not found, transaction execution may be incomplete.".into(),
))),
Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
"Failed to call multi_get_events: {e:?}"
)))),
}
})
.await
.map_err(|e| {
Error::UnexpectedError(format!(
"Retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
))
})?
.into_iter();

// fill cache with the events
for (_, cache_entry) in temp_response.iter_mut() {
let transaction_digest = cache_entry.digest;
if let Some(events_digest) =
cache_entry.effects.as_ref().and_then(|e| e.events_digest())
{
let events = events_digest_to_events
.get(events_digest)
.cloned()
.unwrap_or_else(|| panic!("Expect event digest {events_digest:?} to be found in cache for transaction {transaction_digest}"))
.map(|events| to_sui_transaction_events(self, cache_entry.digest, events));
match events {
Some(Ok(e)) => cache_entry.events = Some(e),
Some(Err(e)) => cache_entry.errors.push(e.to_string()),
None => {
match events.next() {
Some(Some(ev)) => {
cache_entry.events =
Some(to_sui_transaction_events(self, cache_entry.digest, ev)?)
}
None | Some(None) => {
error!("Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}");
cache_entry.errors.push(format!(
"Failed to fetch events with event digest {events_digest:?}",
Expand Down Expand Up @@ -836,30 +799,26 @@ impl ReadApiServer for ReadApi {
}

if opts.show_events && temp_response.effects.is_some() {
// safe to unwrap because we have checked is_some
if let Some(event_digest) = temp_response.effects.as_ref().unwrap().events_digest()
{
let transaction_kv_store = self.transaction_kv_store.clone();
let event_digest = *event_digest;
let events = spawn_monitored_task!(async move {
transaction_kv_store
.get_events(event_digest)
.await
.map_err(|e| {
error!("Failed to call get transaction events for events digest: {event_digest:?} with error {e:?}");
Error::from(e)
})
})
let transaction_kv_store = self.transaction_kv_store.clone();
let events = spawn_monitored_task!(async move {
transaction_kv_store
.multi_get_events_by_tx_digests(&[digest])
.await
.map_err(Error::from)??;
match to_sui_transaction_events(self, digest, events) {
.map_err(|e| {
error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}");
Error::from(e)
})
})
.await
.map_err(Error::from)??
.pop()
.flatten();
match events {
None => temp_response.events = Some(SuiTransactionBlockEvents::default()),
Some(events) => match to_sui_transaction_events(self, digest, events) {
Ok(e) => temp_response.events = Some(e),
Err(e) => temp_response.errors.push(e.to_string()),
};
} else {
// events field will be Some if and only if `show_events` is true and
// there is no error in converting fetching events
temp_response.events = Some(SuiTransactionBlockEvents::default());
},
}
}

Expand Down Expand Up @@ -941,38 +900,29 @@ impl ReadApiServer for ReadApi {
let transaction_kv_store = self.transaction_kv_store.clone();
spawn_monitored_task!(async move{
let store = state.load_epoch_store_one_call_per_task();
let effect = transaction_kv_store
.get_fx_by_tx_digest(transaction_digest)
.await
.map_err(Error::from)?;
let events = if let Some(event_digest) = effect.events_digest() {
transaction_kv_store
.get_events(*event_digest)
let events = transaction_kv_store
.multi_get_events_by_tx_digests(&[transaction_digest])
.await
.map_err(
|e| {
error!("Failed to get transaction events for event digest {event_digest:?} with error: {e:?}");
error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
Error::StateReadError(e.into())
})?
.data
.into_iter()
.enumerate()
.map(|(seq, e)| {
let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
SuiEvent::try_from(
e,
*effect.transaction_digest(),
seq as u64,
None,
layout,
)
})
.collect::<Result<Vec<_>, _>>()
.map_err(Error::SuiError)?
} else {
vec![]
};
Ok(events)
.pop()
.flatten();
Ok(match events {
Some(events) => events
.data
.into_iter()
.enumerate()
.map(|(seq, e)| {
let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout)
})
.collect::<Result<Vec<_>, _>>()
.map_err(Error::SuiError)?,
None => vec![],
})
}).await.map_err(Error::from)?
})
}
Expand Down
24 changes: 24 additions & 0 deletions crates/sui-storage/src/http_key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub enum Key {
CheckpointSummaryByDigest(CheckpointDigest),
TxToCheckpoint(TransactionDigest),
ObjectKey(ObjectID, VersionNumber),
EventsByTxDigest(TransactionDigest),
}

impl Key {
Expand All @@ -99,6 +100,7 @@ impl Key {
Key::CheckpointSummaryByDigest(_) => "cs",
Key::TxToCheckpoint(_) => "tx2c",
Key::ObjectKey(_, _) => "ob",
Key::EventsByTxDigest(_) => "evtx",
}
}

Expand All @@ -117,6 +119,7 @@ impl Key {
Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
Key::TxToCheckpoint(digest) => encode_digest(digest),
Key::ObjectKey(object_id, version) => encode_object_key(object_id, version),
Key::EventsByTxDigest(digest) => encode_digest(digest),
}
}

Expand Down Expand Up @@ -569,4 +572,25 @@ impl TransactionKeyValueStoreTrait for HttpKVStore {

Ok(results)
}

#[instrument(level = "trace", skip_all)]
async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
let keys = digests
.iter()
.map(|digest| Key::EventsByTxDigest(*digest))
.collect::<Vec<_>>();
Ok(self
.multi_fetch(keys)
.await
.iter()
.zip(digests.iter())
.map(map_fetch)
.map(|maybe_bytes| {
maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
})
.collect::<Vec<_>>())
}
}
30 changes: 30 additions & 0 deletions crates/sui-storage/src/key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ impl TransactionKeyValueStore {
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
self.inner.multi_get_transaction_checkpoint(digests).await
}

pub async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
self.inner.multi_get_events_by_tx_digests(digests).await
}
}

/// Immutable key/value store trait for storing/retrieving transactions, effects, and events.
Expand Down Expand Up @@ -454,6 +461,11 @@ pub trait TransactionKeyValueStoreTrait {
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;

async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>>;
}

/// A TransactionKeyValueStoreTrait that falls back to a secondary store for any key for which the
Expand Down Expand Up @@ -630,6 +642,24 @@ impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {

Ok(res)
}

#[instrument(level = "trace", skip_all)]
async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
let (fallback, indices) = find_fallback(&res, digests);
if fallback.is_empty() {
return Ok(res);
}
let secondary_res = self
.fallback
.multi_get_events_by_tx_digests(&fallback)
.await?;
merge_res(&mut res, secondary_res, &indices);
Ok(res)
}
}

fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
Expand Down
Loading

0 comments on commit 63b5075

Please sign in to comment.