Skip to content

Commit

Permalink
chore(topology): Allow internal topologies to be controlled
Browse files Browse the repository at this point in the history
The internal topologies are currently not wrapped by a topology controller. This
prevents them from being controlled externally in the case of an early shutdown.
  • Loading branch information
bruceg committed Apr 11, 2024
1 parent 665ab39 commit 56061ac
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,29 @@ impl Application {
require_healthy: root_opts.require_healthy,
#[cfg(feature = "enterprise")]
enterprise_reporter: config.enterprise,
extra_context: config.extra_context,
extra_context: config.extra_context.clone(),
});

let internal_topologies: Vec<_> = config
.internal_topologies
.into_iter()
.map(|topology| {
SharedTopologyController::new(TopologyController {
#[cfg(feature = "api")]
api_server: None,
topology,
config_paths: Vec::new(),
require_healthy: root_opts.require_healthy,
#[cfg(feature = "enterprise")]
enterprise_reporter: None,
extra_context: config.extra_context.clone(),
})
})
.collect();

Ok(StartedApplication {
config_paths: config.config_paths,
internal_topologies: config.internal_topologies,
internal_topologies,
graceful_crash_receiver: config.graceful_crash_receiver,
signals,
topology_controller,
Expand All @@ -273,7 +290,7 @@ impl Application {

pub struct StartedApplication {
pub config_paths: Vec<ConfigPath>,
pub internal_topologies: Vec<RunningTopology>,
pub internal_topologies: Vec<SharedTopologyController>,
pub graceful_crash_receiver: ShutdownErrorReceiver,
pub signals: SignalPair,
pub topology_controller: SharedTopologyController,
Expand Down Expand Up @@ -391,7 +408,7 @@ pub struct FinishedApplication {
pub signal: SignalTo,
pub signal_rx: SignalRx,
pub topology_controller: SharedTopologyController,
pub internal_topologies: Vec<RunningTopology>,
pub internal_topologies: Vec<SharedTopologyController>,
}

impl FinishedApplication {
Expand All @@ -403,11 +420,11 @@ impl FinishedApplication {
internal_topologies,
} = self;

// At this point, we'll have the only reference to the shared topology controller and can
// safely remove it from the wrapper to shut down the topology.
// At this point, we should have the only reference to the shared topology controllers and
// can safely remove it from the wrapper to shut down the topology.
let topology_controller = topology_controller
.try_into_inner()
.expect("fail to unwrap topology controller")
.expect("Topology controller is still shared, cannot stop")
.into_inner();

let status = match signal {
Expand All @@ -417,7 +434,11 @@ impl FinishedApplication {
};

for topology in internal_topologies {
topology.stop().await;
let Ok(topology) = topology.try_into_inner() else {
warn!("Internal topology controller is still shared, cannot stop.");
continue;
};
topology.into_inner().stop().await;
}

status
Expand Down

0 comments on commit 56061ac

Please sign in to comment.