Skip to content

Commit

Permalink
release: sink-mongo (#356)
Browse files Browse the repository at this point in the history
### Summary

- sink-mongo: 0.7.0
  • Loading branch information
fracek authored Mar 28, 2024
2 parents e6e3961 + d0ebb89 commit 7990dde
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 68 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions examples/mongo/starknet_to_mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import { decodeTransfersInBlock, filter } from "../common/starknet.js";

// Configure indexer for streaming Starknet Goerli data starting at the specified block.
export const config = {
streamUrl: "https://goerli.starknet.a5a.ch",
startingBlock: 800_000,
streamUrl: "https://sepolia.starknet.a5a.ch",
startingBlock: 53_000,
network: "starknet",
finality: "DATA_STATUS_PENDING",
filter,
sinkType: "mongo",
sinkOptions: {
Expand Down
13 changes: 6 additions & 7 deletions sinks/sink-common/src/connector/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,6 @@ where
state: &mut PersistedState<F>,
ct: CancellationToken,
) -> Result<(CursorAction, StreamAction), SinkError> {
if self.needs_invalidation {
self.handle_invalidate(context.cursor.clone(), state, ct.clone())
.await?;
self.needs_invalidation = false;
}

// fatal error since if the sink is restarted it will receive the same data again.
let json_batch = batch
.into_iter()
Expand All @@ -206,7 +200,12 @@ where
}
}

let mut action = self.sink.handle_data(&context, &data, ct).await?;
let mut action = if self.needs_invalidation {
self.needs_invalidation = false;
self.sink.handle_replace(&context, &data, ct).await?
} else {
self.sink.handle_data(&context, &data, ct).await?
};

// If it's pending, don't store the cursor.
if context.finality.is_pending() {
Expand Down
35 changes: 30 additions & 5 deletions sinks/sink-common/src/connector/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,20 @@ impl<S: Sink + Send + Sync> SinkWithBackoff<S> {
batch: &Value,
ct: CancellationToken,
) -> Result<CursorAction, SinkError> {
// info!("handling data with backoff: {:?}", &self.backoff);
for duration in &self.backoff {
// info!("trying to handle data, duration: {:?}", duration);
match self.inner.handle_data(ctx, batch).await {
Ok(action) => return Ok(action),
Err(err) => {
warn!(err = ?err, "failed to handle data");
if ct.is_cancelled() {
// info!("cancelled while handling data");
return Err(err)
.change_context(SinkError::Fatal)
.attach_printable("failed to handle data (cancelled)");
}
tokio::select! {
_ = tokio::time::sleep(duration) => {
// info!("retrying to handle data after sleeping");
},
_ = ct.cancelled() => {
// info!("cancelled while retrying to handle data");
return Ok(CursorAction::Skip);
}
};
Expand All @@ -56,6 +51,36 @@ impl<S: Sink + Send + Sync> SinkWithBackoff<S> {
Err(SinkError::Fatal).attach_printable("handle data failed after retry")
}

/// Replaces previously-written pending data with `batch`, retrying with backoff.
///
/// Mirrors the retry loop in `handle_data`, but delegates to the inner sink's
/// `handle_replace` so the previous pending batch is replaced rather than
/// appended. Returns `CursorAction::Skip` if cancelled while waiting to retry,
/// and a fatal `SinkError` if every backoff attempt fails or cancellation
/// arrives after a failed attempt.
pub async fn handle_replace(
    &mut self,
    ctx: &Context,
    batch: &Value,
    ct: CancellationToken,
) -> Result<CursorAction, SinkError> {
    for duration in &self.backoff {
        match self.inner.handle_replace(ctx, batch).await {
            Ok(action) => return Ok(action),
            Err(err) => {
                // Log message kept consistent with the error context below
                // (the original said "failed to handle data", copy-pasted
                // from `handle_data`).
                warn!(err = ?err, "failed to handle replace data");
                if ct.is_cancelled() {
                    return Err(err)
                        .change_context(SinkError::Fatal)
                        .attach_printable("failed to handle replace data (cancelled)");
                }
                // Sleep for the backoff duration, but abort the wait (and
                // skip storing the cursor) if cancellation is requested.
                tokio::select! {
                    _ = tokio::time::sleep(duration) => {
                    },
                    _ = ct.cancelled() => {
                        return Ok(CursorAction::Skip);
                    }
                };
            }
        }
    }

    Err(SinkError::Fatal).attach_printable("handle replace data failed after retry")
}

pub async fn handle_invalidate(
&mut self,
cursor: &Option<Cursor>,
Expand Down
9 changes: 9 additions & 0 deletions sinks/sink-common/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ pub trait Sink {
batch: &Value,
) -> Result<CursorAction, Self::Error>;

/// Replaces previously-written pending data with a new batch.
///
/// The default implementation invalidates data at the current cursor
/// (`ctx.cursor`) and then writes the new batch through `handle_data`.
/// Implementations may override this when they can perform the
/// invalidate-and-write as a single operation (e.g. inside a database
/// transaction) to avoid briefly exposing the gap between the two steps.
async fn handle_replace(
    &mut self,
    ctx: &Context,
    batch: &Value,
) -> Result<CursorAction, Self::Error> {
    self.handle_invalidate(&ctx.cursor).await?;
    self.handle_data(ctx, batch).await
}

async fn handle_invalidate(&mut self, cursor: &Option<Cursor>) -> Result<(), Self::Error>;

async fn cleanup(&mut self) -> Result<(), Self::Error> {
Expand Down
11 changes: 11 additions & 0 deletions sinks/sink-mongo/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ The format is based on [Common Changelog](https://common-changelog.org/), and
this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.7.0] - 2024-03-28

_Add flag to replace pending data inside a transaction._

### Added

- Add `--replace-data-inside-transaction` (env:
`MONGO_REPLACE_DATA_INSIDE_TRANSACTION`) flag to replace pending data in one
transaction. Notice that MongoDB transactions require a MongoDB deployment with
replication turned on.

## [0.6.2] - 2024-03-21

_Fix issue when transform does not return any data._
Expand Down
2 changes: 1 addition & 1 deletion sinks/sink-mongo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "apibara-sink-mongo"
version = "0.6.2"
version = "0.7.0"
edition.workspace = true
authors.workspace = true
repository.workspace = true
Expand Down
21 changes: 21 additions & 0 deletions sinks/sink-mongo/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,32 @@ pub struct SinkMongoOptions {
conflicts_with = "collection_names"
)]
pub collection_name: Option<String>,
/// The collections where to store the data.
///
/// If this option is set, the `collection_name` option will be ignored.
/// Use this option when writing to multiple collections from the same indexer.
#[arg(long, env = "MONGO_COLLECTION_NAMES", value_delimiter = ',')]
pub collection_names: Option<Vec<String>>,
/// Enable storing records as entities.
pub entity_mode: Option<bool>,
/// Additional conditions to use when invalidating data.
///
/// Use this option to run multiple indexers on the same collection.
#[clap(skip)]
pub invalidate: Option<Document>,
/// The number of seconds to wait before flushing the batch.
///
/// If this option is not set, the sink will flush the batch immediately.
#[arg(long, env = "MONGO_BATCH_SECONDS")]
pub batch_seconds: Option<u64>,
/// Use a transaction to replace pending data.
///
/// This option avoids data "flashing" when the previous pending data is replaced.
/// Turning this flag on requires a MongoDB replica set. If you are using MongoDB
/// Atlas, this is turned on by default. If you're using a standalone MongoDB it
/// won't work.
#[arg(long, env = "MONGO_REPLACE_DATA_INSIDE_TRANSACTION")]
pub replace_data_inside_transaction: Option<bool>,
}

impl SinkOptions for SinkMongoOptions {
Expand All @@ -39,6 +57,9 @@ impl SinkOptions for SinkMongoOptions {
entity_mode: self.entity_mode.or(other.entity_mode),
invalidate: self.invalidate.or(other.invalidate),
batch_seconds: self.batch_seconds.or(other.batch_seconds),
replace_data_inside_transaction: self
.replace_data_inside_transaction
.or(other.replace_data_inside_transaction),
}
}
}
Loading

0 comments on commit 7990dde

Please sign in to comment.