Skip to content

Commit

Permalink
optimized shutdown of threads
Browse files Browse the repository at this point in the history
  • Loading branch information
krystianity committed Oct 4, 2024
1 parent 8b48022 commit fec5bf2
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 79 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* **BREAKING** http server is not started unless --p is provided
* **BREAKING** migrated to new API /api/v1/events -> /api/events
* if one of the threads exit, the whole program will exit
* moved to [email protected], will migrated incident_key -> alert_key in code and db
* moved to [email protected], will migrate incident_key -> alert_key in code and db
* added event mapping keys to map mqtt payloads to event api
* added event filter keys to filter mqtt payloads

Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ chrono = "0.4"
clap = "2"
ilert = "4.0.1"
uuid = { version = "1.10", features = ["v4"] }
ctrlc = { version = "3.4" }
rusqlite = { version = "0.32", features = ["bundled"] } # SQLite 3.46.0
rumqttc = "0.24"
rdkafka = { version = "0.36", features = ["cmake-build"] }
futures-util = "0.3.30"
37 changes: 27 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,35 @@ The ilert agent comes in a single binary with a small footprint and helps you to

### Docker image

You can grab the latest release from [Docker hub](https://hub.docker.com/r/ilert/ilagent)
You can grab the latest official image from [Docker hub](https://hub.docker.com/r/ilert/ilagent)

```shell script
```sh
docker run ilert/ilagent
```

### Install script

For MacOS and Linux we also provide this one-liner to automatically install the agent:
### Compile the binary from source

> Note: default prebuild support stopped at version 0.3.0 if you cannot use the docker image or compile yourself and need new builds please open an issue
> Note: requires Rust to be installed, see https://rustup.rs
```shell script
curl -sL https://raw.githubusercontent.com/iLert/ilagent/master/install.sh | bash -
```sh
git clone [email protected]:iLert/ilagent.git
cd ilagent
cargo build --release
cd ./target/release
./ilagent --help
```

### Pre-build releases

> Note: default prebuild support stopped at version 0.3.0 if you cannot use the docker image or compile yourself and need new builds please open an issue
> Note: default prebuild support stopped at version 0.3.0 if you cannot use the docker image or cant compile yourself and need new builds please open an issue
#### Install script

For MacOS and Linux we also provide this one-liner to automatically install the agent:

```shell script
curl -sL https://raw.githubusercontent.com/iLert/ilagent/master/install.sh | bash -
```

We provide pre compiled binaries for every major OS on the [release page of this repository](https://github.com/iLert/ilagent/releases).

Expand Down Expand Up @@ -166,6 +176,13 @@ ilagent daemon -v -v \
--filter_val 'ALARM'
```

## Liveness probes

When providing the `-p 8977` port argument the agent will start its http server.
Providing both a `GET /ready` and a `GET /health` endpoint, these are currently static, but will be dynamic in the future.
Additionally, we recommend providing the `-b il1hbt123...` heartbeat argument with the integration key of a heartbeat alert source
to periodically ping the source.

## Getting help

We are happy to respond to [GitHub issues][issues] as well.
Expand All @@ -177,7 +194,7 @@ We are happy to respond to [GitHub issues][issues] as well.
### Cross-Compiling

Of course, you can also grab the source code and compile it yourself.
Requires cross (`cargo install cross`) to be installed.
Requires cross (`cargo install cross` for Apple Silicon support: `cargo install cross --git https://github.com/cross-rs/cross`) to be installed.

- Mac (or your host): `cargo build --release`
- Linux: `cross build --release --target x86_64-unknown-linux-gnu`
Expand Down
2 changes: 1 addition & 1 deletion examples/_daemon
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
cargo run -- daemon -p 8977 -v -v
cargo run -- daemon -p 8977 -v -v -b 123
12 changes: 12 additions & 0 deletions examples/_kafka_overwrite
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
cargo run -- daemon -v -v \
--kafka_brokers localhost:9092 --kafka_group_id ilagent -e 'test-topic' \
--event_key 'il1api123...' \
--map_key_alert_key 'mCode' \
--map_key_summary 'comment' \
--map_key_etype 'state' \
--map_val_etype_alert 'SET' \
--map_val_etype_accept 'ACK' \
--map_val_etype_resolve 'CLR' \
--filter_key 'type' \
--filter_val 'ALARM'
18 changes: 9 additions & 9 deletions examples/_mqtt_overwrite
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/bin/bash
cargo run -- daemon -v -v \
-m 127.0.0.1 -q 1883 -n ilagent -e '#' \
--mqtt_event_key 'il1api112115xxx' \
--mqtt_map_key_alert_key 'mCode' \
--mqtt_map_key_summary 'comment' \
--mqtt_map_key_etype 'state' \
--mqtt_map_val_etype_alert 'SET' \
--mqtt_map_val_etype_accept 'ACK' \
--mqtt_map_val_etype_resolve 'CLR' \
--mqtt_filter_key 'type' \
--mqtt_filter_val 'ALARM'
--event_key 'il1api123...' \
--map_key_alert_key 'mCode' \
--map_key_summary 'comment' \
--map_key_etype 'state' \
--map_val_etype_alert 'SET' \
--map_val_etype_accept 'ACK' \
--map_val_etype_resolve 'CLR' \
--filter_key 'type' \
--filter_val 'ALARM'
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl ILConfig {
http_host: "0.0.0.0".to_string(),
http_port: 8977,
start_http: false,
http_worker_count: 2,
http_worker_count: 1,
db_file: "./ilagent.db3".to_string(),
heartbeat_key: None,
mqtt_host: None,
Expand Down
17 changes: 8 additions & 9 deletions src/consumers/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@ impl ConsumerContext for CustomContext {

type LoggingConsumer = StreamConsumer<CustomContext>;

pub async fn run_kafka_job(daemon_context: Arc<DaemonContext>) -> () {
pub async fn run_kafka_job(daemon_ctx: Arc<DaemonContext>) -> () {

let (version_n, version_s) = get_rdkafka_version();
info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

let context = CustomContext;

let event_topic = if let Some(topic) = daemon_context.config.clone().event_topic {
let event_topic = if let Some(topic) = daemon_ctx.config.clone().event_topic {
topic.clone()
} else {
"".to_string()
};

let heartbeat_topic = if let Some(topic) = daemon_context.config.clone().heartbeat_topic {
let heartbeat_topic = if let Some(topic) = daemon_ctx.config.clone().heartbeat_topic {
topic.clone()
} else {
"".to_string()
Expand All @@ -61,9 +59,10 @@ pub async fn run_kafka_job(daemon_context: Arc<DaemonContext>) -> () {
topics.push(heartbeat_topic.as_str());
}

let brokers = daemon_context.config.clone().kafka_brokers.expect("no broker");
let group_id = daemon_context.config.clone().kafka_group_id.expect("no group id");
let brokers = daemon_ctx.config.clone().kafka_brokers.expect("no broker");
let group_id = daemon_ctx.config.clone().kafka_group_id.expect("no group id");

let context = CustomContext;
let consumer: LoggingConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
Expand Down Expand Up @@ -114,9 +113,9 @@ pub async fn run_kafka_job(daemon_context: Arc<DaemonContext>) -> () {
} */

let should_retry: bool = if m.topic().eq(event_topic.as_str()) {
handle_event_message(daemon_context.clone(), message_key, payload, m.topic()).await
handle_event_message(daemon_ctx.clone(), message_key, payload, m.topic()).await
} else if m.topic().eq(heartbeat_topic.as_str()) {
handle_heartbeat_message(daemon_context.clone(), message_key, payload).await
handle_heartbeat_message(daemon_ctx.clone(), message_key, payload).await
} else {
warn!("Received Kafka message from unsubscribed topic: {}", m.topic());
// will commit these anyway
Expand Down
36 changes: 23 additions & 13 deletions src/consumers/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@ use log::{info, error};
use std::time::{Duration};
use rumqttc::{MqttOptions, Client, QoS, Incoming, Event};
use std::{str, thread};

use std::sync::Arc;
use std::sync::atomic::Ordering;
use ilert::ilert::ILert;

use crate::db::ILDatabase;
use crate::config::ILConfig;
use crate::hbt;
use crate::{hbt, DaemonContext};
use crate::models::event::EventQueueItemJson;

pub fn run_mqtt_job(config: &ILConfig) -> () {
pub fn run_mqtt_job(daemon_ctx: Arc<DaemonContext>) -> () {

let mut connected = false;
let mut recon_attempts = 0;

let db = ILDatabase::new(config.db_file.as_str());
let db = ILDatabase::new(daemon_ctx.config.db_file.as_str());

let mqtt_host = config.mqtt_host.clone().expect("Missing mqtt host");
let mqtt_port = config.mqtt_port.clone().expect("Missing mqtt port");
let mqtt_host = daemon_ctx.config.mqtt_host.clone().expect("Missing mqtt host");
let mqtt_port = daemon_ctx.config.mqtt_port.clone().expect("Missing mqtt port");
let mut mqtt_options = MqttOptions::new(
config.mqtt_name.clone().expect("Missing mqtt name"),
daemon_ctx.config.mqtt_name.clone().expect("Missing mqtt name"),
mqtt_host.as_str(),
mqtt_port,
);
Expand All @@ -30,16 +31,16 @@ pub fn run_mqtt_job(config: &ILConfig) -> () {
.set_pending_throttle(Duration::from_secs(1))
.set_clean_session(false);

if let Some(mqtt_username) = config.mqtt_username.clone() {
if let Some(mqtt_username) = daemon_ctx.config.mqtt_username.clone() {
mqtt_options.set_credentials(mqtt_username.as_str(),
config.mqtt_password.clone()
daemon_ctx.config.mqtt_password.clone()
.expect("mqtt_username is set, expecting mqtt_password to be set as well").as_str());
}

let (client, mut connection) = Client::new(mqtt_options, 10);

let event_topic = config.event_topic.clone().expect("Missing mqtt event topic");
let heartbeat_topic = config.heartbeat_topic.clone().expect("Missing mqtt heartbeat topic");
let event_topic = daemon_ctx.config.event_topic.clone().expect("Missing mqtt event topic");
let heartbeat_topic = daemon_ctx.config.heartbeat_topic.clone().expect("Missing mqtt heartbeat topic");

client.subscribe(event_topic.as_str(), QoS::AtMostOnce)
.expect("Failed to subscribe to mqtt event topic");
Expand All @@ -54,6 +55,10 @@ pub fn run_mqtt_job(config: &ILConfig) -> () {
info!("Connecting to Mqtt server..");
for (_i, invoke) in connection.iter().enumerate() {

if !daemon_ctx.running.load(Ordering::Relaxed) {
break;
}

match invoke {
Err(e) => {
error!("mqtt error {:?}", e);
Expand Down Expand Up @@ -85,19 +90,24 @@ pub fn run_mqtt_job(config: &ILConfig) -> () {
if heartbeat_topic == message.topic {
handle_heartbeat_message(payload);
} else if event_topic == message.topic {
handle_event_message(&config, &db, payload, message.topic.as_str());
handle_event_message(&daemon_ctx.config, &db, payload, message.topic.as_str());
} else {

// with filters event processing might subscribe to wildcards
if event_topic.contains("#") || event_topic.contains("+") {
handle_event_message(&config, &db, payload, message.topic.as_str());
handle_event_message(&daemon_ctx.config, &db, payload, message.topic.as_str());
}
}
},
_ => continue
}
}

// faster exits
if !daemon_ctx.running.load(Ordering::Relaxed) {
break;
}

// fallback, in case mqtt connection drops all the time
if recon_attempts < 300 {
recon_attempts = recon_attempts + 1;
Expand Down
11 changes: 6 additions & 5 deletions src/hbt.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
use std::sync::Arc;
use std::sync::atomic::Ordering;
use log::{error};
use std::time::{Duration, Instant};

use ilert::ilert::ILert;
use ilert::ilert_builders::{HeartbeatApiResource};
use crate::DaemonContext;

pub async fn run_hbt_job(daemon_context: Arc<DaemonContext>) -> () {
pub async fn run_hbt_job(daemon_ctx: Arc<DaemonContext>) -> () {

let mut last_run = Instant::now();

let api_key = daemon_context.config.clone().heartbeat_key
let api_key = daemon_ctx.config.clone().heartbeat_key
.expect("Failed to access heartbeat api key");
let api_key = api_key.as_str();

// kick off call
ping_heartbeat(&daemon_context.ilert_client, api_key).await;
ping_heartbeat(&daemon_ctx.ilert_client, api_key).await;

loop {
while daemon_ctx.running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(300)).await;

if last_run.elapsed().as_millis() < 30000 {
Expand All @@ -26,7 +27,7 @@ pub async fn run_hbt_job(daemon_context: Arc<DaemonContext>) -> () {
last_run = Instant::now();
}

ping_heartbeat(&daemon_context.ilert_client, api_key).await;
ping_heartbeat(&daemon_ctx.ilert_client, api_key).await;
}
}

Expand Down
27 changes: 22 additions & 5 deletions src/server.rs → src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ async fn get_index(_req: HttpRequest) -> impl Responder {
.body("ilagent/0.5.0")
}

async fn get_ready(_req: HttpRequest) -> impl Responder {
HttpResponse::NoContent().finish()
}

async fn get_health(_req: HttpRequest) -> impl Responder {
HttpResponse::NoContent().finish()
}

async fn get_heartbeat(container: web::Data<Mutex<WebContextContainer>>, _req: HttpRequest, path: web::Path<(String,)>) -> impl Responder {

let container = container.lock().await;
Expand Down Expand Up @@ -83,6 +91,14 @@ fn config_app(cfg: &mut web::ServiceConfig) {
.route(web::get().to(get_index)) // /
);

cfg.service(web::resource("/ready")
.route(web::get().to(get_ready))
);

cfg.service(web::resource("/health")
.route(web::get().to(get_health))
);

cfg.service(web::resource("/api/events")
.route(web::post().to(post_event)) // POST
);
Expand All @@ -93,18 +109,19 @@ fn config_app(cfg: &mut web::ServiceConfig) {
);
}

pub fn run_server(daemon_context: Arc<DaemonContext>) -> () {
let addr = daemon_context.config.get_http_bind_str().clone();
let db = ILDatabase::new(daemon_context.config.db_file.as_str());
pub async fn run_server(daemon_ctx: Arc<DaemonContext>) -> () {
let addr = daemon_ctx.config.get_http_bind_str().clone();
let db = ILDatabase::new(daemon_ctx.config.db_file.as_str());
let ilert_client = ILert::new().expect("failed to create ilert client");
info!("Starting HTTP server @ {}", addr);
let container = web::Data::new(Mutex::new(WebContextContainer{ db, ilert_client }));
let server = HttpServer::new(move|| App::new()
.app_data(container.clone())
.wrap(middleware::Logger::default())
.app_data(web::JsonConfig::default().limit(16000))
.configure(config_app))
.workers(daemon_context.config.http_worker_count.try_into().expect("Failed to get http worker count"))
.workers(daemon_ctx.config.http_worker_count.try_into().expect("Failed to get http worker count"))
.bind(addr.as_str())
.expect("Failed to bind to http port");
let _ = server.run();
let _ = server.run().await;
}
Loading

0 comments on commit fec5bf2

Please sign in to comment.