Skip to content

Commit

Permalink
Merge pull request #8353 from habitat-sh/revert-8293-cjp_rants_client…
Browse files Browse the repository at this point in the history
…_port

Revert "replace rants dependency with nats.io crate"
  • Loading branch information
mwrock authored Nov 1, 2021
2 parents 12f6935 + 428ce24 commit 594e1c8
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 356 deletions.
333 changes: 99 additions & 234 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion components/core/src/tls/native_tls_wrapper/readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn certificates_as_der(fs_root_path: Option<&Path>) -> Result<Vec<Vec<u8>>>
.collect::<StdResult<_, _>>()?)
}

pub fn installed_cacerts(fs_root_path: Option<&Path>) -> Result<Option<PathBuf>> {
fn installed_cacerts(fs_root_path: Option<&Path>) -> Result<Option<PathBuf>> {
let cacerts_ident = PackageIdent::from_str(CACERTS_PKG_IDENT)?;

if let Ok(pkg_install) = PackageInstall::load(&cacerts_ident, fs_root_path) {
Expand Down
2 changes: 1 addition & 1 deletion components/hab/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ handlebars = { version = "0.29.1", default-features = false }
lazy_static = "*"
libc = "*"
log = "*"
nats = "*"
pbr = "*"
rants = { git = "https://github.com/habitat-sh/rants.git", features = ["native-tls"] }
reqwest = { version = "*", features = ["blocking", "json", "stream"] }
retry = { git = "https://github.com/habitat-sh/retry", features = ["asynchronous"] }
rustls = "*"
Expand Down
12 changes: 6 additions & 6 deletions components/hab/src/cli/hab/sup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ use habitat_core::{env::Config,
fs::HAB_CTL_KEYS_CACHE,
package::PackageIdent,
util as core_util};

use rants::{error::Error as RantsError,
Address as NatsAddress};
use std::{fmt,
net::{AddrParseError,
IpAddr,
net::{IpAddr,
SocketAddr},
path::PathBuf,
str::FromStr};
Expand Down Expand Up @@ -111,19 +111,19 @@ pub struct SupTerm {}
// https://github.com/serde-rs/serde/issues/723. The easiest way to get around the issue is by
// using a wrapper type since NatsAddress is not defined in this crate.
#[derive(Deserialize, Serialize, Debug)]
pub struct EventStreamAddress(#[serde(with = "core_util::serde::string")] SocketAddr);
pub struct EventStreamAddress(#[serde(with = "core_util::serde::string")] NatsAddress);

impl fmt::Display for EventStreamAddress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) }
}

impl FromStr for EventStreamAddress {
type Err = AddrParseError;
type Err = RantsError;

fn from_str(s: &str) -> Result<Self, Self::Err> { Ok(EventStreamAddress(s.parse()?)) }
}

impl From<EventStreamAddress> for SocketAddr {
impl From<EventStreamAddress> for NatsAddress {
fn from(address: EventStreamAddress) -> Self { address.0 }
}

Expand Down
3 changes: 1 addition & 2 deletions components/sup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ lazy_static = "*"
libc = "*"
log = "*"
log4rs = "*"
native-tls = "*"
nats = "*"
notify = "*"
num_cpus = "*"
parking_lot = "*"
Expand All @@ -49,6 +47,7 @@ prost = "*"
prost-derive = "*"
prost-types = "*"
rand = "*"
rants = { git = "https://github.com/habitat-sh/rants.git", features = ["native-tls"] }
regex = "*"
rustls = "*"
serde = { version = "*", features = ["rc"] }
Expand Down
33 changes: 21 additions & 12 deletions components/sup/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,26 @@ use habitat_core::{package::ident::PackageIdent,
use nats_message_stream::{NatsMessage,
NatsMessageStream};
use prost_types::Duration as ProstDuration;
use rants::{Address,
Subject};
use state::Storage;
use std::{net::SocketAddr,
time::Duration};

// NATS subject names
const SERVICE_STARTED_SUBJECT: &str = "habitat.event.service_started";
const SERVICE_STOPPED_SUBJECT: &str = "habitat.event.service_stopped";
const SERVICE_UPDATE_STARTED_SUBJECT: &str = "habitat.event.service_update_started";
const HEALTHCHECK_SUBJECT: &str = "habitat.event.healthcheck";

lazy_static! {
// TODO (CM): When const fn support lands in stable, we can ditch
// this lazy_static call.

// NATS subject names
static ref SERVICE_STARTED_SUBJECT: Subject =
"habitat.event.service_started".parse().expect("valid NATS subject");
static ref SERVICE_STOPPED_SUBJECT: Subject =
"habitat.event.service_stopped".parse().expect("valid NATS subject");
static ref SERVICE_UPDATE_STARTED_SUBJECT: Subject =
"habitat.event.service_update_started".parse().expect("valid NATS subject");
static ref HEALTHCHECK_SUBJECT: Subject =
"habitat.event.healthcheck".parse().expect("valid NATS subject");

/// Reference to the event stream.
static ref NATS_MESSAGE_STREAM: Storage<NatsMessageStream> = Storage::new();
/// Core information that is shared between all events.
Expand Down Expand Up @@ -84,15 +93,15 @@ pub struct EventStreamConfig {
pub site: Option<String>,
pub meta: EventStreamMetadata,
pub token: EventStreamToken,
pub url: SocketAddr,
pub url: Address,
pub connect_method: EventStreamConnectMethod,
pub server_certificate: Option<EventStreamServerCertificate>,
}

/// Send an event for the start of a Service.
pub fn service_started(service: &Service) {
if initialized() {
publish(SERVICE_STARTED_SUBJECT,
publish(&SERVICE_STARTED_SUBJECT,
ServiceStartedEvent { service_metadata: Some(service.to_service_metadata()),
event_metadata: None, });
}
Expand All @@ -101,7 +110,7 @@ pub fn service_started(service: &Service) {
/// Send an event for the stop of a Service.
pub fn service_stopped(service: &Service) {
if initialized() {
publish(SERVICE_STOPPED_SUBJECT,
publish(&SERVICE_STOPPED_SUBJECT,
ServiceStoppedEvent { service_metadata: Some(service.to_service_metadata()),
event_metadata: None, });
}
Expand All @@ -110,7 +119,7 @@ pub fn service_stopped(service: &Service) {
/// Send an event at the start of a Service update.
pub fn service_update_started(service: &Service, update: &PackageIdent) {
if initialized() {
publish(SERVICE_UPDATE_STARTED_SUBJECT,
publish(&SERVICE_UPDATE_STARTED_SUBJECT,
ServiceUpdateStartedEvent { event_metadata: None,
service_metadata:
Some(service.to_service_metadata()),
Expand All @@ -137,7 +146,7 @@ pub fn health_check(metadata: ServiceMetadata,

let prost_interval = ProstDuration::from(Duration::from(health_check_interval));

publish(HEALTHCHECK_SUBJECT,
publish(&HEALTHCHECK_SUBJECT,
HealthCheckEvent { service_metadata: Some(metadata),
event_metadata: None,
result: i32::from(health_check_result),
Expand Down Expand Up @@ -198,7 +207,7 @@ fn initialized() -> bool { NATS_MESSAGE_STREAM.try_get().is_some() }
///
/// If `init_stream` has not been called already, this function will
/// be a no-op.
fn publish(subject: &'static str, mut event: impl EventMessage) {
fn publish(subject: &'static Subject, mut event: impl EventMessage) {
if let Some(stream) = NATS_MESSAGE_STREAM.try_get() {
// TODO (CM): Yeah... this is looking pretty gross. The
// intention is to be able to timestamp the events right as
Expand Down
28 changes: 12 additions & 16 deletions components/sup/src/event/error.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
//! Event subsystem-specific error handling

use native_tls;
use rants::{error::Error as RantsError,
native_tls};
use std::{error,
fmt,
io,
result};
use tokio::time::error::Elapsed;

pub type Result<T> = result::Result<T, Error>;

#[derive(Debug)]
pub enum Error {
NotConnected,
PublishFailed(io::Error),
ConnectTimeout(Elapsed),
ConnectNatsServer,
HabitatCore(habitat_core::Error),
NativeTls(native_tls::Error),
Rants(RantsError),
}

// TODO (CM): I would have like to have derived Fail on our Error
Expand All @@ -29,35 +27,33 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::NotConnected => "Could not establish connection to NATS server".fmt(f),
Error::PublishFailed(e) => format!("{}", e).fmt(f),
Error::ConnectTimeout(e) => format!("{}", e).fmt(f),
Error::ConnectNatsServer => "Could not establish connection to NATS server".fmt(f),
Error::HabitatCore(_) => "{}".fmt(f),
Error::NativeTls(e) => format!("{}", e).fmt(f),
Error::Rants(e) => format!("{}", e).fmt(f),
}
}
}

impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
Error::NotConnected => None,
Error::PublishFailed(ref e) => Some(e),
Error::ConnectTimeout(ref e) => Some(e),
Error::ConnectNatsServer => None,
Error::HabitatCore(ref e) => Some(e),
Error::Rants(ref e) => Some(e),
Error::NativeTls(ref e) => Some(e),
}
}
}

impl From<Elapsed> for Error {
fn from(error: Elapsed) -> Self { Error::ConnectTimeout(error) }
}

impl From<habitat_core::Error> for Error {
fn from(error: habitat_core::Error) -> Self { Error::HabitatCore(error) }
}

impl From<RantsError> for Error {
fn from(error: RantsError) -> Self { Error::Rants(error) }
}

impl From<native_tls::Error> for Error {
fn from(error: native_tls::Error) -> Self { Error::NativeTls(error) }
}
Loading

0 comments on commit 594e1c8

Please sign in to comment.