Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add forcing shutdowning to: session, unbound session #1973

Merged
merged 3 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub type ValueSearchHolder = BaseSearcher<ValueSearchState>;
impl ValueSearchHolder {
pub fn setup(&mut self, terms: Vec<String>) -> Result<(), SearchError> {
let mut matchers = vec![];
for (_pos, filter) in terms.iter().enumerate() {
for filter in terms.iter() {
matchers.push(Regex::from_str(&as_regex(filter)).map_err(|err| {
SearchError::Regex(format!("Failed to create regex for {filter}: {err}"))
})?);
Expand Down
2 changes: 2 additions & 0 deletions application/apps/indexer/session/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ impl LifecycleTransition {
pub enum ComputationError {
#[error("Destination path should be defined to stream from MassageProducer")]
DestinationPath,
#[error("Fail to create session")]
SessionCreatingFail,
#[error("Native communication error ({0})")]
Communication(String),
#[error("Operation not supported ({0})")]
Expand Down
29 changes: 19 additions & 10 deletions application/apps/indexer/session/src/handlers/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@ pub struct SleepResult {
pub sleep_well: bool,
}

pub async fn handle(operation_api: &OperationAPI, ms: u64) -> OperationResult<SleepResult> {
let canceler = operation_api.cancellation_token();
select! {
_ = async move {
time::sleep(time::Duration::from_millis(ms)).await;
} => {
Ok(Some( SleepResult { sleep_well: true }))
},
_ = canceler.cancelled() => {
Ok(Some( SleepResult { sleep_well: false }))
pub async fn handle(
operation_api: &OperationAPI,
ms: u64,
ignore_cancellation: bool,
) -> OperationResult<SleepResult> {
if ignore_cancellation {
time::sleep(time::Duration::from_millis(ms)).await;
Ok(Some(SleepResult { sleep_well: true }))
} else {
let canceler = operation_api.cancellation_token();
select! {
_ = async move {
time::sleep(time::Duration::from_millis(ms)).await;
} => {
Ok(Some( SleepResult { sleep_well: true }))
},
_ = canceler.cancelled() => {
Ok(Some( SleepResult { sleep_well: false }))
}
}
}
}
16 changes: 11 additions & 5 deletions application/apps/indexer/session/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub enum OperationKind {
Cancel {
target: Uuid,
},
Sleep(u64),
Sleep(u64, bool),
End,
}

Expand All @@ -136,7 +136,7 @@ impl std::fmt::Display for OperationKind {
OperationKind::Map { .. } => "Mapping",
OperationKind::Values { .. } => "Values",
OperationKind::Merge { .. } => "Merging",
OperationKind::Sleep(_) => "Sleeping",
OperationKind::Sleep(_, _) => "Sleeping",
OperationKind::Cancel { .. } => "Canceling",
OperationKind::GetNearestPosition(_) => "Getting nearest position",
OperationKind::End => "End",
Expand Down Expand Up @@ -380,9 +380,12 @@ impl OperationAPI {
} => {
unimplemented!("merging not yet supported");
}
OperationKind::Sleep(ms) => {
api.finish(handlers::sleep::handle(&api, ms).await, operation_str)
.await;
OperationKind::Sleep(ms, ignore_cancellation) => {
api.finish(
handlers::sleep::handle(&api, ms, ignore_cancellation).await,
operation_str,
)
.await;
}
OperationKind::Cancel { target } => match tracker.cancel_operation(target).await {
Ok(canceled) => {
Expand Down Expand Up @@ -486,5 +489,8 @@ pub async fn run(
if let Err(err) = tracker_api.shutdown() {
error!("Failed to shutdown tracker: {:?}", err);
}
if let Err(err) = state_api.shutdown() {
error!("Fail to shutdown state; error: {:?}", err);
}
debug!("operations task finished");
}
130 changes: 93 additions & 37 deletions application/apps/indexer/session/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::{
tracker,
tracker::OperationTrackerAPI,
};
use futures::Future;
use indexer_base::progress::Severity;
use log::{debug, error};
use log::{debug, error, warn};
use processor::{grabber::LineRange, search::filter::SearchFilter};
use serde::Serialize;
use sources::{factory::ObserveOptions, sde};
Expand All @@ -19,17 +20,21 @@ use tokio::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
},
task,
task::{self, JoinHandle},
time,
};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

pub type OperationsChannel = (UnboundedSender<Operation>, UnboundedReceiver<Operation>);

pub const SHUTDOWN_TIMEOUT_IN_MS: u64 = 2000;

pub struct Session {
uuid: Uuid,
tx_operations: UnboundedSender<Operation>,
destroyed: CancellationToken,
destroying: CancellationToken,
pub state: SessionStateAPI,
pub tracker: OperationTrackerAPI,
}
Expand Down Expand Up @@ -57,51 +62,89 @@ impl Session {
uuid,
tx_operations: tx_operations.clone(),
destroyed: CancellationToken::new(),
destroying: CancellationToken::new(),
state: state_api.clone(),
tracker: tracker_api.clone(),
};
let destroyed = session.destroyed.clone();
task::spawn(async move {
let destroying = session.destroying.clone();
let (tx, rx) = oneshot::channel();
let handle = task::spawn(async move {
let self_handle: JoinHandle<()> = match rx.await {
Ok(handle) => handle,
Err(_) => {
error!("Fail to get handle of session task");
return;
}
};
debug!("Session is started");
let tx_callback_events_state = tx_callback_events.clone();
join!(
async {
operations::run(
rx_operations,
state_api.clone(),
tracker_api.clone(),
tx_callback_events.clone(),
destroying.cancelled().await;
if time::timeout(
time::Duration::from_millis(SHUTDOWN_TIMEOUT_IN_MS),
destroyed.cancelled(),
)
.await;
if let Err(err) = state_api.shutdown() {
error!("Fail to shutdown state; error: {:?}", err);
}
},
async {
if let Err(err) = state::run(rx_state_api, tx_callback_events_state).await {
error!("State loop exits with error:: {:?}", err);
if let Err(err) =
Session::send_stop_signal(Uuid::new_v4(), &tx_operations, None).await
{
error!("Fail to send stop signal (on state fail):: {:?}", err);
}
}
.await
.is_err()
{
warn!(
"Session isn't shutdown in {}s; forcing termination.",
SHUTDOWN_TIMEOUT_IN_MS / 1000
);
self_handle.abort();
destroyed.cancel();
};
},
async {
if let Err(err) = tracker::run(state_api.clone(), rx_tracker_api).await {
error!("Tracker loop exits with error:: {:?}", err);
if let Err(err) =
Session::send_stop_signal(Uuid::new_v4(), &tx_operations, None).await
{
error!("Fail to send stop signal (on tracker fail):: {:?}", err);
}
}
},
join!(
operations::run(
rx_operations,
state_api.clone(),
tracker_api.clone(),
tx_callback_events.clone(),
),
Self::run(
&tx_operations,
&destroying,
"state",
state::run(rx_state_api, tx_callback_events_state)
),
Self::run(
&tx_operations,
&destroying,
"tracker",
tracker::run(state_api.clone(), rx_tracker_api)
),
);
destroyed.cancel();
debug!("Session is finished");
}
);
destroyed.cancel();
debug!("Session is finished");
debug!("Session task is finished");
});
Ok((session, rx_callback_events))
if tx.send(handle).is_err() {
Err(ComputationError::SessionCreatingFail)
} else {
Ok((session, rx_callback_events))
}
}

async fn run(
tx_operations: &UnboundedSender<Operation>,
destroying: &CancellationToken,
name: &str,
f: impl Future<Output = Result<(), crate::events::NativeError>> + Send + 'static,
) {
if let Err(err) = f.await {
error!("State loop exits with error:: {:?}", err);
if let Err(err) =
Session::send_stop_signal(Uuid::new_v4(), tx_operations, None, destroying).await
{
error!("Fail to send stop signal (on {} fail):: {:?}", name, err);
}
}
}

pub fn get_uuid(&self) -> Uuid {
Expand Down Expand Up @@ -254,7 +297,9 @@ impl Session {
operation_id: Uuid,
tx_operations: &UnboundedSender<Operation>,
destroyed: Option<&CancellationToken>,
destroying: &CancellationToken,
) -> Result<(), ComputationError> {
destroying.cancel();
tx_operations
.send(Operation::new(operation_id, operations::OperationKind::End))
.map_err(|e| ComputationError::Communication(e.to_string()))?;
Expand All @@ -265,7 +310,13 @@ impl Session {
}

pub async fn stop(&self, operation_id: Uuid) -> Result<(), ComputationError> {
Session::send_stop_signal(operation_id, &self.tx_operations, Some(&self.destroyed)).await
Session::send_stop_signal(
operation_id,
&self.tx_operations,
Some(&self.destroyed),
&self.destroying,
)
.await
}

pub async fn get_stream_len(&self) -> Result<usize, ComputationError> {
Expand Down Expand Up @@ -440,11 +491,16 @@ impl Session {
}

/// Used for debug goals
pub fn sleep(&self, operation_id: Uuid, ms: u64) -> Result<(), ComputationError> {
pub fn sleep(
&self,
operation_id: Uuid,
ms: u64,
ignore_cancellation: bool,
) -> Result<(), ComputationError> {
self.tx_operations
.send(Operation::new(
operation_id,
operations::OperationKind::Sleep(ms),
operations::OperationKind::Sleep(ms, ignore_cancellation),
))
.map_err(|e| ComputationError::Communication(e.to_string()))
}
Expand Down
7 changes: 5 additions & 2 deletions application/apps/indexer/session/src/state/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
},
tracker::OperationTrackerAPI,
};
use log::error;
use parsers;
use processor::{
grabber::LineRange,
Expand Down Expand Up @@ -235,7 +236,7 @@ impl SessionStateAPI {
api: Api,
rx_response: oneshot::Receiver<T>,
) -> Result<T, NativeError> {
let api_str = format!("{api}");
let api_str = api.to_string();
self.tx_api.send(api).map_err(|e| {
NativeError::channel(&format!("Failed to send to Api::{api_str}; error: {e}"))
})?;
Expand Down Expand Up @@ -554,7 +555,9 @@ impl SessionStateAPI {

pub async fn close_session(&self) -> Result<(), NativeError> {
self.closing_token.cancel();
self.tracker.cancel_all().await?;
if let Err(err) = self.tracker.cancel_all().await {
error!("Fail to correctly stop tracker: {err:?}");
}
let (tx, rx) = oneshot::channel();
self.exec_operation(Api::CloseSession(tx), rx).await
}
Expand Down
25 changes: 20 additions & 5 deletions application/apps/indexer/session/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ use indexer_base::progress::Severity;
use log::{debug, error};
use sources::producer::SdeSender;
use std::collections::{hash_map::Entry, HashMap};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
},
time::{timeout, Duration},
};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

const CANCEL_OPERATION_TIMEOUT: u64 = 3000;

pub enum TrackerCommand {
AddOperation(
(
Expand Down Expand Up @@ -266,8 +271,18 @@ pub async fn run(
if !done_token.is_cancelled() {
operation_cancalation_token.cancel();
debug!("waiting for operation {} would confirm done-state", uuid);
// TODO: add timeout to preven situation with waiting forever. 2-3 sec.
done_token.cancelled().await;
if timeout(
Duration::from_millis(CANCEL_OPERATION_TIMEOUT),
done_token.cancelled(),
)
.await
.is_err()
{
error!(
"timeout {}s to stop opearation {uuid}",
CANCEL_OPERATION_TIMEOUT / 1000
);
}
progress.stopped(uuid);
}
}
Expand Down
Loading
Loading