From 459eba82bcadcf9ce29f70da776889cd36373692 Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Thu, 28 Mar 2024 16:35:56 +0100 Subject: [PATCH 1/3] sink: add optional `handle_replace` method to sink --- sinks/sink-common/src/connector/default.rs | 13 ++++---- sinks/sink-common/src/connector/sink.rs | 35 ++++++++++++++++++---- sinks/sink-common/src/sink.rs | 9 ++++++ 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/sinks/sink-common/src/connector/default.rs b/sinks/sink-common/src/connector/default.rs index cfe6f972..71e94d25 100644 --- a/sinks/sink-common/src/connector/default.rs +++ b/sinks/sink-common/src/connector/default.rs @@ -176,12 +176,6 @@ where state: &mut PersistedState, ct: CancellationToken, ) -> Result<(CursorAction, StreamAction), SinkError> { - if self.needs_invalidation { - self.handle_invalidate(context.cursor.clone(), state, ct.clone()) .await?; - self.needs_invalidation = false; - } - // fatal error since if the sink is restarted it will receive the same data again. let json_batch = batch .into_iter() @@ -206,7 +200,12 @@ where } } - let mut action = self.sink.handle_data(&context, &data, ct).await?; + let mut action = if self.needs_invalidation { + self.needs_invalidation = false; + self.sink.handle_replace(&context, &data, ct).await? + } else { + self.sink.handle_data(&context, &data, ct).await? + }; // If it's pending, don't store the cursor. 
if context.finality.is_pending() { diff --git a/sinks/sink-common/src/connector/sink.rs b/sinks/sink-common/src/connector/sink.rs index 766ae529..00365f45 100644 --- a/sinks/sink-common/src/connector/sink.rs +++ b/sinks/sink-common/src/connector/sink.rs @@ -27,25 +27,20 @@ impl SinkWithBackoff { batch: &Value, ct: CancellationToken, ) -> Result { - // info!("handling data with backoff: {:?}", &self.backoff); for duration in &self.backoff { - // info!("trying to handle data, duration: {:?}", duration); match self.inner.handle_data(ctx, batch).await { Ok(action) => return Ok(action), Err(err) => { warn!(err = ?err, "failed to handle data"); if ct.is_cancelled() { - // info!("cancelled while handling data"); return Err(err) .change_context(SinkError::Fatal) .attach_printable("failed to handle data (cancelled)"); } tokio::select! { _ = tokio::time::sleep(duration) => { - // info!("retrying to handle data after sleeping"); }, _ = ct.cancelled() => { - // info!("cancelled while retrying to handle data"); return Ok(CursorAction::Skip); } }; @@ -56,6 +51,36 @@ impl SinkWithBackoff { Err(SinkError::Fatal).attach_printable("handle data failed after retry") } + pub async fn handle_replace( + &mut self, + ctx: &Context, + batch: &Value, + ct: CancellationToken, + ) -> Result { + for duration in &self.backoff { + match self.inner.handle_replace(ctx, batch).await { + Ok(action) => return Ok(action), + Err(err) => { + warn!(err = ?err, "failed to handle data"); + if ct.is_cancelled() { + return Err(err) + .change_context(SinkError::Fatal) + .attach_printable("failed to handle replace data (cancelled)"); + } + tokio::select! 
{ + _ = tokio::time::sleep(duration) => { + }, + _ = ct.cancelled() => { + return Ok(CursorAction::Skip); + } + }; + } + } + } + + Err(SinkError::Fatal).attach_printable("handle replace data failed after retry") + } + pub async fn handle_invalidate( &mut self, cursor: &Option, diff --git a/sinks/sink-common/src/sink.rs b/sinks/sink-common/src/sink.rs index 5267cd75..0eec6e1d 100644 --- a/sinks/sink-common/src/sink.rs +++ b/sinks/sink-common/src/sink.rs @@ -41,6 +41,15 @@ pub trait Sink { batch: &Value, ) -> Result; + async fn handle_replace( + &mut self, + ctx: &Context, + batch: &Value, + ) -> Result { + self.handle_invalidate(&ctx.cursor).await?; + self.handle_data(ctx, batch).await + } + async fn handle_invalidate(&mut self, cursor: &Option) -> Result<(), Self::Error>; async fn cleanup(&mut self) -> Result<(), Self::Error> { From 54bef23d113945174e51e4658468fe1e9239fabe Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Thu, 28 Mar 2024 16:36:11 +0100 Subject: [PATCH 2/3] sink-mongo: replace pending data inside transaction --- examples/mongo/starknet_to_mongo.js | 5 +- sinks/sink-mongo/src/configuration.rs | 21 +++ sinks/sink-mongo/src/sink.rs | 144 +++++++++++++----- .../sink-mongo/tests/test_multi_collection.rs | 8 +- sinks/sink-mongo/tests/test_sink.rs | 16 +- 5 files changed, 140 insertions(+), 54 deletions(-) diff --git a/examples/mongo/starknet_to_mongo.js b/examples/mongo/starknet_to_mongo.js index 61343a03..0d2cb837 100644 --- a/examples/mongo/starknet_to_mongo.js +++ b/examples/mongo/starknet_to_mongo.js @@ -4,9 +4,10 @@ import { decodeTransfersInBlock, filter } from "../common/starknet.js"; // Configure indexer for streaming Starknet Goerli data starting at the specified block. 
export const config = { - streamUrl: "https://goerli.starknet.a5a.ch", - startingBlock: 800_000, + streamUrl: "https://sepolia.starknet.a5a.ch", + startingBlock: 53_000, network: "starknet", + finality: "DATA_STATUS_PENDING", filter, sinkType: "mongo", sinkOptions: { diff --git a/sinks/sink-mongo/src/configuration.rs b/sinks/sink-mongo/src/configuration.rs index 1149b812..19d6d662 100644 --- a/sinks/sink-mongo/src/configuration.rs +++ b/sinks/sink-mongo/src/configuration.rs @@ -19,14 +19,32 @@ pub struct SinkMongoOptions { conflicts_with = "collection_names" )] pub collection_name: Option, + /// The collections where to store the data. + /// + /// If this option is set, the `collection_name` option will be ignored. + /// Use this option when writing to multiple collections from the same indexer. #[arg(long, env = "MONGO_COLLECTION_NAMES", value_delimiter = ',')] pub collection_names: Option>, /// Enable storing records as entities. pub entity_mode: Option, + /// Additional conditions to use when invalidating data. + /// + /// Use this option to run multiple indexers on the same collection. #[clap(skip)] pub invalidate: Option, + /// The number of seconds to wait before flushing the batch. + /// + /// If this option is not set, the sink will flush the batch immediately. #[arg(long, env = "MONGO_BATCH_SECONDS")] pub batch_seconds: Option, + /// Use a transaction to replace pending data. + /// + /// This option avoids data "flashing" when the previous pending data is replaced. + /// Turning this flag on requires a MongoDB replica set. If you are using MongoDB + /// Atlas, this is turned on by default. If you're using a standalone MongoDB it + /// won't work. 
+ #[arg(long, env = "MONGO_REPLACE_DATA_INSIDE_TRANSACTION")] + pub replace_data_inside_transaction: Option, } impl SinkOptions for SinkMongoOptions { @@ -39,6 +57,9 @@ impl SinkOptions for SinkMongoOptions { entity_mode: self.entity_mode.or(other.entity_mode), invalidate: self.invalidate.or(other.invalidate), batch_seconds: self.batch_seconds.or(other.batch_seconds), + replace_data_inside_transaction: self + .replace_data_inside_transaction + .or(other.replace_data_inside_transaction), } } } diff --git a/sinks/sink-mongo/src/sink.rs b/sinks/sink-mongo/src/sink.rs index b7dede88..35a52a1d 100644 --- a/sinks/sink-mongo/src/sink.rs +++ b/sinks/sink-mongo/src/sink.rs @@ -6,7 +6,7 @@ use apibara_sink_common::{Context, CursorAction, DisplayCursor, Sink, ValueExt}; use apibara_sink_common::{SinkError, SinkErrorResultExt}; use async_trait::async_trait; use error_stack::{Result, ResultExt}; -use futures_util::TryStreamExt; +use futures_util::{FutureExt, TryStreamExt}; use mongodb::bson::{doc, to_document, Bson, Document}; use mongodb::ClientSession; use std::collections::HashMap; @@ -15,7 +15,7 @@ use mongodb::options::{UpdateModifications, UpdateOptions}; use mongodb::{options::ClientOptions, Client, Collection}; use serde_json::Value; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use crate::configuration::SinkMongoOptions; @@ -46,6 +46,7 @@ pub struct MongoSink { client: Client, mode: Mode, pub batcher: Batcher, + replace_data_inside_transaction: bool, } enum Mode { @@ -107,6 +108,9 @@ impl Sink for MongoSink { mode, invalidate: options.invalidate, batcher: Batcher::by_seconds(options.batch_seconds.unwrap_or_default()), + replace_data_inside_transaction: options + .replace_data_inside_transaction + .unwrap_or(false), }) } @@ -115,6 +119,91 @@ impl Sink for MongoSink { ctx: &Context, batch: &Value, ) -> Result { + let mut session = self + .client + .start_session(None) + .await + .runtime_error("failed to create mongo session")?; + + 
self.handle_data_with_session(&mut session, ctx, batch) + .await + } + + async fn handle_replace( + &mut self, + ctx: &Context, + batch: &Value, + ) -> Result { + let mut session = self + .client + .start_session(None) + .await + .runtime_error("failed to create mongo session")?; + + if self.replace_data_inside_transaction { + session + .with_transaction( + self, + |session, sink| { + async move { + sink.handle_invalidate_with_session(session, &ctx.cursor) + .await + .map_err(|err| { + error!(err = ?err, "failed to invalidate data inside replace transaction"); + mongodb::error::Error::custom(err) + })?; + + sink.handle_data_with_session(session, ctx, batch) + .await + .map_err(|err| { + error!(err = ?err, "failed to insert data inside replace transaction"); + mongodb::error::Error::custom(err) + }) + } + .boxed() + }, + None, + ) + .await + .runtime_error("failed to replace data inside transaction") + } else { + let mut session = self + .client + .start_session(None) + .await + .runtime_error("failed to create mongo session")?; + self.handle_invalidate_with_session(&mut session, &ctx.cursor) + .await?; + self.handle_data_with_session(&mut session, ctx, batch) + .await + } + } + + async fn handle_invalidate(&mut self, cursor: &Option) -> Result<(), Self::Error> { + let mut session = self + .client + .start_session(None) + .await + .runtime_error("failed to create mongo session")?; + + self.handle_invalidate_with_session(&mut session, cursor) + .await + } +} + +impl MongoSink { + pub fn collection(&self, collection_name: &str) -> Result<&Collection, SinkError> { + self.collections + .get(collection_name) + .runtime_error(&format!("collection '{collection_name}' not found")) + } + + async fn handle_data_with_session( + &mut self, + session: &mut ClientSession, + ctx: &Context, + batch: &Value, + ) -> Result { info!(ctx = %ctx, "handling data"); let batch = batch .as_array_of_objects() @@ -122,14 +211,15 @@ impl Sink for MongoSink { .to_vec(); if ctx.finality != 
DataFinality::DataStatusFinalized { - self.insert_data(&ctx.end_cursor, &batch).await?; + self.insert_data(session, &ctx.end_cursor, &batch).await?; + return Ok(CursorAction::Persist); } match self.batcher.handle_data(ctx, &batch).await { Ok((action, None)) => Ok(action), Ok((action, Some((end_cursor, batch)))) => { - self.insert_data(&end_cursor, &batch).await?; + self.insert_data(session, &end_cursor, &batch).await?; self.batcher.buffer.clear(); Ok(action) } @@ -137,21 +227,23 @@ impl Sink for MongoSink { } } - async fn handle_invalidate(&mut self, cursor: &Option) -> Result<(), Self::Error> { + async fn handle_invalidate_with_session( + &mut self, + session: &mut ClientSession, + cursor: &Option, + ) -> Result<(), SinkError> { debug!(cursor = %DisplayCursor(cursor), "handling invalidate"); if self.batcher.is_batching() && !self.batcher.is_flushed() { - self.insert_data(&self.batcher.buffer.end_cursor, &self.batcher.buffer.data) - .await?; + self.insert_data( + session, + &self.batcher.buffer.end_cursor, + &self.batcher.buffer.data, + ) + .await?; self.batcher.buffer.clear(); } - let mut session = self - .client - .start_session(None) - .await - .runtime_error("failed to create mongo session")?; - let (mut delete_query, mut unclamp_query) = if let Some(cursor) = cursor { // convert to u32 because that's the maximum bson can handle let block_number = u32::try_from(cursor.order_key).unwrap(); @@ -173,7 +265,7 @@ impl Sink for MongoSink { for collection in self.collections.values() { collection - .delete_many_with_session(delete_query.clone(), None, &mut session) + .delete_many_with_session(delete_query.clone(), None, session) .await .runtime_error("failed to invalidate data (delete)")?; collection @@ -181,7 +273,7 @@ impl Sink for MongoSink { unclamp_query.clone(), unset_cursor_to.clone(), None, - &mut session, + session, ) .await .runtime_error("failed to invalidate data (update)")?; @@ -189,17 +281,10 @@ impl Sink for MongoSink { Ok(()) } -} - -impl MongoSink { - 
pub fn collection(&self, collection_name: &str) -> Result<&Collection, SinkError> { - self.collections - .get(collection_name) - .runtime_error(&format!("collection '{collection_name}' not found")) - } pub async fn insert_data( &self, + session: &mut ClientSession, end_cursor: &Cursor, values: &[Value], ) -> Result<(), SinkError> { @@ -261,19 +346,10 @@ impl MongoSink { docs_map.insert(collection_name, docs); } - let mut session = self - .client - .start_session(None) - .await - .runtime_error("failed to create mongo session")?; - match &self.mode { - Mode::Standard => { - self.insert_logs_data(end_cursor, docs_map, &mut session) - .await - } + Mode::Standard => self.insert_logs_data(end_cursor, docs_map, session).await, Mode::Entity => { - self.insert_entities_data(end_cursor, docs_map, &mut session) + self.insert_entities_data(end_cursor, docs_map, session) .await } } diff --git a/sinks/sink-mongo/tests/test_multi_collection.rs b/sinks/sink-mongo/tests/test_multi_collection.rs index bd50464c..785d36a2 100644 --- a/sinks/sink-mongo/tests/test_multi_collection.rs +++ b/sinks/sink-mongo/tests/test_multi_collection.rs @@ -376,11 +376,9 @@ async fn test_handle_data_in_entity_mode() -> Result<(), SinkError> { let options = SinkMongoOptions { connection_string: Some(format!("mongodb://localhost:{}", port)), database: Some("test".into()), - collection_name: None, collection_names: Some(collection_names.clone()), entity_mode: Some(true), - invalidate: None, - batch_seconds: None, + ..Default::default() }; let mut sink = MongoSink::from_options(options).await?; @@ -531,11 +529,9 @@ async fn test_handle_invalidate_in_entity_mode() -> Result<(), SinkError> { let options = SinkMongoOptions { connection_string: Some(format!("mongodb://localhost:{}", port)), database: Some("test".into()), - collection_name: None, collection_names: Some(collection_names.clone()), entity_mode: Some(true), - invalidate: None, - batch_seconds: None, + ..Default::default() }; let mut sink = 
MongoSink::from_options(options).await?; diff --git a/sinks/sink-mongo/tests/test_sink.rs b/sinks/sink-mongo/tests/test_sink.rs index a20b76fc..6ae44e8f 100644 --- a/sinks/sink-mongo/tests/test_sink.rs +++ b/sinks/sink-mongo/tests/test_sink.rs @@ -298,10 +298,8 @@ async fn test_handle_data_in_entity_mode() -> Result<(), SinkError> { connection_string: Some(format!("mongodb://localhost:{}", port)), database: Some("test".into()), collection_name: Some("test".into()), - collection_names: None, entity_mode: Some(true), - invalidate: None, - batch_seconds: None, + ..Default::default() }; let mut sink = MongoSink::from_options(options).await?; @@ -446,10 +444,8 @@ async fn test_handle_invalidate_in_entity_mode() -> Result<(), SinkError> { connection_string: Some(format!("mongodb://localhost:{}", port)), database: Some("test".into()), collection_name: Some("test".into()), - collection_names: None, entity_mode: Some(true), - invalidate: None, - batch_seconds: None, + ..Default::default() }; let mut sink = MongoSink::from_options(options).await?; @@ -729,10 +725,8 @@ async fn test_handle_empty_data() -> Result<(), SinkError> { connection_string: Some(format!("mongodb://localhost:{}", port)), database: Some("test".into()), collection_name: Some("test".into()), - collection_names: None, entity_mode: Some(false), - invalidate: None, - batch_seconds: None, + ..Default::default() }; let mut sink = MongoSink::from_options(options).await?; @@ -762,10 +756,8 @@ async fn test_handle_empty_data_in_entity_mode() -> Result<(), SinkError> { connection_string: Some(format!("mongodb://localhost:{}", port)), database: Some("test".into()), collection_name: Some("test".into()), - collection_names: None, entity_mode: Some(true), - invalidate: None, - batch_seconds: None, + ..Default::default() }; let mut sink = MongoSink::from_options(options).await?; From 3b3edfb7951065ebe80061b2400b1b780b4890a0 Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Thu, 28 Mar 2024 16:39:48 +0100 Subject: 
[PATCH 3/3] release: sink-mongo v0.7.0 --- Cargo.lock | 2 +- sinks/sink-mongo/CHANGELOG.md | 11 +++++++++++ sinks/sink-mongo/Cargo.toml | 2 +- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8b3e718b..87905ccc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -449,7 +449,7 @@ dependencies = [ [[package]] name = "apibara-sink-mongo" -version = "0.6.2" +version = "0.7.0" dependencies = [ "apibara-core", "apibara-observability", diff --git a/sinks/sink-mongo/CHANGELOG.md b/sinks/sink-mongo/CHANGELOG.md index 5ae67a29..2bcc4481 100644 --- a/sinks/sink-mongo/CHANGELOG.md +++ b/sinks/sink-mongo/CHANGELOG.md @@ -6,6 +6,17 @@ The format is based on [Common Changelog](https://common-changelog.org/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.7.0] - 2024-03-28 + +_Add flag to replace pending data inside transaction._ + +### Added + +- Add `--replace-data-inside-transaction` (env: + `MONGO_REPLACE_DATA_INSIDE_TRANSACTION`) flag to replace pending data in one + transaction. Notice that MongoDB transactions require a MongoDB deployment with + replication turned on. + ## [0.6.2] - 2024-03-21 _Fix issue when transform does not return any data._ diff --git a/sinks/sink-mongo/Cargo.toml b/sinks/sink-mongo/Cargo.toml index 9dcec256..baf39eb7 100644 --- a/sinks/sink-mongo/Cargo.toml +++ b/sinks/sink-mongo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apibara-sink-mongo" -version = "0.6.2" +version = "0.7.0" edition.workspace = true authors.workspace = true repository.workspace = true