Skip to content

Commit

Permalink
feat: add state to solana-listener (#24)
Browse files Browse the repository at this point in the history
* feat: persist last processed signature

* feat: add solana-listener env var to force last processed signature

* feat: move last signature env var gathering to upper level

* chore: remove comment

* refactor: move last signature check logic into upper layers

* fix: env var err check

* fix: force latest signature env var test

* Fix typo
  • Loading branch information
eloylp authored Feb 6, 2025
1 parent a81d73c commit 4b60662
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 135 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/file-based-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ amplifier-api.workspace = true
memmap2.workspace = true
tracing.workspace = true
bytemuck.workspace = true
solana-sdk.workspace = true

[lints]
workspace = true
80 changes: 79 additions & 1 deletion crates/file-based-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::{Arc, Mutex};
use amplifier_api::types::{uuid, TaskItemId};
use bytemuck::{Pod, Zeroable};
use memmap2::MmapMut;
use solana_sdk::signature::Signature;

/// Memory map wrapper that implements the state to successfully store and retrieve latest task item
/// id
Expand All @@ -18,10 +19,22 @@ pub struct MemmapState {
}

#[repr(C)]
#[derive(Default, Debug, Copy, Clone, Pod, Zeroable)]
#[derive(Debug, Copy, Clone, Pod, Zeroable)]
#[expect(clippy::struct_field_names)]
struct InternalState {
latest_queried_task_item_id: u128,
latest_processed_task_item_id: u128,
latest_processed_signature: [u8; 64],
}

impl Default for InternalState {
fn default() -> Self {
Self {
latest_queried_task_item_id: 0,
latest_processed_task_item_id: 0,
latest_processed_signature: [0_u8; 64],
}
}
}

#[expect(
Expand Down Expand Up @@ -87,6 +100,37 @@ impl MemmapState {
}
}

// Generic helper function for setting a Signature
fn set_signature<F>(&self, signature: Signature, field_mutator: F) -> Result<(), io::Error>
where
F: Fn(&mut InternalState, [u8; 64]),
{
let mut mmap = self.mmap.lock().expect("lock should not be poisoned");
let signature_bytes = signature.into();
let data = bytemuck::from_bytes_mut::<InternalState>(&mut mmap[..]);
field_mutator(data, signature_bytes);
mmap.flush()?;
drop(mmap);
Ok(())
}

// Generic helper function for getting a Signature
fn get_signature<F>(&self, field_accessor: F) -> Option<Signature>
where
F: Fn(&InternalState) -> [u8; 64],
{
let mmap = self.mmap.lock().expect("lock should not be poisoned");
let data = bytemuck::from_bytes::<InternalState>(&mmap[..]);
let signature = field_accessor(data);
drop(mmap);

if signature == [0_u8; 64] {
None
} else {
Some(Signature::from(signature))
}
}

// Generic helper function for setting a TaskItemId
fn set_task_item_id<F>(
&self,
Expand Down Expand Up @@ -137,3 +181,37 @@ impl relayer_amplifier_state::State for MemmapState {
})
}
}

impl SolanaListenerState for MemmapState {
type Err = io::Error;

fn set_latest_processed_signature(&self, signature: Signature) -> Result<(), Self::Err> {
tracing::trace!("updating latest processed signature");
self.set_signature(signature, |data, value| {
data.latest_processed_signature = value;
})
}

fn latest_processed_signature(&self) -> Option<Signature> {
tracing::trace!("getting latest processed signature");
self.get_signature(|data| data.latest_processed_signature)
}
}

/// Trait for the state of the Solana listener
pub trait SolanaListenerState: Clone + Send + Sync + 'static {
/// The error type for the state
type Err: core::error::Error + Send + Sync + 'static;

/// Get the latest processed signature
/// # Errors
///
/// The underlying storage error
fn latest_processed_signature(&self) -> Option<Signature>;

/// Set the latest processed signature
/// # Errors
///
/// The underlying storage error
fn set_latest_processed_signature(&self, signature: Signature) -> Result<(), Self::Err>;
}
6 changes: 2 additions & 4 deletions crates/solana-axelar-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ async fn main() {
Arc::clone(&rpc_client),
amplifier_task_receiver,
amplifier_client.clone(),
file_based_storage,
file_based_storage.clone(),
);
let (solana_listener_component, solana_listener_client) = solana_listener::SolanaListener::new(
config.solana_listener_component,
Arc::clone(&rpc_client),
file_based_storage,
);
let solana_event_forwarder_component = solana_event_forwarder::SolanaEventForwarder::new(
event_forwarder_config,
Expand Down Expand Up @@ -117,7 +118,6 @@ mod tests {
use solana_listener::solana_sdk::commitment_config::CommitmentConfig;
use solana_listener::solana_sdk::pubkey::Pubkey;
use solana_listener::solana_sdk::signature::{Keypair, Signature};
use solana_listener::MissedSignatureCatchupStrategy;

use crate::Config;

Expand Down Expand Up @@ -199,8 +199,6 @@ mod tests {
gas_service_config_pda,
tx_scan_poll_period: solana_tx_scan_poll_period,
solana_ws,
missed_signature_catchup_strategy: MissedSignatureCatchupStrategy::UntilBeginning,
latest_processed_signature: Some(Signature::from_str(&latest_processed_signature)?),
commitment: CommitmentConfig::finalized(),
},
solana_gateway_task_processor: solana_gateway_task_processor::Config {
Expand Down
2 changes: 2 additions & 0 deletions crates/solana-listener/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ common-serde-utils.workspace = true
core-common-serde-utils.workspace = true
solana-client.workspace = true
solana-sdk.workspace = true
file-based-storage.workspace = true
solana-transaction-status.workspace = true
axelar-solana-gas-service.workspace = true
solana-rpc-client-api.workspace = true
Expand All @@ -37,6 +38,7 @@ solana-rpc.workspace = true
test-log.workspace = true
serial_test.workspace = true
pretty_assertions.workspace = true
uuid.workspace = true

[lints]
workspace = true
Loading

0 comments on commit 4b60662

Please sign in to comment.