Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor improvements #17

Merged
merged 9 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
chrono = { version = "0.4.26", features = ["serde"] }
config = { version = "0.14.0", features = ["toml"] }
reqwest = { version = "0.12.3", features = ["json"] }
rumqttc = "0.23.0"
rumqttc = "0.24.0"
serde = { version = "1.0.183", features = ["derive"] }
serde_json = "1.0.107"
tokio = { version = "1.34.0", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ base_url = "https://portal-api.kredslob.dk"

# You can specify either the Id of the address or fully qualify the address
# [affaldvarme.address]
# id = "bc334dbe-250e-4cf1-af89-f3e940867845"
# id = "07514448_100_______"

#[affaldvarme.address]
# street_name = "Kongevejen"
Expand Down
218 changes: 153 additions & 65 deletions src/homeassistant/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::collections::HashMap;

use crate::mitaffald::Container;
use crate::settings::MQTTConfig;
use rumqttc::{AsyncClient, LastWill, MqttOptions};
use serde_json::json;

const HA_AVAILABILITY_TOPIC: &str = "garbage_bin/availability";
const HA_PAYLOAD_AVAILABLE: &str = "online";
const HA_PAYLOAD_NOT_AVAILABLE: &str = "offline";

impl From<MQTTConfig> for MqttOptions {
fn from(val: MQTTConfig) -> Self {
Expand All @@ -11,7 +16,7 @@
.set_credentials(val.username, val.password)
.set_last_will(LastWill::new(
HA_AVAILABILITY_TOPIC,
"offline",
HA_PAYLOAD_NOT_AVAILABLE,
rumqttc::QoS::AtLeastOnce,
true,
));
Expand All @@ -20,8 +25,109 @@
}
}

pub struct HASensor {
pub container_id: String,
pub struct CreatedState;
pub struct InitializedState {
sensors: HashMap<String, HASensor>,
}

pub struct HADevice<T> {
state: T,
}

impl Default for HADevice<CreatedState> {
fn default() -> Self {
HADevice {
state: CreatedState,
}
}
}

impl HADevice<CreatedState> {
pub async fn initialize(
mut self,
client: &mut AsyncClient,
) -> Result<HADevice<InitializedState>, String> {
self.register_device(client)
.await

Check warning on line 51 in src/homeassistant/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/homeassistant/mod.rs#L51

Added line #L51 was not covered by tests
.map_err(|e| e.to_string())?;

self.register_device_availability(client)
.await

Check warning on line 55 in src/homeassistant/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/homeassistant/mod.rs#L55

Added line #L55 was not covered by tests
.map_err(|e| e.to_string())?;

Ok(HADevice {
state: InitializedState {
sensors: HashMap::new(),
},
})
}

async fn register_device(
&mut self,
client: &mut AsyncClient,
) -> Result<(), rumqttc::ClientError> {
let payload = json!(
{
"unique_id": "ha_affaldvarme_device",
"name": "Affaldvarme Device",
"state_topic": HA_AVAILABILITY_TOPIC,
"availability_topic": HA_AVAILABILITY_TOPIC,
"payload_available": HA_PAYLOAD_AVAILABLE,
"payload_not_available": HA_PAYLOAD_NOT_AVAILABLE,
"device": {
"identifiers": ["ha_affaldvarme"],
"name": "Affaldvarme integration",
"sw_version": "1.0",
"model": "Standard",
"manufacturer": "Your humble rust developer"
}
}
);

client
.publish(
"homeassistant/sensor/ha_affaldvarme_device/config",
rumqttc::QoS::AtLeastOnce,
true,
serde_json::to_string(&payload).expect("Failed to serialize"),
)
.await

Check warning on line 94 in src/homeassistant/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/homeassistant/mod.rs#L94

Added line #L94 was not covered by tests
}

async fn register_device_availability(
&mut self,
client: &mut AsyncClient,
) -> Result<(), rumqttc::ClientError> {
client
.publish(
HA_AVAILABILITY_TOPIC,
rumqttc::QoS::AtLeastOnce,
true,
HA_PAYLOAD_AVAILABLE,
)
.await

Check warning on line 108 in src/homeassistant/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/homeassistant/mod.rs#L108

Added line #L108 was not covered by tests
}
}

impl HADevice<InitializedState> {
pub async fn report(
&mut self,
container: Container,
client: &mut AsyncClient,
) -> Result<(), String> {
let sensor_id = HASensor::generate_sensor_id(&container);
self.state
.sensors
.entry(sensor_id.clone())
.or_insert_with(|| HASensor::new(&container))
.report(container, client)
.await

Check warning on line 124 in src/homeassistant/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/homeassistant/mod.rs#L124

Added line #L124 was not covered by tests
.map_err(|e| e.to_string())
}
}

struct HASensor {
container_id: String,
configure_topic: String,
state_topic: String,
is_initialized: bool,
Expand Down Expand Up @@ -51,98 +157,80 @@
.collect()
}

pub async fn report(
async fn report(
&mut self,
container: Container,
client: &mut AsyncClient,
) -> Result<(), rumqttc::ClientError> {
if !self.is_initialized {
self.register_sensor(&container, client).await?;
self.register_sensor_availability(client).await?;
self.is_initialized = true;
}
self.register_sensor(&container, client).await?;

self.register_sensor_value(&container, client).await?;
Ok(())
self.register_sensor_value(&container, client).await
}

async fn register_sensor(
&mut self,
container: &Container,
client: &mut AsyncClient,
) -> Result<(), rumqttc::ClientError> {
let payload = format!(
r#"{{
"object_id": "ha_affaldvarme_{id}",
"unique_id": "ha_affaldvarme_{id}",
"name": "{sensor_name}",
"state_topic": "{state_topic}",
"json_attributes_topic": "{state_topic}",
"value_template": "{{{{ (strptime(value_json.next_empty, '%Y-%m-%d').date() - now().date()).days }}}}",
"availability_topic": "{availability_topic}",
"payload_available": "online",
"payload_not_available": "offline",
"unit_of_measurement": "days",
"device": {{
"identifiers": [
"ha_affaldvarme"
],
"name": "Affaldvarme integration",
"sw_version": "1.0",
"model": "Standard",
"manufacturer": "Your Garbage Bin Manufacturer"
}},
"icon": "mdi:recycle"
}}"#,
sensor_name = container.name,
state_topic = self.state_topic,
availability_topic = HA_AVAILABILITY_TOPIC,
id = self.container_id,
if self.is_initialized {
return Ok(());

Check warning on line 176 in src/homeassistant/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/homeassistant/mod.rs#L176

Added line #L176 was not covered by tests
}

let payload = json!(
{
"object_id": format!("ha_affaldvarme_{}", self.container_id),
"unique_id": format!("ha_affaldvarme_{}", self.container_id),
"name": container.name,
"state_topic": self.state_topic,
"json_attributes_topic": self.state_topic,
"value_template": "{{ (strptime(value_json.next_empty, '%Y-%m-%d').date() - now().date()).days }}",
"availability_topic": HA_AVAILABILITY_TOPIC,
"payload_available": HA_PAYLOAD_AVAILABLE,
"payload_not_available": HA_PAYLOAD_NOT_AVAILABLE,
"unit_of_measurement": "days",
"device": {
"identifiers": ["ha_affaldvarme"]
},
"icon": "mdi:recycle"
}
);

client
let publish_result = client
.publish(
&self.configure_topic,
rumqttc::QoS::AtLeastOnce,
false,
payload,
serde_json::to_string(&payload).expect("Failed to serialize"),
)
.await
}
.await;

Check warning on line 205 in src/homeassistant/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/homeassistant/mod.rs#L205

Added line #L205 was not covered by tests

async fn register_sensor_availability(
&self,
client: &mut AsyncClient,
) -> Result<(), rumqttc::ClientError> {
client
.publish(
HA_AVAILABILITY_TOPIC,
rumqttc::QoS::AtLeastOnce,
true,
"online",
)
.await
if publish_result.is_ok() {
self.is_initialized = true;
}

publish_result
}

async fn register_sensor_value(
&self,
container: &Container,
client: &mut AsyncClient,
) -> Result<(), rumqttc::ClientError> {
let payload = format!(
r#"
{{
"name": "{sensor_name}",
"next_empty": "{next_empty}",
"last_update": "{last_update}"
}}"#,
sensor_name = container.name,
next_empty = container.date.format("%Y-%m-%d"),
last_update = chrono::Local::now().to_rfc3339(),
let payload = json!(
{
"name": container.name,
"next_empty": container.date.format("%Y-%m-%d").to_string(),
"last_update": chrono::Local::now().to_rfc3339()
}
);

client
.publish(&self.state_topic, rumqttc::QoS::AtLeastOnce, false, payload)
.publish(
&self.state_topic,
rumqttc::QoS::AtLeastOnce,
false,
serde_json::to_string(&payload).expect("Failed to serialize"),
)
.await
}
}
38 changes: 18 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,49 @@
use std::collections::HashMap;

use homeassistant::HASensor;
use mitaffald::{get_containers, Container};
use rumqttc::AsyncClient;
use settings::Settings;
use std::collections::{hash_map::Entry, HashMap};

pub mod homeassistant;
pub mod mitaffald;
pub mod settings;

pub async fn sync_data(
settings: Settings,
sensor_map: &mut HashMap<String, HASensor>,
) -> Result<(), String> {
pub async fn sync_data(settings: Settings) -> Result<(), String> {
let (mut client, mut connection) = AsyncClient::new(settings.mqtt.into(), 200);
let mut has_errors = false;
let device = homeassistant::HADevice::default();

let mut device = device.initialize(&mut client).await?;

for container in get_containers(settings.affaldvarme)
let containers_to_report = get_containers(settings.affaldvarme)
.await?
.into_iter()
.fold(
HashMap::<String, Container>::new(),
|mut accumulator, item| {
match accumulator.entry(item.name.clone()) {
std::collections::hash_map::Entry::Occupied(mut existing) => {
Entry::Occupied(mut existing) => {
if existing.get().date > item.date {
existing.insert(item);
}
}
std::collections::hash_map::Entry::Vacant(v) => {
Entry::Vacant(v) => {
v.insert(item);
}
}

accumulator
},
)
.into_values()
{
let report_result = sensor_map
.entry(container.name.clone())
.or_insert_with(|| HASensor::new(&container))
.report(container, &mut client)
.await;
.into_values();

has_errors = has_errors || report_result.is_err();
}
let has_errors = {
let mut has_errors = false;
for container in containers_to_report {
let report_result = device.report(container, &mut client).await;

has_errors = has_errors || report_result.is_err();
}
has_errors
};

//calling disconnect() causes an error in the connection iterator
if let Err(x) = client.disconnect().await {
Expand Down
7 changes: 3 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use ha_mitaffald::homeassistant::HASensor;
use ha_mitaffald::settings::Settings;
use ha_mitaffald::sync_data;
use std::collections::HashMap;

#[tokio::main]
async fn main() {
let settings = Settings::new().expect("Failed to read settings");
let mut sensor_map: HashMap<String, HASensor> = HashMap::new();

let report = sync_data(settings, &mut sensor_map).await;
let report = sync_data(settings).await;

Check warning on line 8 in src/main.rs

View check run for this annotation

Codecov / codecov/patch

src/main.rs#L8

Added line #L8 was not covered by tests

if let Err(x) = report {
eprintln!(
"Failure while reporting data (some entities may have been updated): {}",
x
);
}

tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

Check warning on line 17 in src/main.rs

View check run for this annotation

Codecov / codecov/patch

src/main.rs#L16-L17

Added lines #L16 - L17 were not covered by tests
}
Loading