Skip to content

Commit

Permalink
Merge pull request #2915 from jarhodes314/feat/built-in-bridge-reconn…
Browse files Browse the repository at this point in the history
…ect-backoff

feat: Add reconnection backoff to built-in bridge
  • Loading branch information
jarhodes314 authored Jun 7, 2024
2 parents 4861be8 + e236d50 commit 569bccb
Show file tree
Hide file tree
Showing 16 changed files with 365 additions and 83 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ glob = "0.3"
heck = "0.4.1"
http = "0.2"
http-body = "0.4"
humantime = "2.1.0"
hyper = { version = "0.14", default-features = false }
hyper-rustls = { version = "0.24", default_features = false, features = [
"tokio-runtime",
Expand Down
1 change: 1 addition & 0 deletions crates/common/tedge_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ camino = { workspace = true, features = ["serde", "serde1"] }
certificate = { workspace = true }
doku = { workspace = true }
figment = { workspace = true, features = ["env", "toml"] }
humantime = { workspace = true }
mqtt_channel = { workspace = true }
once_cell = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls-native-roots"] }
Expand Down
142 changes: 97 additions & 45 deletions crates/common/tedge_config/src/tedge_config_cli/models/seconds.rs
Original file line number Diff line number Diff line change
@@ -1,89 +1,141 @@
use std::convert::TryFrom;
use std::convert::TryInto;
use std::fmt;
use std::fmt::Debug;
use std::str::FromStr;
use std::time::Duration;

#[derive(
Copy, Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq, doku::Document,
)]
#[serde(transparent)]
pub struct Seconds(pub(crate) u64);
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
#[serde(into = "String", try_from = "DeserializeTime")]
pub struct SecondsOrHumanTime {
duration: Duration,
input: DeserializeTime,
}

#[derive(thiserror::Error, Debug)]
#[error("Invalid seconds number: '{input}'.")]
pub struct InvalidSecondsNumber {
input: String,
}

impl TryFrom<String> for Seconds {
type Error = InvalidSecondsNumber;

fn try_from(input: String) -> Result<Self, Self::Error> {
input
.as_str()
.parse::<u64>()
.map_err(|_| InvalidSecondsNumber { input })
.map(Seconds)
impl From<SecondsOrHumanTime> for String {
fn from(value: SecondsOrHumanTime) -> Self {
value.to_string()
}
}

impl TryInto<String> for Seconds {
type Error = std::convert::Infallible;
impl FromStr for SecondsOrHumanTime {
type Err = humantime::DurationError;

fn from_str(input: &str) -> Result<Self, Self::Err> {
let seconds = input.parse::<u64>();

fn try_into(self) -> Result<String, Self::Error> {
Ok(format!("{}", self.0))
match seconds {
Ok(seconds) => Ok(Self {
duration: Duration::from_secs(seconds),
input: DeserializeTime::Seconds(seconds),
}),
Err(_) => humantime::parse_duration(input).map(|duration| Self {
duration,
input: DeserializeTime::MaybeHumanTime(input.to_owned()),
}),
}
}
}

impl FromStr for Seconds {
type Err = <u64 as FromStr>::Err;

fn from_str(s: &str) -> Result<Self, Self::Err> {
u64::from_str(s).map(Self)
impl fmt::Display for SecondsOrHumanTime {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.input, f)
}
}

impl fmt::Display for Seconds {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
impl SecondsOrHumanTime {
pub fn duration(&self) -> Duration {
self.duration
}
}

impl Seconds {
pub fn duration(self) -> Duration {
Duration::from_secs(self.0)
impl doku::Document for SecondsOrHumanTime {
fn ty() -> doku::Type {
String::ty()
}
}

impl From<Seconds> for u64 {
fn from(val: Seconds) -> Self {
val.0
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
#[serde(untagged)]
enum DeserializeTime {
Seconds(u64),
MaybeHumanTime(String),
}

impl fmt::Display for DeserializeTime {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DeserializeTime::Seconds(secs) => fmt::Display::fmt(&secs, f),
DeserializeTime::MaybeHumanTime(input) => fmt::Display::fmt(&input, f),
}
}
}

impl From<u64> for Seconds {
fn from(value: u64) -> Self {
Self(value)
impl TryFrom<DeserializeTime> for SecondsOrHumanTime {
type Error = humantime::DurationError;
fn try_from(value: DeserializeTime) -> Result<Self, Self::Error> {
match value {
DeserializeTime::Seconds(secs) => Ok(Self {
duration: Duration::from_secs(secs),
input: value,
}),
DeserializeTime::MaybeHumanTime(human) => human.parse(),
}
}
}

#[cfg(test)]
use assert_matches::*;
#[test]
fn conversion_from_valid_seconds_succeeds() {
assert_matches!(Seconds::try_from("1234".to_string()), Ok(Seconds(1234)));
assert_eq!(
"1234".parse::<SecondsOrHumanTime>().unwrap(),
SecondsOrHumanTime {
duration: Duration::from_secs(1234),
input: DeserializeTime::Seconds(1234),
}
);
}

#[test]
fn conversion_from_valid_humantime_succeeds() {
assert_eq!(
"1 hour".parse::<SecondsOrHumanTime>().unwrap(),
SecondsOrHumanTime {
duration: Duration::from_secs(3600),
input: DeserializeTime::MaybeHumanTime("1 hour".into()),
}
);
}

#[test]
fn conversion_from_longer_integer_fails() {
assert_matches!(
Seconds::try_from("18446744073709551616".to_string()),
Err(InvalidSecondsNumber { .. })
assert_eq!(
"18446744073709551616"
.parse::<SecondsOrHumanTime>()
.unwrap_err()
.to_string(),
"number is too large"
);
}

#[test]
fn conversion_from_seconds_to_string() {
assert_matches!(TryInto::<String>::try_into(Seconds(1234)), Ok(seconds_str) if seconds_str == "1234");
fn display_implementation_preserves_format_of_seconds() {
assert_eq!(
"1234".parse::<SecondsOrHumanTime>().unwrap().to_string(),
"1234"
);
}

#[test]
fn display_implementation_preserves_format_of_humantime() {
assert_eq!(
"20 minutes 34s"
.parse::<SecondsOrHumanTime>()
.unwrap()
.to_string(),
"20 minutes 34s"
);
}
44 changes: 20 additions & 24 deletions crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::AutoFlag;
use crate::AutoLogUpload;
use crate::ConnectUrl;
use crate::HostPort;
use crate::Seconds;
use crate::SecondsOrHumanTime;
use crate::SoftwareManagementApiFlag;
use crate::TEdgeConfigLocation;
use crate::TemplatesSet;
Expand Down Expand Up @@ -465,7 +465,6 @@ define_tedge_config! {
/// if this is set to "c8y", then messages published to `c8y/s/us` will be
/// forwarded by to Cumulocity on the `s/us` topic
#[tedge_config(example = "c8y", default(function = "c8y_topic_prefix"))]
#[doku(skip)] // Hide the configuration in `tedge config list --doc`
topic_prefix: TopicPrefix,
},

Expand Down Expand Up @@ -661,8 +660,21 @@ define_tedge_config! {

bridge: {
#[tedge_config(default(value = false))]
#[doku(skip)] // Hide the configuration in `tedge config list --doc`
built_in: bool,

reconnect_policy: {
/// The minimum time the built-in bridge will wait before reconnecting
#[tedge_config(example = "30s", default(from_str = "30s"))]
initial_interval: SecondsOrHumanTime,

/// The maximum time the built-in bridge will wait before reconnecting
#[tedge_config(example = "10m", default(from_str = "10m"))]
maximum_interval: SecondsOrHumanTime,

/// How long to wait after successful reconnection before resetting the reconnect timeout
#[tedge_config(example = "5m", default(from_str = "5m"))]
reset_window: SecondsOrHumanTime,
},
},
},

Expand Down Expand Up @@ -773,9 +785,9 @@ define_tedge_config! {
#[tedge_config(example = "true", default(value = true))]
lock_files: bool,

/// Interval at which the memory usage is logged (in seconds). Logging is disabled if set to 0
#[tedge_config(example = "60", default(value = 0_u64))]
log_memory_interval: Seconds,
/// Interval at which the memory usage is logged (in seconds if no unit is provided). Logging is disabled if set to 0
#[tedge_config(example = "60s", default(from_str = "0"))]
log_memory_interval: SecondsOrHumanTime,
},

logs: {
Expand Down Expand Up @@ -803,8 +815,8 @@ define_tedge_config! {
child: {
update: {
/// The timeout limit in seconds for firmware update operations on child devices
#[tedge_config(example = "3600", default(value = 3600_u64))]
timeout: Seconds,
#[tedge_config(example = "1h", default(from_str = "1h"))]
timeout: SecondsOrHumanTime,
}
}
},
Expand Down Expand Up @@ -854,22 +866,6 @@ fn c8y_topic_prefix() -> TopicPrefix {
TopicPrefix::try_new("c8y").unwrap()
}

impl ReadableKey {
// This is designed to be a simple way of controlling whether values appear in the output of
// `tedge config list`. Ideally this would be integrated into [define_tedge_config], see
// https://github.com/thin-edge/thin-edge.io/issues/2767 for more detail on that.
// Currently this accompanies `#[doku(skip)]` on the relevant configurations, which hides
// them in `tedge config list --doc`. The configurations are hidden to avoid unfinished
// features from being discovered.
pub fn is_printable_value(self, value: &str) -> bool {
match self {
Self::MqttBridgeBuiltIn => value != "false",
Self::C8yBridgeTopicPrefix => value != "c8y",
_ => true,
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, serde::Serialize)]
#[serde(try_from = "Cow<'_, str>", into = "Arc<str>")]
/// A valid MQTT topic prefix, used to customise the c8y/ topic prefix
Expand Down
1 change: 1 addition & 0 deletions crates/common/tedge_config_macros/impl/src/input/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub enum FieldDefault {
FromKey(Punctuated<syn::Ident, syn::Token![.]>),
FromOptionalKey(Punctuated<syn::Ident, syn::Token![.]>),
Value(DefaultValueLit),
FromStr(syn::LitStr),
None,
}

Expand Down
29 changes: 29 additions & 0 deletions crates/common/tedge_config_macros/impl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! This crate implements the macro for `tedge_config_macros` and should not be used directly.

use crate::input::FieldDefault;
use heck::ToUpperCamelCase;
use optional_error::OptionalError;
use proc_macro2::Span;
Expand Down Expand Up @@ -61,6 +62,33 @@ pub fn generate_configuration(tokens: TokenStream) -> Result<TokenStream, syn::E
})
.collect::<Vec<_>>();

let fromstr_default_tests = fields_with_keys
.iter()
.filter_map(|(key, field)| Some((key, field.read_write()?)))
.filter_map(|(key, field)| {
let ty = field.from.as_ref().unwrap_or(&field.ty);
if let FieldDefault::FromStr(default) = &field.default {
let name = quote::format_ident!(
"default_value_can_be_deserialized_for_{}",
key.join("_").replace('-', "_")
);
let span = default.span();
let expect_message = format!(
"Default value {default:?} for '{}' could not be deserialized",
key.join("."),
);
Some(quote_spanned! {span=>
#[test]
fn #name() {
#default.parse::<#ty>().expect(#expect_message);
}
})
} else {
None
}
})
.collect::<Vec<_>>();

let reader_name = proc_macro2::Ident::new("TEdgeConfigReader", Span::call_site());
let dto_doc_comment = format!(
"A data-transfer object, designed for reading and writing to
Expand Down Expand Up @@ -100,6 +128,7 @@ pub fn generate_configuration(tokens: TokenStream) -> Result<TokenStream, syn::E

Ok(quote! {
#(#example_tests)*
#(#fromstr_default_tests)*
#dto
#reader
#enums
Expand Down
13 changes: 5 additions & 8 deletions crates/common/tedge_config_macros/impl/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,27 +303,24 @@ fn generate_string_readers(paths: &[VecDeque<&FieldOrGroup>]) -> TokenStream {
.field()
.expect("Back of path is guaranteed to be a field");
let segments = path.iter().map(|thing| thing.ident());
let to_string = quote_spanned!(field.ty().span()=> .to_string());
if field.read_only().is_some() {
if extract_type_from_result(field.ty()).is_some() {
parse_quote! {
// Probably where the compiler error appears
// TODO why do we need to unwrap
ReadableKey::#variant_name => Ok(self.#(#segments).*.try_read(self)?.to_string()),
ReadableKey::#variant_name => Ok(self.#(#segments).*.try_read(self)?#to_string),
}
} else {
parse_quote! {
// Probably where the compiler error appears
// TODO why do we need to unwrap
ReadableKey::#variant_name => Ok(self.#(#segments).*.read(self).to_string()),
ReadableKey::#variant_name => Ok(self.#(#segments).*.read(self)#to_string),
}
}
} else if field.has_guaranteed_default() {
parse_quote! {
ReadableKey::#variant_name => Ok(self.#(#segments).*.to_string()),
ReadableKey::#variant_name => Ok(self.#(#segments).*#to_string),
}
} else {
parse_quote! {
ReadableKey::#variant_name => Ok(self.#(#segments).*.or_config_not_set()?.to_string()),
ReadableKey::#variant_name => Ok(self.#(#segments).*.or_config_not_set()?#to_string),
}
}
});
Expand Down
Loading

0 comments on commit 569bccb

Please sign in to comment.