Skip to content

Commit

Permalink
Add blue_onyx_service
Browse files Browse the repository at this point in the history
  • Loading branch information
xnorpx committed Dec 28, 2024
1 parent 4f60fe5 commit 13bec46
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 129 deletions.
20 changes: 19 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ windows = { version = "0", default-features = false, features = [
"Win32_Graphics_Direct3D12",
"Win32_System_Threading",
] }
windows-service = "0"

[build-dependencies]
ureq = "2"
Expand Down
130 changes: 6 additions & 124 deletions src/bin/blue_onyx.rs
Original file line number Diff line number Diff line change
@@ -1,141 +1,26 @@
use blue_onyx::{
detector::DetectorConfig,
init_logging,
server::{run_server, Metrics},
system_info::{self, system_info},
worker::DetectorWorker,
LogLevel,
};
use blue_onyx::{blue_onyx_service, cli::Cli, init_logging, system_info::system_info};
use clap::Parser;
use std::{path::PathBuf, sync::mpsc::channel};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use windows::Win32::System::Threading::{
GetCurrentProcessorNumber, GetCurrentThread, SetThreadAffinityMask, SetThreadPriority,
THREAD_PRIORITY_TIME_CRITICAL,
};

#[derive(Parser)]
#[command(author = "Marcus Asteborg", version=env!("CARGO_PKG_VERSION"), about = "TODO")]
struct Cli {
/// The port on which the server will listen for HTTP requests.
/// Default is 32168. Example usage: --port 1337
//#[arg(long, default_value_t = 32168)]
#[arg(long, default_value_t = 32168)]
pub port: u16,
/// Path to the ONNX rt-detrv2 onnx model file.
/// If not given the default model small model is used.
#[clap(long)]
model: Option<PathBuf>,
/// Path to the object classes yaml file
/// Default: coco_classes.yaml which is the 80 standard COCO classes
#[clap(long)]
object_classes: Option<PathBuf>,
/// Filters the results to include only the specified labels. Provide labels separated by ','.
/// Example: --object_filter "person,cup"
#[arg(long, value_delimiter = ',', num_args = 1..)]
pub object_filter: Vec<String>,
/// Sets the level of logging
#[clap(long, value_enum, default_value_t = LogLevel::Info)]
log_level: LogLevel,
/// If log_path is set, then stdout logging will be disabled and it will log to file
#[clap(long)]
log_path: Option<PathBuf>,
/// Confidence threshold for object detection
#[clap(long, default_value_t = 0.5)]
confidence_threshold: f32,
/// Force using CPU for inference
#[clap(long, default_value_t = false)]
force_cpu: bool,
/// Intra thread parallelism max is cpu cores - 1
#[clap(long, default_value_t = 192)]
intra_threads: usize,
/// Inter thread parallelism max is cpu cores - 1
#[clap(long, default_value_t = 192)]
inter_threads: usize,
/// Optional path to save the processed images
#[clap(long)]
save_image_path: Option<PathBuf>,
/// Save the reference image (only if save_image_path is provided)
#[clap(long, default_value_t = false)]
save_ref_image: bool,
/// GPU
#[clap(long, default_value_t = 0)]
gpu_index: i32,
/// Save inference stats to file
#[clap(long)]
save_stats_path: Option<PathBuf>,
/// Path to download all models to
/// This command will only download the models to the specified path
/// and then exit
#[clap(long)]
download_model_path: Option<PathBuf>,
}
use tracing::info;

fn main() -> anyhow::Result<()> {
let parse = Cli::parse();
let args = parse;
init_logging(args.log_level, None);
init_logging(args.log_level, args.log_path.clone());
system_info()?;

if args.download_model_path.is_some() {
blue_onyx::download_models::download_models(args.download_model_path.unwrap())?;
return Ok(());
}

let detector_config = DetectorConfig {
model: args.model,
object_classes: args.object_classes,
object_filter: args.object_filter,
confidence_threshold: args.confidence_threshold,
force_cpu: args.force_cpu,
save_image_path: args.save_image_path,
save_ref_image: args.save_ref_image,
gpu_index: args.gpu_index,
intra_threads: args.intra_threads,
inter_threads: args.inter_threads,
};

let (sender, receive) = channel();
// Run a separate thread for the detector worker
let mut detector_worker = DetectorWorker::new(detector_config, receive)?;

let detector = detector_worker.get_detector();
let model_name = detector.get_model_name();
let using_gpu = detector.is_using_gpu();
let execution_providers_name = detector.get_endpoint_provider_name();

let device_name = if using_gpu {
system_info::gpu_model(args.gpu_index as usize)
} else {
system_info::cpu_model()
};
let metrics = Metrics::new(model_name.clone(), device_name, execution_providers_name);

let thread_handle = std::thread::spawn(move || {
#[cfg(target_os = "windows")]
unsafe {
let thread_handle = GetCurrentThread();
if let Err(err) = SetThreadPriority(thread_handle, THREAD_PRIORITY_TIME_CRITICAL) {
error!(?err, "Failed to set thread priority to time critical");
}
let processor_number = GetCurrentProcessorNumber();
let core_mask = 1usize << processor_number;
let previous_mask = SetThreadAffinityMask(thread_handle, core_mask);
if previous_mask == 0 {
error!("Failed to set thread affinity.");
}
}
detector_worker.run();
});
let (blue_onyx_service, cancellation_token, thread_handle) = blue_onyx_service(args)?;

// Run the tokio runtime on the main thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

rt.block_on(async {
let cancellation_token = CancellationToken::new();
let ctrl_c_token = cancellation_token.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c()
Expand All @@ -144,13 +29,10 @@ fn main() -> anyhow::Result<()> {
info!("Ctrl+C received, shutting down server");
ctrl_c_token.cancel();
});

run_server(args.port, cancellation_token, sender, metrics)
blue_onyx_service
.await
.expect("Failed to run server");
.expect("Failed to run blue onyx service");
});

thread_handle.join().expect("Failed to join worker thread");

Ok(())
}
145 changes: 145 additions & 0 deletions src/bin/blue_onyx_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
//! Blue Onyx service.
//!
//! Same CLI arguments as the standalone binary.
//!
//! Avoid having spaces in you binpath, keep the space after the `=`:
//! https://stackoverflow.com/questions/3663331/when-creating-a-service-with-sc-exe-how-to-pass-in-context-parameters
//!
//! Install the service:
//! `sc.exe create blue_onyx_service binPath= "<path>\blue_onyx_service.exe --port 12345" start= auto displayname= "Blue Onyx Service"`
//!
//! Start the service: `net start blue_onyx_service`
//!
//! Stop the service: `net stop blue_onyx_service`
//!
//! Uninstall the service: `sc.exe delete blue_onyx_service`
//!
//! You can have several services pointing to the same binary with different names and arguments.
#[cfg(windows)]
fn main() -> windows_service::Result<()> {
blue_onyx_service::run()
}

#[cfg(not(windows))]
fn main() {
panic!("This program is only intended to run on Windows.");
}

#[cfg(windows)]
mod blue_onyx_service {
use blue_onyx::{blue_onyx_service, cli::Cli, init_logging};
use clap::Parser;
use std::{env, ffi::OsString, future::Future, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use windows_service::{
define_windows_service,
service::{
ServiceControl, ServiceControlAccept, ServiceExitCode, ServiceState, ServiceStatus,
ServiceType,
},
service_control_handler::{self, ServiceControlHandlerResult},
service_dispatcher, Result,
};

const SERVICE_NAME: &str = "blue_onyx_service";
const SERVICE_TYPE: ServiceType = ServiceType::OWN_PROCESS;

pub fn run() -> Result<()> {
service_dispatcher::start(SERVICE_NAME, ffi_service_main)
}
define_windows_service!(ffi_service_main, my_service_main);

pub fn my_service_main(service_name: Vec<OsString>) {
let arguments: Vec<OsString> = env::args_os().collect();
let args = Cli::try_parse_from(arguments.clone()).unwrap();

let default_log_path = std::path::PathBuf::from(format!(
"{}\\{}",
std::env::var("PROGRAMDATA").unwrap_or_else(|_| "C:\\ProgramData".to_string()),
service_name[0].to_string_lossy()
));

let log_path = args
.log_path
.clone()
.unwrap_or_else(|| default_log_path.clone());

println!("Logs will be written to log path: {}", log_path.display());
let _guard = init_logging(args.log_level, Some("c:\\git\\".into()));
info!("Starting blue onyx service with args: {:#?}", arguments);

let (blue_onyx_service, cancellation_token, thread_handle) = match blue_onyx_service(args) {
Ok(v) => v,
Err(err) => {
error!(?err, "Failed to init blue onyx service");
return;
}
};

if let Err(err) = run_service(blue_onyx_service, cancellation_token) {
error!(?err, "Blue onyx service failed");
}

thread_handle
.join()
.expect("Failed to join detector worker thread");
}

pub fn run_service(
blue_onyx_service: impl Future<Output = anyhow::Result<()>>,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
let event_handler = move |control_event| -> ServiceControlHandlerResult {
match control_event {
ServiceControl::Interrogate => ServiceControlHandlerResult::NoError,
ServiceControl::Stop => {
cancellation_token.cancel();
ServiceControlHandlerResult::NoError
}
ServiceControl::UserEvent(code) => {
if code.to_raw() == 130 {
cancellation_token.cancel();
}
ServiceControlHandlerResult::NoError
}
_ => ServiceControlHandlerResult::NotImplemented,
}
};
let status_handle = service_control_handler::register(SERVICE_NAME, event_handler)?;

status_handle.set_service_status(ServiceStatus {
service_type: SERVICE_TYPE,
current_state: ServiceState::Running,
controls_accepted: ServiceControlAccept::STOP,
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None,
})?;

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

rt.block_on(async {
blue_onyx_service
.await
.expect("Failed to run blue onyx service");
});

status_handle.set_service_status(ServiceStatus {
service_type: SERVICE_TYPE,
current_state: ServiceState::Stopped,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None,
})?;

Ok(())
}
}
Loading

0 comments on commit 13bec46

Please sign in to comment.