From 428ce24eb170bfe0b9925a97c96c292f18c1d523 Mon Sep 17 00:00:00 2001 From: Matt Wrock Date: Mon, 1 Nov 2021 09:35:19 -0700 Subject: [PATCH] Revert rants replacement Signed-off-by: Matt Wrock --- Cargo.lock | 333 ++++++------------ .../src/tls/native_tls_wrapper/readers.rs | 2 +- components/hab/Cargo.toml | 2 +- components/hab/src/cli/hab/sup.rs | 12 +- components/sup/Cargo.toml | 3 +- components/sup/src/event.rs | 33 +- components/sup/src/event/error.rs | 28 +- .../sup/src/event/nats_message_stream.rs | 133 +++---- 8 files changed, 190 insertions(+), 356 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 404ca130ab..88776f56f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,23 +279,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" -[[package]] -name = "async-channel" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-task" -version = "4.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" - [[package]] name = "async-trait" version = "0.1.51" @@ -307,12 +290,6 @@ dependencies = [ "syn", ] -[[package]] -name = "atomic-waker" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" - [[package]] name = "atty" version = "0.2.14" @@ -357,15 +334,6 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" -[[package]] -name = "base64-url" -version = "1.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44265cf903f576fcaa1c2f23b32ec2dadaa8ec9d6b7c6212704d72a417bfbeef" -dependencies = [ - "base64", -] - [[package]] name = "bimap" version = "0.6.1" @@ -397,6 +365,18 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitvec" +version = "0.19.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "blake2b_simd" version = "0.5.11" @@ -417,20 +397,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" -dependencies = [ - "async-channel", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", - "once_cell", -] - [[package]] name = "broadcast" version = "0.1.0" @@ -464,12 +430,6 @@ dependencies = [ "bytes", ] -[[package]] -name = "cache-padded" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" - [[package]] name = "caps" version = "0.5.2" @@ -533,15 +493,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "370c83b49aedf022ee27942e8ae1d9de1cf40dc9653ee6550e4455d08f6406f9" -[[package]] -name = "concurrent-queue" -version = "1.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" -dependencies = [ - "cache-padded", -] - [[package]] name = "configopt" version = "0.1.0" @@ -669,25 +620,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "curve25519-dalek" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61" -dependencies = [ - "byteorder", - "digest", - "rand_core 0.5.1", - "subtle", - "zeroize", -] - -[[package]] -name = "data-encoding" -version = "2.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" - [[package]] name = "derivative" version = "2.2.0" @@ -795,18 +727,6 @@ dependencies = [ "signature", ] -[[package]] -name = "ed25519-dalek" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" -dependencies = [ - "curve25519-dalek", - "ed25519", - "sha2", - "zeroize", -] - [[package]] name = "either" version = "1.6.1" @@ -878,12 +798,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" -[[package]] -name = "event-listener" -version = "2.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" - [[package]] name = "failure" version = "0.1.8" @@ -906,15 +820,6 @@ dependencies = [ "synstructure", ] -[[package]] -name = "fastrand" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b394ed3d285a429378d3b384b9eb1285267e7df4b166df24b7a6939a04dc392e" -dependencies = [ - "instant", -] - [[package]] name = "field-offset" version = "0.3.4" @@ -1049,6 +954,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +[[package]] +name = "funty" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" + [[package]] name = "futures" version = "0.3.17" @@ -1097,21 +1008,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" -[[package]] -name = "futures-lite" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.17" @@ -1265,8 +1161,8 @@ dependencies = [ "lazy_static 1.4.0", "libc", "log 0.4.14", - "nats", "pbr", + "rants", "reqwest", "retry", "rustls", @@ -1637,8 +1533,6 @@ dependencies = [ "log 0.4.14", "log4rs", "mio 0.7.13", - "native-tls", - "nats", "notify", "num_cpus", "parking_lot", @@ -1649,6 +1543,7 @@ dependencies = [ "prost-derive", "prost-types", "rand 0.8.4", + "rants", "regex 1.5.4", "rustls", "serde", @@ -2025,6 +1920,19 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "lexical-core" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" +dependencies = [ + "arrayvec", + "bitflags", + "cfg-if 1.0.0", + "ryu", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.105" @@ -2309,36 +2217,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nats" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dddb0090aca6f217f525c1e20422b20881fc40baedfcd9f211fa3b08c3a1e1bb" -dependencies = [ - "base64", - "base64-url", - "blocking", - "chrono", - "crossbeam-channel", - "fastrand", - "itoa", - "json", - "libc", - "log 0.4.14", - "memchr", - "nkeys", - "nuid", - "once_cell", - "parking_lot", - "regex 1.5.4", - "rustls", - "rustls-native-certs", - "serde", - "serde_json", - "webpki 0.21.4", - "winapi 0.3.9", -] - [[package]] name = "net2" version = "0.2.37" @@ -2376,17 +2254,16 @@ dependencies = [ ] [[package]] -name = "nkeys" -version = "0.1.0" +name = "nom" +version = "6.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1a98f0a974ff737974b57ba1c71d2e0fe7ec18e5a828d4b8e02683171349dfa" +checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2" dependencies = [ - "byteorder", - "data-encoding", - "ed25519-dalek", - "log 0.4.14", - "rand 0.7.3", - "signatory", + "bitvec", + "funty", + "lexical-core", + "memchr", + "version_check", ] [[package]] @@ -2416,16 +2293,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "nuid" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7000c9392b545c4ba43e8abc086bf7d01cd2948690934c16980170b0549a2bd3" -dependencies = [ - "lazy_static 1.4.0", - "rand 0.8.4", -] - [[package]] name = "num-integer" version = "0.1.44" @@ -2548,12 +2415,6 @@ dependencies = [ "stable_deref_trait", ] -[[package]] -name = "parking" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" - [[package]] name = "parking_lot" version = "0.11.1" @@ -2919,6 +2780,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8" + [[package]] name = "rand" version = "0.3.23" @@ -3048,6 +2915,28 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rants" +version = "0.5.0" +source = "git+https://github.com/habitat-sh/rants.git#1c2c8674f48aab2166c0016eb10fcfc5ebcb6073" +dependencies = [ + "bytes", + "futures", + "log 0.4.14", + "native-tls", + "nom", + "owning_ref", + "pin-project", + "rand 0.8.4", + "serde", + "serde_json", + "tokio", + "tokio-native-tls", + "tokio-stream", + "tokio-util", + "uuid 0.8.2", +] + [[package]] name = "rcgen" version = "0.8.11" @@ -3330,18 +3219,6 @@ dependencies = [ "webpki 0.21.4", ] -[[package]] -name = "rustls-native-certs" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" -dependencies = [ - "openssl-probe", - "rustls", - "schannel", - "security-framework", -] - [[package]] name = "rustversion" version = "1.0.5" @@ -3588,18 +3465,6 @@ dependencies = [ "libc", ] -[[package]] -name = "signatory" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9eaebd4be561a7d8148803baa108092f85090189c4b8c3ffb81602b15b5c1771" -dependencies = [ - "getrandom 0.1.16", - "signature", - "subtle-encoding", - "zeroize", -] - [[package]] name = "signature" version = "1.3.1" @@ -3676,6 +3541,12 @@ dependencies = [ "loom", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stdweb" version = "0.4.20" @@ -3759,15 +3630,6 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" -[[package]] -name = "subtle-encoding" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dcb1ed7b8330c5eed5441052651dd7a12c75e2ed88f2ec024ae1fa3a5e59945" -dependencies = [ - "zeroize", -] - [[package]] name = "syn" version = "1.0.74" @@ -3800,6 +3662,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "tar" version = "0.4.36" @@ -4026,6 +3894,18 @@ dependencies = [ "webpki 0.21.4", ] +[[package]] +name = "tokio-stream" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-util" version = "0.6.7" @@ -4260,12 +4140,6 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "walkdir" version = "2.3.2" @@ -4512,6 +4386,12 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "wyz" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" + [[package]] name = "xattr" version = "0.2.2" @@ -4559,21 +4439,6 @@ name = "zeroize" version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "377db0846015f7ae377174787dd452e1c5f5a9050bc6f954911d01f116daa0cd" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2c1e130bebaeab2f23886bf9acbaca14b092408c452543c857f66399cd6dab1" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "synstructure", -] [[package]] name = "zmq" diff --git a/components/core/src/tls/native_tls_wrapper/readers.rs b/components/core/src/tls/native_tls_wrapper/readers.rs index 83514a6dc4..dfa5b21e35 100644 --- a/components/core/src/tls/native_tls_wrapper/readers.rs +++ b/components/core/src/tls/native_tls_wrapper/readers.rs @@ -58,7 +58,7 @@ pub fn certificates_as_der(fs_root_path: Option<&Path>) -> Result>> .collect::>()?) } -pub fn installed_cacerts(fs_root_path: Option<&Path>) -> Result> { +fn installed_cacerts(fs_root_path: Option<&Path>) -> Result> { let cacerts_ident = PackageIdent::from_str(CACERTS_PKG_IDENT)?; if let Ok(pkg_install) = PackageInstall::load(&cacerts_ident, fs_root_path) { diff --git a/components/hab/Cargo.toml b/components/hab/Cargo.toml index b92f7be49a..66db34c1ae 100644 --- a/components/hab/Cargo.toml +++ b/components/hab/Cargo.toml @@ -34,8 +34,8 @@ handlebars = { version = "0.29.1", default-features = false } lazy_static = "*" libc = "*" log = "*" -nats = "*" pbr = "*" +rants = { git = "https://github.com/habitat-sh/rants.git", features = ["native-tls"] } reqwest = { version = "*", features = ["blocking", "json", "stream"] } retry = { git = "https://github.com/habitat-sh/retry", features = ["asynchronous"] } rustls = "*" diff --git a/components/hab/src/cli/hab/sup.rs b/components/hab/src/cli/hab/sup.rs index 5b04dfe9d9..2364711252 100644 --- a/components/hab/src/cli/hab/sup.rs +++ b/components/hab/src/cli/hab/sup.rs @@ -32,10 +32,10 @@ use habitat_core::{env::Config, fs::HAB_CTL_KEYS_CACHE, package::PackageIdent, util as core_util}; - +use rants::{error::Error as RantsError, + Address as NatsAddress}; use std::{fmt, - net::{AddrParseError, - IpAddr, + net::{IpAddr, SocketAddr}, path::PathBuf, str::FromStr}; @@ -111,19 +111,19 @@ pub struct SupTerm {} // https://github.com/serde-rs/serde/issues/723. The easiest way to get around the issue is by // using a wrapper type since NatsAddress is not defined in this crate. #[derive(Deserialize, Serialize, Debug)] -pub struct EventStreamAddress(#[serde(with = "core_util::serde::string")] SocketAddr); +pub struct EventStreamAddress(#[serde(with = "core_util::serde::string")] NatsAddress); impl fmt::Display for EventStreamAddress { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } } impl FromStr for EventStreamAddress { - type Err = AddrParseError; + type Err = RantsError; fn from_str(s: &str) -> Result { Ok(EventStreamAddress(s.parse()?)) } } -impl From for SocketAddr { +impl From for NatsAddress { fn from(address: EventStreamAddress) -> Self { address.0 } } diff --git a/components/sup/Cargo.toml b/components/sup/Cargo.toml index afa34f82f2..672b62783f 100644 --- a/components/sup/Cargo.toml +++ b/components/sup/Cargo.toml @@ -38,8 +38,6 @@ lazy_static = "*" libc = "*" log = "*" log4rs = "*" -native-tls = "*" -nats = "*" notify = "*" num_cpus = "*" parking_lot = "*" @@ -49,6 +47,7 @@ prost = "*" prost-derive = "*" prost-types = "*" rand = "*" +rants = { git = "https://github.com/habitat-sh/rants.git", features = ["native-tls"] } regex = "*" rustls = "*" serde = { version = "*", features = ["rc"] } diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index 107dec982c..6364aaf422 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -39,17 +39,26 @@ use habitat_core::{package::ident::PackageIdent, use nats_message_stream::{NatsMessage, NatsMessageStream}; use prost_types::Duration as ProstDuration; +use rants::{Address, + Subject}; use state::Storage; use std::{net::SocketAddr, time::Duration}; -// NATS subject names -const SERVICE_STARTED_SUBJECT: &str = "habitat.event.service_started"; -const SERVICE_STOPPED_SUBJECT: &str = "habitat.event.service_stopped"; -const SERVICE_UPDATE_STARTED_SUBJECT: &str = "habitat.event.service_update_started"; -const HEALTHCHECK_SUBJECT: &str = "habitat.event.healthcheck"; - lazy_static! { + // TODO (CM): When const fn support lands in stable, we can ditch + // this lazy_static call. + + // NATS subject names + static ref SERVICE_STARTED_SUBJECT: Subject = + "habitat.event.service_started".parse().expect("valid NATS subject"); + static ref SERVICE_STOPPED_SUBJECT: Subject = + "habitat.event.service_stopped".parse().expect("valid NATS subject"); + static ref SERVICE_UPDATE_STARTED_SUBJECT: Subject = + "habitat.event.service_update_started".parse().expect("valid NATS subject"); + static ref HEALTHCHECK_SUBJECT: Subject = + "habitat.event.healthcheck".parse().expect("valid NATS subject"); + /// Reference to the event stream. static ref NATS_MESSAGE_STREAM: Storage = Storage::new(); /// Core information that is shared between all events. @@ -84,7 +93,7 @@ pub struct EventStreamConfig { pub site: Option, pub meta: EventStreamMetadata, pub token: EventStreamToken, - pub url: SocketAddr, + pub url: Address, pub connect_method: EventStreamConnectMethod, pub server_certificate: Option, } @@ -92,7 +101,7 @@ pub struct EventStreamConfig { /// Send an event for the start of a Service. pub fn service_started(service: &Service) { if initialized() { - publish(SERVICE_STARTED_SUBJECT, + publish(&SERVICE_STARTED_SUBJECT, ServiceStartedEvent { service_metadata: Some(service.to_service_metadata()), event_metadata: None, }); } @@ -101,7 +110,7 @@ pub fn service_started(service: &Service) { /// Send an event for the stop of a Service. pub fn service_stopped(service: &Service) { if initialized() { - publish(SERVICE_STOPPED_SUBJECT, + publish(&SERVICE_STOPPED_SUBJECT, ServiceStoppedEvent { service_metadata: Some(service.to_service_metadata()), event_metadata: None, }); } @@ -110,7 +119,7 @@ pub fn service_stopped(service: &Service) { /// Send an event at the start of a Service update. pub fn service_update_started(service: &Service, update: &PackageIdent) { if initialized() { - publish(SERVICE_UPDATE_STARTED_SUBJECT, + publish(&SERVICE_UPDATE_STARTED_SUBJECT, ServiceUpdateStartedEvent { event_metadata: None, service_metadata: Some(service.to_service_metadata()), @@ -137,7 +146,7 @@ pub fn health_check(metadata: ServiceMetadata, let prost_interval = ProstDuration::from(Duration::from(health_check_interval)); - publish(HEALTHCHECK_SUBJECT, + publish(&HEALTHCHECK_SUBJECT, HealthCheckEvent { service_metadata: Some(metadata), event_metadata: None, result: i32::from(health_check_result), @@ -198,7 +207,7 @@ fn initialized() -> bool { NATS_MESSAGE_STREAM.try_get().is_some() } /// /// If `init_stream` has not been called already, this function will /// be a no-op. -fn publish(subject: &'static str, mut event: impl EventMessage) { +fn publish(subject: &'static Subject, mut event: impl EventMessage) { if let Some(stream) = NATS_MESSAGE_STREAM.try_get() { // TODO (CM): Yeah... this is looking pretty gross. The // intention is to be able to timestamp the events right as diff --git a/components/sup/src/event/error.rs b/components/sup/src/event/error.rs index 5f89766cf0..8340de97cd 100644 --- a/components/sup/src/event/error.rs +++ b/components/sup/src/event/error.rs @@ -1,21 +1,19 @@ //! Event subsystem-specific error handling -use native_tls; +use rants::{error::Error as RantsError, + native_tls}; use std::{error, fmt, - io, result}; -use tokio::time::error::Elapsed; pub type Result = result::Result; #[derive(Debug)] pub enum Error { - NotConnected, - PublishFailed(io::Error), - ConnectTimeout(Elapsed), + ConnectNatsServer, HabitatCore(habitat_core::Error), NativeTls(native_tls::Error), + Rants(RantsError), } // TODO (CM): I would have like to have derived Fail on our Error @@ -29,11 +27,10 @@ pub enum Error { impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Error::NotConnected => "Could not establish connection to NATS server".fmt(f), - Error::PublishFailed(e) => format!("{}", e).fmt(f), - Error::ConnectTimeout(e) => format!("{}", e).fmt(f), + Error::ConnectNatsServer => "Could not establish connection to NATS server".fmt(f), Error::HabitatCore(_) => "{}".fmt(f), Error::NativeTls(e) => format!("{}", e).fmt(f), + Error::Rants(e) => format!("{}", e).fmt(f), } } } @@ -41,23 +38,22 @@ impl fmt::Display for Error { impl error::Error for Error { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { - Error::NotConnected => None, - Error::PublishFailed(ref e) => Some(e), - Error::ConnectTimeout(ref e) => Some(e), + Error::ConnectNatsServer => None, Error::HabitatCore(ref e) => Some(e), + Error::Rants(ref e) => Some(e), Error::NativeTls(ref e) => Some(e), } } } -impl From for Error { - fn from(error: Elapsed) -> Self { Error::ConnectTimeout(error) } -} - impl From for Error { fn from(error: habitat_core::Error) -> Self { Error::HabitatCore(error) } } +impl From for Error { + fn from(error: RantsError) -> Self { Error::Rants(error) } +} + impl From for Error { fn from(error: native_tls::Error) -> Self { Error::NativeTls(error) } } diff --git a/components/sup/src/event/nats_message_stream.rs b/components/sup/src/event/nats_message_stream.rs index 1009a1d68c..44c5fbd9d1 100644 --- a/components/sup/src/event/nats_message_stream.rs +++ b/components/sup/src/event/nats_message_stream.rs @@ -4,94 +4,25 @@ use crate::event::{Error, use futures::{channel::{mpsc as futures_mpsc, mpsc::UnboundedSender}, stream::StreamExt}; -use nats::{self, - asynk::{Connection, - Options}}; -use std::{path::PathBuf, - sync::Arc, - thread, - time::Duration}; -use tokio::{sync::Mutex, - time}; +use rants::{error::Error as RantsError, + native_tls::TlsConnector, + Client, + Subject}; +use tokio::time; /// The subject and payload of a NATS message. #[derive(Debug)] pub struct NatsMessage { - subject: &'static str, + subject: &'static Subject, payload: Vec, } impl NatsMessage { - pub fn new(subject: &'static str, payload: Vec) -> Self { NatsMessage { subject, payload } } - - pub fn payload(&self) -> &[u8] { self.payload.as_slice() } -} - -/// NatsClient is main accessor to connect to NATS Server and -/// to publish messages to NATS. -#[derive(Clone)] -struct NatsClient(Arc>>); - -impl NatsClient { - fn new() -> NatsClient { NatsClient(Arc::new(Mutex::new(None))) } - - // Connect to the server. If a timeout was set, we want to ensure we establish a connection - // before exiting the function. If we do not connect within the timeout we return an error. - // If we do not have a timeout, we don't care if we can immediately connect. Instead we spawn - // a future that will resolve when a connection is possible. Once we establish a - // connection, this client will handle reconnecting if necessary. - async fn connect(self, supervisor_id: String, config: EventStreamConfig) -> Result<()> { - match config.connect_method.into() { - Some(timeout) => { - time::timeout(timeout, self.connect_impl(supervisor_id, &config)).await??; - } - None => { - tokio::spawn(async move { self.connect_impl(supervisor_id, &config).await }); - } - } - Ok(()) - } - - async fn connect_impl(self, supervisor_id: String, config: &EventStreamConfig) -> Result<()> { - while self.0.lock().await.is_none() { - match Self::options_from_config(&supervisor_id, config)?.connect(&config.url - .to_string()) - .await - { - Ok(conn) => *self.0.lock().await = Some(conn), - Err(e) => { - trace!("Failed to connect to NATS server: {}", e); - thread::sleep(Duration::from_millis(1000)); - } - } - } - Ok(()) + pub fn new(subject: &'static Subject, payload: Vec) -> Self { + NatsMessage { subject, payload } } - fn options_from_config(supervisor_id: &str, config: &EventStreamConfig) -> Result { - let name = format!("hab_client_{}", supervisor_id); - let ca_certs = habitat_core::tls::native_tls_wrapper::installed_cacerts(None)?; - let mut options = Options::with_token(&config.token.to_string()) - .with_name(&name) - .add_root_certificate(ca_certs.expect("No core/cacerts installed")) - .max_reconnects(None); - - if let Some(ref cert_path) = config.server_certificate { - let cert_path: PathBuf = cert_path.clone().into(); - options = options.add_root_certificate(cert_path); - } - Ok(options) - } - - async fn publish(&self, subject: &str, msg: impl AsRef<[u8]>) -> Result<()> { - if let Some(conn) = &*self.0.lock().await { - conn.publish(subject, msg) - .await - .map_err(Error::PublishFailed) - } else { - Err(Error::NotConnected) - } - } + pub fn payload(&self) -> &[u8] { self.payload.as_slice() } } /// A lightweight handle for the NATS message stream. All events are converted into a NatsMessage @@ -103,10 +34,44 @@ pub struct NatsMessageStream(pub(super) UnboundedSender); impl NatsMessageStream { pub async fn new(supervisor_id: &str, config: EventStreamConfig) -> Result { - let client = NatsClient::new(); - client.clone() - .connect(supervisor_id.to_string(), config) - .await?; + let EventStreamConfig { url, + token, + connect_method, + server_certificate, + .. } = config; + + let mut client = Client::new(vec![url]); + + // Configure the client connect message + client.connect_mut() + .await + .name(format!("hab_client_{}", supervisor_id)) + .verbose(true) + .token(token.to_string()); + + // Configure the tls connector + let mut tls_connector = TlsConnector::builder(); + for certificate in habitat_core::tls::native_tls_wrapper::certificates(None)? { + tls_connector.add_root_certificate(certificate); + } + if let Some(certificate) = server_certificate { + tls_connector.add_root_certificate(certificate.into()); + } + let tls_connector = tls_connector.build()?; + client.set_tls_config(tls_connector).await; + + // Connect to the server. If a timeout was set, we want to ensure we establish a connection + // before exiting the function. If we do not connect within the timeout we return an error. + // If we do not have a timeout, we dont care if we can immediately connect. Instead we spawn + // a future that will resolve when a connection is possible. Once we establish a + // connection, the client will handle reconnecting if necessary. + if let Some(timeout) = connect_method.into() { + time::timeout(timeout, client.connect()).await + .map_err(|_| Error::ConnectNatsServer)?; + } else { + let client = Client::clone(&client); + tokio::spawn(async move { client.connect().await }); + } let (tx, mut rx) = futures_mpsc::unbounded::(); @@ -117,7 +82,7 @@ impl NatsMessageStream { // We do not retry any messages. If we are not connected when the message is // processed or there is an error in publishing the message, the message will // never be sent. - if let Error::NotConnected = e { + if let RantsError::NotConnected = e { trace!("Failed to publish message to subject '{}' because the client is \ not connected", packet.subject); @@ -134,7 +99,7 @@ impl NatsMessageStream { /// Queues a NATS message to be published pub fn send(&self, event_packet: NatsMessage) { - trace!("Queueing message: {:?}", event_packet.subject); + trace!("Queueing message: {:?}", event_packet); if let Err(e) = self.0.unbounded_send(event_packet) { error!("Failed to queue message, err: {}", e); }