Skip to content

Commit

Permalink
Move ActionSchedulerListener to ActionStateResult (#1237)
Browse files Browse the repository at this point in the history
These APIs were nearly identical, and with the new scheduler/worker design
we will eventually be removing the ActionScheduler API and moving it to
the ClientStateManager API (which will eventually be renamed).

towards #1213
  • Loading branch information
allada authored Aug 7, 2024
1 parent 33f09cb commit d57ee8d
Show file tree
Hide file tree
Showing 14 changed files with 324 additions and 239 deletions.
1 change: 0 additions & 1 deletion nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ rust_library(
"src/awaited_action_db/awaited_action.rs",
"src/awaited_action_db/mod.rs",
"src/cache_lookup_scheduler.rs",
"src/default_action_listener.rs",
"src/default_scheduler_factory.rs",
"src/grpc_scheduler.rs",
"src/lib.rs",
Expand Down
21 changes: 4 additions & 17 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use futures::Future;
use nativelink_error::Error;
use nativelink_metric::RootMetricsComponent;
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId};
use nativelink_util::action_messages::{ActionInfo, OperationId};
use nativelink_util::operation_state_manager::ActionStateResult;

use crate::platform_property_manager::PlatformPropertyManager;

/// ActionListener interface is responsible for interfacing with clients
/// that are interested in the state of an action.
///
/// Implementors must be `Sync + Send + Unpin`.
pub trait ActionListener: Sync + Send + Unpin {
/// Returns the client operation id.
fn client_operation_id(&self) -> &OperationId;

/// Waits for the action state to change.
///
/// Returns a boxed, pinned, `Send` future that borrows `self` mutably for
/// its lifetime and resolves to the `ActionState` on success or an
/// `Error` on failure.
fn changed(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + '_>>;
}

/// ActionScheduler interface is responsible for interactions between the scheduler
/// and action related operations.
#[async_trait]
Expand All @@ -50,11 +37,11 @@ pub trait ActionScheduler: Sync + Send + Unpin + RootMetricsComponent + 'static
&self,
client_operation_id: OperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error>;
) -> Result<Box<dyn ActionStateResult>, Error>;

/// Find an existing action by its name.
async fn find_by_client_operation_id(
&self,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error>;
) -> Result<Option<Box<dyn ActionStateResult>>, Error>;
}
67 changes: 42 additions & 25 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
// limitations under the License.

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use futures::Future;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_proto::build::bazel::remote::execution::v2::{
Expand All @@ -31,14 +29,15 @@ use nativelink_util::action_messages::{
use nativelink_util::background_spawn;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
use nativelink_util::operation_state_manager::ActionStateResult;
use nativelink_util::store_trait::Store;
use parking_lot::{Mutex, MutexGuard};
use scopeguard::guard;
use tokio::sync::oneshot;
use tonic::Request;
use tracing::{event, Level};

use crate::action_scheduler::{ActionListener, ActionScheduler};
use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

/// Actions that are having their cache checked or failed cache lookup and are
Expand All @@ -48,7 +47,7 @@ type CheckActions = HashMap<
ActionUniqueKey,
Vec<(
OperationId,
oneshot::Sender<Result<Pin<Box<dyn ActionListener>>, Error>>,
oneshot::Sender<Result<Box<dyn ActionStateResult>, Error>>,
)>,
>;

Expand Down Expand Up @@ -91,14 +90,14 @@ async fn get_action_from_store(
}
}

/// Future for when ActionListeners are known.
///
/// Receiving half of a `oneshot` channel whose message is either a pinned,
/// boxed `ActionListener` trait object or an `Error`.
type ActionListenerOneshot = oneshot::Receiver<Result<Pin<Box<dyn ActionListener>>, Error>>;
/// Future for when ActionStateResults are known.
///
/// Receiving half of a `oneshot` channel whose message is either a boxed
/// `ActionStateResult` trait object or an `Error`.
type ActionStateResultOneshot = oneshot::Receiver<Result<Box<dyn ActionStateResult>, Error>>;

fn subscribe_to_existing_action(
inflight_cache_checks: &mut MutexGuard<CheckActions>,
unique_qualifier: &ActionUniqueKey,
client_operation_id: &OperationId,
) -> Option<ActionListenerOneshot> {
) -> Option<ActionStateResultOneshot> {
inflight_cache_checks
.get_mut(unique_qualifier)
.map(|oneshots| {
Expand All @@ -107,20 +106,36 @@ fn subscribe_to_existing_action(
rx
})
}
struct CachedActionListener {
client_operation_id: OperationId,

/// `ActionStateResult` implementation backed by a single stored
/// `ActionState`.
struct CacheLookupActionStateResult {
/// State handed out by `as_state()` and by the first call to `changed()`.
action_state: Arc<ActionState>,
/// Set to `true` by the first call to `changed()`; any later call to
/// `changed()` returns an error instead of the state.
change_called: bool,
}

impl ActionListener for CachedActionListener {
fn client_operation_id(&self) -> &OperationId {
&self.client_operation_id
#[async_trait]
impl ActionStateResult for CacheLookupActionStateResult {
/// Returns a new shared handle to the stored action state.
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
    let state = Arc::clone(&self.action_state);
    Ok(state)
}

/// Yields the stored action state on the first call; every later call
/// fails with a `Code::Internal` error.
async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
    // Mark this call and learn whether an earlier call already happened.
    let called_before = std::mem::replace(&mut self.change_called, true);
    if called_before {
        return Err(make_err!(
            Code::Internal,
            "CacheLookupActionStateResult::changed called twice"
        ));
    }
    Ok(Arc::clone(&self.action_state))
}

fn changed(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + '_>> {
Box::pin(async { Ok(self.action_state.clone()) })
/// Always fails with a `Code::Unimplemented` error; this type does not
/// store an `ActionInfo` to return.
async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
    // TODO(allada) We should probably remove as_action_info()
    // or implement it properly.
    Err(make_err!(
        Code::Unimplemented,
        "as_action_info not implemented for CacheLookupActionStateResult::as_action_info"
    ))
}
}

Expand Down Expand Up @@ -149,7 +164,7 @@ impl ActionScheduler for CacheLookupScheduler {
&self,
client_operation_id: OperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
) -> Result<Box<dyn ActionStateResult>, Error> {
let unique_key = match &action_info.unique_qualifier {
ActionUniqueQualifier::Cachable(unique_key) => unique_key.clone(),
ActionUniqueQualifier::Uncachable(_) => {
Expand Down Expand Up @@ -191,7 +206,7 @@ impl ActionScheduler for CacheLookupScheduler {
let action_listener = action_listener_fut.await.map_err(|_| {
make_err!(
Code::Internal,
"ActionListener tx hung up in CacheLookupScheduler::add_action"
"ActionStateResult tx hung up in CacheLookupScheduler::add_action"
)
})?;
return action_listener;
Expand Down Expand Up @@ -239,16 +254,18 @@ impl ActionScheduler for CacheLookupScheduler {
let Some(pending_txs) = maybe_pending_txs else {
return; // Nobody is waiting for this action anymore.
};
let action_state = Arc::new(ActionState {
let mut action_state = ActionState {
operation_id: OperationId::default(),
stage: ActionStage::CompletedFromCache(action_result),
action_digest: action_info.unique_qualifier.digest(),
});
};

for (client_operation_id, pending_tx) in pending_txs {
action_state.operation_id = client_operation_id;
// Ignore errors here, as the other end may have hung up.
let _ = pending_tx.send(Ok(Box::pin(CachedActionListener {
client_operation_id,
action_state: action_state.clone(),
let _ = pending_tx.send(Ok(Box::new(CacheLookupActionStateResult {
action_state: Arc::new(action_state.clone()),
change_called: false,
})));
}
return;
Expand Down Expand Up @@ -297,7 +314,7 @@ impl ActionScheduler for CacheLookupScheduler {
.map_err(|_| {
make_err!(
Code::Internal,
"ActionListener tx hung up in CacheLookupScheduler::add_action"
"ActionStateResult tx hung up in CacheLookupScheduler::add_action"
)
})?
.err_tip(|| "In CacheLookupScheduler::add_action")
Expand All @@ -306,7 +323,7 @@ impl ActionScheduler for CacheLookupScheduler {
async fn find_by_client_operation_id(
&self,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error> {
) -> Result<Option<Box<dyn ActionStateResult>>, Error> {
self.action_scheduler
.find_by_client_operation_id(client_operation_id)
.await
Expand Down
77 changes: 0 additions & 77 deletions nativelink-scheduler/src/default_action_listener.rs

This file was deleted.

Loading

0 comments on commit d57ee8d

Please sign in to comment.