From 31fd36e2d7a16e44810d6baef1d38014d2034042 Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Mon, 3 Mar 2025 12:56:34 -0700 Subject: [PATCH] tool to upload displays info to GCS --- Cargo.lock | 25 + Cargo.toml | 1 + .../src/ingestion/client.rs | 6 +- crates/sui-indexer-alt-framework/src/lib.rs | 2 +- .../sui-indexer-alt-framework/src/metrics.rs | 4 +- crates/sui-upload-display/Cargo.toml | 34 ++ crates/sui-upload-display/README.md | 70 +++ crates/sui-upload-display/src/main.rs | 576 ++++++++++++++++++ crates/sui-upload-display/src/tests.rs | 158 +++++ 9 files changed, 870 insertions(+), 6 deletions(-) create mode 100644 crates/sui-upload-display/Cargo.toml create mode 100644 crates/sui-upload-display/README.md create mode 100644 crates/sui-upload-display/src/main.rs create mode 100644 crates/sui-upload-display/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 4405c591382ff..0fe486ac5f1e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15635,6 +15635,31 @@ dependencies = [ "sui-move-build", ] +[[package]] +name = "sui-upload-display" +version = "0.1.0" +dependencies = [ + "anyhow", + "bcs", + "bytes", + "clap", + "csv", + "dashmap", + "futures", + "hex", + "object_store", + "prometheus", + "regex", + "sui-indexer-alt-framework", + "sui-types", + "telemetry-subscribers", + "tempfile", + "tokio", + "tokio-util 0.7.13 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing", + "url", +] + [[package]] name = "sui-verifier-latest" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 32e79ff49f534..4dd33623b839c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -181,6 +181,7 @@ members = [ "crates/sui-transactional-test-runner", "crates/sui-types", "crates/sui-upgrade-compatibility-transactional-tests", + "crates/sui-upload-display", "crates/sui-verifier-transactional-tests", "crates/suins-indexer", "crates/suiop-cli", diff --git a/crates/sui-indexer-alt-framework/src/ingestion/client.rs b/crates/sui-indexer-alt-framework/src/ingestion/client.rs index 25fac78a162ba..71d20ccf301d1 100644 --- a/crates/sui-indexer-alt-framework/src/ingestion/client.rs +++ b/crates/sui-indexer-alt-framework/src/ingestion/client.rs @@ -24,7 +24,7 @@ use url::Url; const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60); #[async_trait::async_trait] -pub(crate) trait IngestionClientTrait: Send + Sync { +pub trait IngestionClientTrait: Send + Sync { async fn fetch(&self, checkpoint: u64) -> FetchResult; } @@ -53,7 +53,7 @@ pub struct IngestionClient { } impl IngestionClient { - pub(crate) fn new_remote(url: Url, metrics: Arc) -> IngestionResult { + pub fn new_remote(url: Url, metrics: Arc) -> IngestionResult { let client = Arc::new(RemoteIngestionClient::new(url)?); Ok(Self::new_impl(client, metrics)) } @@ -120,7 +120,7 @@ impl IngestionClient { /// [FetchError::NotFound] and [FetchError::Permanent] variants. /// /// - Cancellation of the supplied `cancel` token. 
- pub(crate) async fn fetch( + pub async fn fetch( &self, checkpoint: u64, cancel: &CancellationToken, diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index e463e758d9385..1b07f9c77b46c 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -28,7 +28,7 @@ use tracing::{info, warn}; pub mod handlers; pub mod ingestion; -pub(crate) mod metrics; +pub mod metrics; pub mod models; pub mod pipeline; pub mod schema; diff --git a/crates/sui-indexer-alt-framework/src/metrics.rs b/crates/sui-indexer-alt-framework/src/metrics.rs index df4df3fb48b4a..482c7604bbae8 100644 --- a/crates/sui-indexer-alt-framework/src/metrics.rs +++ b/crates/sui-indexer-alt-framework/src/metrics.rs @@ -43,7 +43,7 @@ const BATCH_SIZE_BUCKETS: &[f64] = &[ ]; #[derive(Clone)] -pub(crate) struct IndexerMetrics { +pub struct IndexerMetrics { // Statistics related to fetching data from the remote store. pub total_ingested_checkpoints: IntCounter, pub total_ingested_transactions: IntCounter, @@ -145,7 +145,7 @@ pub(crate) struct CheckpointLagMetricReporter { } impl IndexerMetrics { - pub(crate) fn new(registry: &Registry) -> Arc { + pub fn new(registry: &Registry) -> Arc { Arc::new(Self { total_ingested_checkpoints: register_int_counter_with_registry!( "indexer_total_ingested_checkpoints", diff --git a/crates/sui-upload-display/Cargo.toml b/crates/sui-upload-display/Cargo.toml new file mode 100644 index 0000000000000..87de4ce4e7294 --- /dev/null +++ b/crates/sui-upload-display/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "sui-upload-display" +version = "0.1.0" +edition = "2021" +authors = ["Sui"] +license = "Apache-2.0" +publish = false + +[dependencies] +anyhow.workspace = true +bcs.workspace = true +bytes = { workspace = true } +clap.workspace = true +csv.workspace = true +dashmap = { workspace = true } +futures.workspace = true +hex.workspace = true +object_store.workspace = true +prometheus.workspace = true +regex = { workspace = true } +sui-indexer-alt-framework.workspace = true +sui-types = { workspace = true } +telemetry-subscribers.workspace = true +tokio = { workspace = true, features = ["full", "macros"] } +tokio-util = { workspace = true, features = ["rt"] } +tracing.workspace = true +url.workspace = true + +[dev-dependencies] +tempfile = { workspace = true } + +[[bin]] +name = "sui-upload-display" +path = "src/main.rs" diff --git a/crates/sui-upload-display/README.md b/crates/sui-upload-display/README.md new file mode 100644 index 0000000000000..2b73278315531 --- /dev/null +++ b/crates/sui-upload-display/README.md @@ -0,0 +1,70 @@ +# Sui Upload Display + +A service that extracts and processes display data from Sui blockchain checkpoints and uploads it to Google Cloud Storage (GCS). 
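+
+As a quick illustration (the epoch, checkpoint, and hex payloads below are made-up values, shortened for readability, not real chain data), each completed epoch produces one CSV object in the bucket, e.g. `displays_419_68918060.csv`:
+
+```
+object_type,id,version,bcs
+\x00a1b2c3d4,\x5e6f7a8b,2,\x0a0b0c0d0e
+\x00f1e2d3c4,\x9c8d7e6f,1,\x0102030405
+```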
+
+## Features
+- Processes Sui blockchain checkpoints in batches
+- Extracts display data from transaction events
+- Tracks and maintains state across epochs
+- Uploads display data to Google Cloud Storage as CSV files
+- Supports concurrent processing for improved performance
+
+## Usage
+
+To run the service with all options:
+
+```
+cargo run -p sui-upload-display -- \
+  --gcs-cred-path="/path/to/credentials.json" \
+  --gcs-display-bucket="bucket-name" \
+  --remote-url="https://checkpoints.mainnet.sui.io" \
+  --concurrency-limit=20 \
+  --batch-size=200
+```
+
+With minimal options (using defaults):
+
+```
+cargo run -p sui-upload-display -- \
+  --gcs-cred-path="/path/to/credentials.json" \
+  --gcs-display-bucket="bucket-name"
+```
+
+Get help on available options:
+
+```
+cargo run -p sui-upload-display -- --help
+```
+
+## Configuration Options
+
+### Command-line Arguments
+
+- `--gcs-cred-path`: Path to Google Cloud Service Account credentials JSON file
+- `--gcs-display-bucket`: Name of the Google Cloud Storage bucket to upload files to
+- `--remote-url`: URL of the fullnode to fetch checkpoint data from (default: "https://fullnode.mainnet.sui.io:443")
+- `--concurrency-limit`: Number of concurrent checkpoints to process (default: 10)
+- `--batch-size`: Number of checkpoints to process in one batch (default: 100)
+
+## Implementation Details
+
+The service works as follows:
+
+1. Finds the last processed checkpoint by examining existing files in the GCS bucket
+2. Initializes with the epoch data from the latest checkpoint file (if any)
+3. Processes batches of checkpoints in the configured batch size
+4. For each checkpoint in the batch:
+   - Fetches checkpoint data from the Sui fullnode
+   - Extracts display update events
+   - Stores the updates in memory with their checkpoint and epoch information
+5. After processing a batch, updates the in-memory epoch data with the new display entries
+6. When an end-of-epoch is detected, uploads the complete display data to GCS
+7. Continues to the next batch of checkpoints
+
+The display data is formatted in CSV files with the following columns:
+- `object_type`: Type of the object (hex-encoded)
+- `id`: Display ID (hex-encoded)
+- `version`: Display version
+- `bcs`: BCS-encoded display data (hex-encoded)
+
+Files are named with the format `displays_{epoch}_{checkpoint}.csv`.
diff --git a/crates/sui-upload-display/src/main.rs b/crates/sui-upload-display/src/main.rs
new file mode 100644
index 0000000000000..deac45382f04a
--- /dev/null
+++ b/crates/sui-upload-display/src/main.rs
@@ -0,0 +1,576 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::{Context, Result};
+use clap::Parser;
+use csv::Writer;
+use dashmap::DashMap;
+use futures::{stream, StreamExt};
+use object_store::{
+    gcp::GoogleCloudStorageBuilder, path::Path, Error as ObjectStoreError, ObjectStore,
+};
+use prometheus::Registry;
+use regex::Regex;
+use std::collections::BTreeMap;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+use sui_indexer_alt_framework::ingestion::client::IngestionClient;
+use sui_indexer_alt_framework::metrics::IndexerMetrics;
+use sui_types::display::DisplayVersionUpdatedEvent;
+use sui_types::event::Event;
+use telemetry_subscribers::TelemetryConfig;
+use tokio_util::sync::CancellationToken;
+use tracing::{error, info, warn};
+use url::Url;
+
+const DEFAULT_FULLNODE_URL: &str = "https://fullnode.mainnet.sui.io:443";
+
+#[cfg(test)]
+mod tests;
+
+/// Checkpoint data with associated display entries
+#[derive(Clone, Debug)]
+struct CheckpointDisplayData {
+    epoch: u64,
+    display_entries: Vec<DisplayEntry>,
+    is_end_of_epoch: bool,
+}
+
+/// Map of checkpoint number to its display data
+type DisplayUpdateMap = DashMap<u64, CheckpointDisplayData>;
+
+/// Configuration for the display upload tool
+#[derive(Parser, Debug, Clone)]
+#[command(name = "sui-upload-display", about = "Upload display data to GCS")]
+pub struct Config {
+    /// Path to the GCS credentials file
+    #[arg(long)]
+    gcs_cred_path: Option<String>,
+
+    /// Name of the GCS bucket to upload display data to
+    #[arg(long)]
+    gcs_display_bucket: Option<String>,
+
+    /// URL of the fullnode to fetch checkpoint data from
+    #[arg(long, default_value = DEFAULT_FULLNODE_URL)]
+    remote_url: String,
+
+    /// Number of concurrent checkpoints to process
+    #[arg(long, default_value = "10")]
+    concurrency_limit: usize,
+
+    /// Number of checkpoints to process in one batch
+    #[arg(long, default_value = "100")]
+    batch_size: u64,
+}
+
+// Define a DisplayEntry struct similar to StoredDisplay
+#[derive(Debug, Clone)]
+struct DisplayEntry {
+    object_type: Vec<u8>,
+    display_id: Vec<u8>,
+    display_version: i16,
+    display: Vec<u8>,
+}
+
+impl DisplayEntry {
+    fn try_from_event(event: &Event) -> Option<Self> {
+        let (type_info, display_update) = DisplayVersionUpdatedEvent::try_from_event(event)?;
+        let type_bytes = bcs::to_bytes(&type_info).ok()?;
+        Some(Self {
+            object_type: type_bytes,
+            display_id: display_update.id.bytes.to_vec(),
+            display_version: display_update.version as i16,
+            display: event.contents.clone(),
+        })
+    }
+}
+
+// Struct to track accumulated display data across all epochs
+#[derive(Debug)]
+struct AccumulatedDisplayData {
+    epoch: u64,
+    last_checkpoint: u64,
+    displays: BTreeMap<Vec<u8>, DisplayEntry>,
+}
+
+impl AccumulatedDisplayData {
+    fn new(epoch: u64) -> Self {
+        Self {
+            epoch,
+            last_checkpoint: 0,
+            displays: BTreeMap::new(),
+        }
+    }
+
+    fn update_displays(&mut self, new_displays: Vec<DisplayEntry>) {
+        for display in new_displays {
+            self.displays.insert(display.object_type.clone(), display);
+        }
+    }
+}
+
+/// Uploads bytes data to an object store at the specified path
+async fn put(
+    store: &dyn ObjectStore,
+    path: &Path,
+    data: bytes::Bytes,
+) -> Result<(), ObjectStoreError> {
+    store.put(path, data.into()).await?;
+    Ok(())
+}
+
+/// Scans the GCS bucket for previously uploaded `displays_{epoch}_{checkpoint}.csv` files and
+/// returns the next checkpoint to process plus the latest file found, if any.
+async fn find_last_processed_checkpoint(config: &Config) -> Result<(u64, Option<(u64, String)>)> {
+    let (Some(cred_path), Some(bucket)) = (&config.gcs_cred_path, &config.gcs_display_bucket)
+    else {
+        info!("GCS credentials or bucket not set, starting from checkpoint 0");
+        return Ok((0, None));
+    };
+
+    info!(
+        "Looking for uploaded files in GCS bucket '{}' using credentials from '{}'",
+        bucket, cred_path
+    );
+
+    let gcs_builder = GoogleCloudStorageBuilder::new()
+        .with_service_account_path(cred_path.clone())
+        .with_bucket_name(bucket.clone());
+    let store = gcs_builder.build().context("Failed to create GCS client")?;
+    let file_pattern =
+        Regex::new(r"displays_(\d+)_(\d+)\.csv").context("Failed to create regex pattern")?;
+    let mut list_stream = store.list(None);
+    let mut max_checkpoint = 0;
+    let mut max_epoch = 0;
+    let mut latest_file_path = None;
+
+    while let Some(result) = list_stream.next().await {
+        let object = result?;
+        let path = object.location.to_string();
+        if let Some(captures) = file_pattern.captures(&path) {
+            if let (Some(epoch_capture), Some(checkpoint_capture)) =
+                (captures.get(1), captures.get(2))
+            {
+                if let (Ok(epoch), Ok(checkpoint)) = (
+                    epoch_capture.as_str().parse::<u64>(),
+                    checkpoint_capture.as_str().parse::<u64>(),
+                ) {
+                    if checkpoint > max_checkpoint {
+                        max_checkpoint = checkpoint;
+                        max_epoch = epoch;
+                        latest_file_path = Some(path);
+                    }
+                }
+            }
+        }
+    }
+
+    let starting_checkpoint = max_checkpoint.checked_add(1).unwrap_or(max_checkpoint);
+    let latest_file_info = if let Some(path) = latest_file_path {
+        info!(
+            "Found file with max checkpoint {}: {}",
+            max_checkpoint, path
+        );
+        Some((max_epoch, path))
+    } else {
+        info!("No previous display files found");
+        None
+    };
+    info!("Starting from checkpoint {}", starting_checkpoint);
+
+    Ok((starting_checkpoint, latest_file_info))
+}
+
+/// Rebuilds the accumulated display state for `epoch` from a previously uploaded CSV file.
+async fn load_display_entries_from_file(
+    config: &Config,
+    file_path: &str,
+    epoch: u64,
+) -> Result<AccumulatedDisplayData> {
+    let (Some(cred_path), Some(bucket)) = (&config.gcs_cred_path, &config.gcs_display_bucket)
+    else {
+        return Err(anyhow::Error::msg("GCS credentials or bucket not set"));
+    };
+
+    let gcs_builder = GoogleCloudStorageBuilder::new()
+        .with_service_account_path(cred_path.clone())
+        .with_bucket_name(bucket.clone());
+    let store = gcs_builder.build().context("Failed to create GCS client")?;
+
+    let file_name = file_path.split('/').last().unwrap_or(file_path);
+    let path = Path::from(file_name);
+
+    info!("Loading display entries from file: {}", file_path);
+
+    let data = store.get(&path).await?;
+    let reader = data.bytes().await?;
+    let mut csv_reader = csv::ReaderBuilder::new()
+        .has_headers(true)
+        .from_reader(reader.as_ref());
+    let mut display_data = AccumulatedDisplayData::new(epoch);
+    let regex = Regex::new(r"displays_\d+_(\d+)\.csv").context("Failed to create regex pattern")?;
+    if let Some(captures) = regex.captures(file_name) {
+        if let Some(checkpoint_str) = captures.get(1) {
+            if let Ok(checkpoint) = checkpoint_str.as_str().parse::<u64>() {
+                display_data.last_checkpoint = checkpoint;
+            }
+        }
+    }
+
+    for result in csv_reader.records() {
+        let record = result.context("Failed to read CSV record")?;
+        if record.len() < 4 {
+            return Err(anyhow::anyhow!("Invalid record with fewer than 4 fields"));
+        }
+        let parse_hex = |hex: &str| -> Result<Vec<u8>> {
+            let hex = hex.trim_start_matches("\\x");
+            hex::decode(hex).context("Failed to decode hex string")
+        };
+
+        let object_type = parse_hex(&record[0])?;
+        let display_id = parse_hex(&record[1])?;
+        let display_version = record[2]
+            .parse::<i16>()
+            .context("Failed to parse display version")?;
+        let display = parse_hex(&record[3])?;
+
+        let entry = DisplayEntry {
+            object_type,
+            display_id: display_id.clone(),
+            display_version,
+            display,
+        };
+        display_data.displays.insert(display_id, entry);
+    }
+
+    info!(
+        "Loaded {} display entries from file",
+        display_data.displays.len()
+    );
+    Ok(display_data)
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let _guard = TelemetryConfig::new().with_env().init();
+    info!("Starting sui-upload-display service");
+    let config = Config::parse();
+
+    let remote_store: Option<Arc<dyn ObjectStore>> =
+        match (&config.gcs_cred_path, &config.gcs_display_bucket) {
+            (Some(cred_path), Some(bucket)) => {
+                info!(
+                    "Initializing GCS with bucket '{}' using credentials from '{}'",
+                    bucket, cred_path
+                );
+
+                let gcs_builder = GoogleCloudStorageBuilder::new()
+                    .with_service_account_path(cred_path.clone())
+                    .with_bucket_name(bucket.clone());
+
+                match gcs_builder.build() {
+                    Ok(store) => {
+                        info!("Successfully initialized GCS client");
+                        Some(Arc::new(store))
+                    }
+                    Err(e) => {
+                        warn!("Failed to initialize GCS client: {}", e);
+                        None
+                    }
+                }
+            }
+            _ => {
+                warn!(
+                    "Either GCS cred path or bucket is not set, display data will not be uploaded"
+                );
+                None
+            }
+        };
+
+    let registry = Registry::new();
+    let metrics = IndexerMetrics::new(&registry);
+
+    let remote_url = Url::parse(&config.remote_url).context("Failed to parse remote URL")?;
+    let client = IngestionClient::new_remote(remote_url, metrics)?;
+    info!("Initialized remote client with URL: {}", config.remote_url);
+
+    let cancellation_token = CancellationToken::new();
+    let mut accumulated_display_data = None;
+
+    let (starting_checkpoint, latest_file_info) = find_last_processed_checkpoint(&config).await?;
+    if let Some((epoch, file_path)) = latest_file_info {
+        match load_display_entries_from_file(&config, &file_path, epoch).await {
+            Ok(loaded_data) => {
+                info!(
+                    "Successfully loaded display data from previous epoch {} with {} entries",
+                    epoch,
+                    loaded_data.displays.len()
+                );
+                accumulated_display_data = Some(loaded_data);
+            }
+            Err(e) => {
+                warn!("Failed to load display entries from file: {}", e);
+                // Continue without previous data
+            }
+        }
+    }
+
+    info!(
+        "Service initialized. Processing checkpoints concurrently with concurrency limit {} and batch size {}",
+        config.concurrency_limit,
+        config.batch_size
+    );
+    let display_updates = Arc::new(DisplayUpdateMap::new());
+    let config = Arc::new(config);
+    let client = Arc::new(client);
+
+    let mut checkpoint_processed = starting_checkpoint;
+
+    loop {
+        let end_checkpoint = checkpoint_processed + config.batch_size;
+        let checkpoints: Vec<u64> = (checkpoint_processed..end_checkpoint).collect();
+        let client_ref = client.as_ref();
+        let display_updates_ref = &display_updates;
+
+        let has_errors = Arc::new(AtomicBool::new(false));
+        stream::iter(checkpoints)
+            .for_each_concurrent(config.concurrency_limit, |checkpoint| {
+                let cancel_clone = cancellation_token.clone();
+                let display_updates_clone = display_updates_ref.clone();
+                let has_errors_clone = has_errors.clone();
+
+                async move {
+                    if let Err(e) = process_checkpoint_concurrent(
+                        client_ref,
+                        cancel_clone,
+                        display_updates_clone,
+                        checkpoint,
+                    )
+                    .await
+                    {
+                        error!("Error processing checkpoint {}: {}", checkpoint, e);
+                        has_errors_clone.store(true, Ordering::SeqCst);
+                    }
+                }
+            })
+            .await;
+
+        if has_errors.load(Ordering::SeqCst) {
+            error!("Some checkpoints failed to process, retrying in 1 second");
+            tokio::time::sleep(Duration::from_secs(1)).await;
+            continue;
+        }
+
+        if let Err(e) = apply_display_updates(
+            &config,
+            &display_updates,
+            checkpoint_processed,
+            end_checkpoint,
+            &mut accumulated_display_data,
+            &remote_store,
+        )
+        .await
+        {
+            error!("Error applying display updates: {}", e);
+            tokio::time::sleep(Duration::from_secs(1)).await;
+            continue;
+        }
+        for checkpoint in checkpoint_processed..end_checkpoint {
+            display_updates.remove(&checkpoint);
+        }
+
+        checkpoint_processed = end_checkpoint;
+        info!(
+            "Successfully completed batch ending at checkpoint {}",
+            end_checkpoint - 1
+        );
+    }
+}
+
+/// Fetches a single checkpoint and records its display update events in the shared map.
+async fn process_checkpoint_concurrent(
+    client: &IngestionClient,
+    cancel: CancellationToken,
+    display_updates: Arc<DisplayUpdateMap>,
+    checkpoint: u64,
+) -> Result<()> {
+    let checkpoint_data = match client.fetch(checkpoint, &cancel).await {
+        Ok(data) => data,
+        Err(e) => {
+            error!("Failed to fetch checkpoint {}: {}", checkpoint, e);
+            display_updates.insert(
+                checkpoint,
+                CheckpointDisplayData {
+                    epoch: 0,
+                    display_entries: Vec::new(),
+                    is_end_of_epoch: false,
+                },
+            );
+            return Err(e.into());
+        }
+    };
+
+    let summary = &checkpoint_data.checkpoint_summary;
+    let epoch = summary.epoch;
+    let new_displays: Vec<DisplayEntry> = checkpoint_data
+        .transactions
+        .iter()
+        .filter_map(|tx| tx.events.as_ref())
+        .flat_map(|events| events.data.iter())
+        .filter_map(DisplayEntry::try_from_event)
+        .collect();
+
+    if !new_displays.is_empty() {
+        info!(
+            "Found {} display updates in checkpoint {} of epoch {}",
+            new_displays.len(),
+            checkpoint,
+            epoch
+        );
+    }
+
+    let is_end_of_epoch = summary.end_of_epoch_data.is_some();
+    display_updates.insert(
+        checkpoint,
+        CheckpointDisplayData {
+            epoch,
+            display_entries: new_displays,
+            is_end_of_epoch,
+        },
+    );
+    Ok(())
+}
+
+/// Folds a batch of per-checkpoint updates into the accumulated epoch state, uploading a CSV
+/// snapshot whenever an end-of-epoch checkpoint is seen.
+async fn apply_display_updates(
+    config: &Arc<Config>,
+    display_updates: &Arc<DisplayUpdateMap>,
+    start_checkpoint: u64,
+    end_checkpoint: u64,
+    display_data: &mut Option<AccumulatedDisplayData>,
+    remote_store: &Option<Arc<dyn ObjectStore>>,
+) -> Result<()> {
+    let mut end_of_epoch_detected = false;
+    let mut last_checkpoint = 0;
+
+    for checkpoint in start_checkpoint..end_checkpoint {
+        if let Some(entry) = display_updates.get(&checkpoint) {
+            let CheckpointDisplayData {
+                epoch,
+                display_entries,
+                is_end_of_epoch,
+            } = entry.clone();
+
+            if display_data.as_ref().is_none_or(|data| data.epoch != epoch) {
+                if end_of_epoch_detected && display_data.is_some() {
+                    let data = display_data
+                        .as_ref()
+                        .expect("Display data should exist based on the previous check");
+
+                    info!(
+                        "End of epoch {} detected at checkpoint {}, uploading display data",
+                        data.epoch, last_checkpoint
+                    );
+                    upload_display_data(
+                        data.epoch,
+                        last_checkpoint,
+                        data.displays.values().cloned().collect(),
+                        config,
+                        remote_store,
+                    )
+                    .await?;
+                }
+
+                info!("Starting new epoch {}", epoch);
+                *display_data = Some(AccumulatedDisplayData::new(epoch));
+                end_of_epoch_detected = false;
+            }
+            let epoch_data = display_data
+                .as_mut()
+                .expect("Display data should exist at this point");
+
+            epoch_data.last_checkpoint = checkpoint;
+            last_checkpoint = checkpoint;
+
+            if !display_entries.is_empty() {
+                epoch_data.update_displays(display_entries);
+            }
+
+            if is_end_of_epoch {
+                end_of_epoch_detected = true;
+            }
+        }
+    }
+
+    if end_of_epoch_detected && display_data.is_some() {
+        let data = display_data
+            .as_ref()
+            .expect("Display data should exist based on the previous check");
+
+        info!(
+            "End of epoch {} detected at checkpoint {}, uploading display data",
+            data.epoch, last_checkpoint
+        );
+        upload_display_data(
+            data.epoch,
+            last_checkpoint,
+            data.displays.values().cloned().collect(),
+            config,
+            remote_store,
+        )
+        .await?;
+    }
+
+    Ok(())
+}
+
+/// Serializes the accumulated display entries to CSV and uploads them to GCS as
+/// `displays_{epoch}_{checkpoint}.csv`.
+async fn upload_display_data(
+    epoch: u64,
+    checkpoint: u64,
+    displays: Vec<DisplayEntry>,
+    _config: &Arc<Config>,
+    remote_store: &Option<Arc<dyn ObjectStore>>,
+) -> anyhow::Result<()> {
+    match displays.len() {
+        0 => info!(
+            "No display updates for epoch {}, but still uploading empty file",
+            epoch
+        ),
+        count => info!("Uploading {} display entries for epoch {}", count, epoch),
+    }
+
+    let filename = format!("displays_{}_{}.csv", epoch, checkpoint);
+    let buffer = {
+        let mut buffer = Vec::new();
+        {
+            let mut writer = Writer::from_writer(&mut buffer);
+            writer
+                .write_record(["object_type", "id", "version", "bcs"])
+                .context("Failed to write CSV header")?;
+            for display in &displays {
+                let record = [
+                    format!("\\x{}", hex::encode(&display.object_type)),
+                    format!("\\x{}", hex::encode(&display.display_id)),
+                    display.display_version.to_string(),
+                    format!("\\x{}", hex::encode(&display.display)),
+                ];
+
+                writer
+                    .write_record(record)
+                    .context("Failed to write display entry to CSV")?;
+            }
+
+            writer.flush().context("Failed to flush CSV writer")?;
+        }
+        buffer
+    };
+
+    if let Some(store) = remote_store {
+        let filename_clone = filename.clone(); // Keep a copy for logging; `filename` is moved into the Path below
+        let path = Path::from(filename);
+        info!("Uploading {} entries to {}", displays.len(), filename_clone);
+        let bytes_data = bytes::Bytes::from(buffer);
+        put(store, &path, bytes_data).await?;
+    } else {
+        warn!("GCS not configured, skipping upload");
+    }
+
+    Ok(())
+}
diff --git a/crates/sui-upload-display/src/tests.rs b/crates/sui-upload-display/src/tests.rs
new file mode 100644
index 0000000000000..8237f53c328a9
--- /dev/null
+++ b/crates/sui-upload-display/src/tests.rs
@@ -0,0 +1,158 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::{AccumulatedDisplayData, DisplayEntry};
+use anyhow::Result;
+use std::path::PathBuf;
+use tempfile::tempdir;
+
+fn create_dummy_display_entry(idx: u8) -> DisplayEntry {
+    DisplayEntry {
+        object_type: vec![0x01, 0x02, 0x03, idx],
+        display_id: vec![0x04, 0x05, 0x06, idx],
+        display_version: 1,
+        display: vec![0x07, 0x08, 0x09, idx],
+    }
+}
+
+fn create_dummy_entries(count: u8) -> Vec<DisplayEntry> {
+    (0..count).map(create_dummy_display_entry).collect()
+}
+
+fn write_entries_to_csv(entries: &[DisplayEntry], path: &PathBuf) -> Result<()> {
+    let mut writer = csv::WriterBuilder::new()
+        .has_headers(true)
+        .from_path(path)?;
+
+    writer.write_record(["object_type", "display_id", "display_version", "display"])?;
+
+    for entry in entries {
+        let object_type_hex = hex::encode(&entry.object_type);
+        let display_id_hex = hex::encode(&entry.display_id);
+        let display_hex = hex::encode(&entry.display);
+
+        writer.write_record([
+            &format!("\\x{}", object_type_hex),
+            &format!("\\x{}", display_id_hex),
+            &entry.display_version.to_string(),
+            &format!("\\x{}", display_hex),
+        ])?;
+    }
+
+    writer.flush()?;
+    Ok(())
+}
+
+fn extract_entries(data: &AccumulatedDisplayData) -> Vec<DisplayEntry> {
+    data.displays.values().cloned().collect()
+}
+
+/// Order-insensitive comparison of two sets of display entries.
+fn compare_entries(a: &[DisplayEntry], b: &[DisplayEntry]) -> bool {
+    if a.len() != b.len() {
+        return false;
+    }
+
+    let mut matched_b = vec![false; b.len()];
+    for entry_a in a {
+        let mut found_match = false;
+
+        for (idx_b, entry_b) in b.iter().enumerate() {
+            if matched_b[idx_b] {
+                continue; // Skip already matched entries
+            }
+
+            if entry_a.object_type == entry_b.object_type
+                && entry_a.display_id == entry_b.display_id
+                && entry_a.display_version == entry_b.display_version
+                && entry_a.display == entry_b.display
+            {
+                matched_b[idx_b] = true;
+                found_match = true;
+                break;
+            }
+        }
+
+        if !found_match {
+            return false;
+        }
+    }
+
+    true
+}
+
+struct TestSetup {
+    temp_dir: tempfile::TempDir,
+    file_path: String,
+}
+
+impl TestSetup {
+    async fn new(entries: &[DisplayEntry]) -> Result<Self> {
+        let temp_dir = tempdir()?;
+        let file_name = "displays_10_233333.csv";
+        let csv_path = temp_dir.path().join(file_name);
+        write_entries_to_csv(entries, &csv_path)?;
+        Ok(Self {
+            temp_dir,
+            file_path: file_name.to_string(),
+        })
+    }
+
+    fn temp_dir_path(&self) -> &std::path::Path {
+        self.temp_dir.path()
+    }
+}
+
+#[tokio::test]
+async fn test_load_display_entries_from_csv() -> Result<()> {
+    let dummy_entries = create_dummy_entries(5);
+    let setup = TestSetup::new(&dummy_entries).await?;
+    let file_path = format!("{}/{}", setup.temp_dir_path().display(), setup.file_path);
+
+    let mut loaded_entries = Vec::new();
+    let file_content = std::fs::read_to_string(&file_path)?;
+    let mut csv_reader = csv::ReaderBuilder::new()
+        .has_headers(true)
+        .from_reader(file_content.as_bytes());
+
+    for result in csv_reader.records() {
+        let record = result?;
+
+        if record.len() < 4 {
+            continue;
+        }
+
+        let parse_hex = |hex: &str| -> Result<Vec<u8>> {
+            let hex = hex.trim_start_matches("\\x");
+            Ok(hex::decode(hex)?)
+ }; + + let object_type = parse_hex(&record[0])?; + let display_id = parse_hex(&record[1])?; + let display_version = record[2].parse::()?; + let display = parse_hex(&record[3])?; + + let entry = DisplayEntry { + object_type, + display_id, + display_version, + display, + }; + + loaded_entries.push(entry); + } + + assert!( + compare_entries(&dummy_entries, &loaded_entries), + "Manually loaded entries don't match original entries" + ); + let mut epoch_data = AccumulatedDisplayData::new(10); + epoch_data.update_displays(loaded_entries); + let extracted_entries = extract_entries(&epoch_data); + assert!( + compare_entries(&dummy_entries, &extracted_entries), + "Entries in AccumulatedDisplayData don't match original entries" + ); + setup.temp_dir.close()?; + + Ok(()) +}