Skip to content

Commit

Permalink
add cancellation safety
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder committed Nov 4, 2024
1 parent 497bdf0 commit 3fc9f28
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
34 changes: 17 additions & 17 deletions cli/polka-storage-provider/server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ fn process(
}) => {
tracker.spawn(async move {
tokio::select! {
// AddPiece is cancellation safe, as it can be retried and the state will be fine.
res = add_piece(state, piece_path, piece_cid, deal, published_deal_id) => {
match res {
Ok(_) => tracing::info!("Add Piece for piece {:?}, deal id {}, finished successfully.", piece_cid, published_deal_id),
Err(err) => tracing::error!(%err),
Err(err) => tracing::error!(%err, "Add Piece for piece {:?}, deal id {}, failed!", piece_cid, published_deal_id),
}
},
() = token.cancelled() => {
Expand All @@ -130,16 +131,10 @@ fn process(
sector_number: sector_id,
}) => {
tracker.spawn(async move {
tokio::select! {
res = precommit(state, sector_id) => {
match res {
Ok(_) => tracing::info!("Precommit for sector {} finished successfully.", sector_id),
Err(err) => tracing::error!(%err),
}
},
() = token.cancelled() => {
tracing::warn!("PreCommit has been cancelled.");
}
// Precommit is not cancellation safe.
match precommit(state, sector_id).await {
Ok(_) => tracing::info!("Precommit for sector {} finished successfully.", sector_id),
Err(err) => tracing::error!(%err),
}
});
}
Expand All @@ -157,6 +152,10 @@ async fn find_sector_for_piece(state: &Arc<PipelineState>) -> Result<Sector, Pip
Ok(sector)
}

/// Finds a sector to which a piece will fit and adds it to the sector.
/// This function is *cancellation safe* as if future is dropped,
/// it can be dropped only when waiting for `spawn_blocking`.
/// When dropped when waiting, the sector state won't be preserved and adding piece can be retried.
#[tracing::instrument(skip_all, fields(piece_cid, deal_id))]
async fn add_piece(
state: Arc<PipelineState>,
Expand Down Expand Up @@ -201,6 +200,9 @@ async fn add_piece(
}

#[tracing::instrument(skip_all, fields(sector_number))]
/// Creates a replica and calls pre-commit on-chain.
/// This is *NOT CANCELLATION SAFE*.
/// I.e. when interrupted when waiting for extrinsic call to return, the state on-chain will be inconsistent with state in Storage Provider.
async fn precommit(
state: Arc<PipelineState>,
sector_number: SectorNumber,
Expand All @@ -212,10 +214,8 @@ async fn precommit(
tracing::error!("Tried to precommit non-existing sector");
return Err(PipelineError::NotExistentSector);
};

sector.state = SectorState::Sealing;
// Pad sector so CommD can be properly calculated.
sector.piece_infos = sealer.pad_sector(&sector.piece_infos, sector.occupied_sector_space)?;
state.db.save_sector(&sector)?;

tracing::debug!("Padded sector, commencing pre-commit.");
// TODO(@th7nder,31/10/2024): what happens if some of the process fails? SP will be slashed, and there is no error reporting? what about retries?
Expand All @@ -240,9 +240,6 @@ async fn precommit(
let sealing_output = sealing_handle.await??;
tracing::info!("Created sector's replica: {:?}", sealing_output);

sector.state = SectorState::Sealed;
state.db.save_sector(&sector)?;

let current_block = state.xt_client.height(false).await?;
tracing::debug!("Precommiting at block: {}", current_block);

Expand Down Expand Up @@ -276,6 +273,9 @@ async fn precommit(
)
.await?;

sector.state = SectorState::Sealed;
state.db.save_sector(&sector)?;

let precommited_sectors = result
.events
.find::<storagext::runtime::storage_provider::events::SectorsPreCommitted>()
Expand Down
1 change: 0 additions & 1 deletion cli/polka-storage-provider/server/src/pipeline/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ impl Sector {
pub enum SectorState {
Unsealed,
Sealed,
Sealing,
Precommitted,
Proven,
}

0 comments on commit 3fc9f28

Please sign in to comment.