From 4935d8085447af51bec2b4f255e757b1ee35f5a7 Mon Sep 17 00:00:00 2001 From: Bastian Krol Date: Thu, 1 Feb 2024 08:35:07 +0100 Subject: [PATCH] [shippingservice] add support for simulated slowness --- docker-compose.yml | 1 + src/ffspostgres/init-scripts/20-ffs_data.sql | 5 +- src/shippingservice/Cargo.lock | 2 + src/shippingservice/Cargo.toml | 2 + src/shippingservice/src/main.rs | 37 +++++++++++-- src/shippingservice/src/shipping_service.rs | 57 ++++++++++++++++++-- 6 files changed, 97 insertions(+), 7 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 43d482625c..26bba60d93 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -530,6 +530,7 @@ services: environment: - SHIPPING_SERVICE_PORT - QUOTE_SERVICE_ADDR + - FEATURE_FLAG_GRPC_SERVICE_ADDR - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:4317/v1/traces - OTEL_RESOURCE_ATTRIBUTES - OTEL_SERVICE_NAME=shippingservice diff --git a/src/ffspostgres/init-scripts/20-ffs_data.sql b/src/ffspostgres/init-scripts/20-ffs_data.sql index b9bda952ad..56a6ecccc8 100644 --- a/src/ffspostgres/init-scripts/20-ffs_data.sql +++ b/src/ffspostgres/init-scripts/20-ffs_data.sql @@ -10,4 +10,7 @@ VALUES ('cartServiceFailure', 'Fail cart service requests', 0), ('paymentServiceSimulateSlowness', 'Simulate slow response times in the payment service', 0), ('paymentServiceSimulateSlownessLowerBound', 'Minimum simulated delay in milliseconds in payment service, if enabled', 200), - ('paymentServiceSimulateSlownessUpperBound', 'Maximum simulated delay in milliseconds in payment service, if enabled', 600); + ('paymentServiceSimulateSlownessUpperBound', 'Maximum simulated delay in milliseconds in payment service, if enabled', 600), + ('shippingServiceSimulateSlowness', 'Simulate slow response times in the shipping service', 0), + ('shippingServiceSimulateSlownessLowerBound', 'Minimum simulated delay in milliseconds in shipping service, if enabled', 250), + ('shippingServiceSimulateSlownessUpperBound', 'Maximum simulated delay in milliseconds in shipping service, if enabled', 400); diff --git a/src/shippingservice/Cargo.lock b/src/shippingservice/Cargo.lock index 2dcf1d1c50..e621d372ce 100644 --- a/src/shippingservice/Cargo.lock +++ b/src/shippingservice/Cargo.lock @@ -1176,10 +1176,12 @@ dependencies = [ "opentelemetry_sdk 0.21.1", "prost 0.12.1", "prost-types", + "rand", "reqwest", "reqwest-middleware", "reqwest-tracing", "simplelog", + "time", "tokio", "tonic 0.10.2", "tonic-build", diff --git a/src/shippingservice/Cargo.toml b/src/shippingservice/Cargo.toml index c60a2b393a..ff52a05ae9 100644 --- a/src/shippingservice/Cargo.toml +++ b/src/shippingservice/Cargo.toml @@ -31,6 +31,8 @@ reqwest-tracing = { version = "0.4.6", features = ["opentelemetry_0_20"] } tracing = { version = "0.1", features = ["max_level_debug", "release_max_level_info"] } tracing-opentelemetry = "0.22.0" tracing-subscriber = "0.3" +time = "0.3.17" +rand = "0.8.5" [dependencies.uuid] version = "1.5.0" diff --git a/src/shippingservice/src/main.rs b/src/shippingservice/src/main.rs index 79ef0dae3f..5970e590d9 100644 --- a/src/shippingservice/src/main.rs +++ b/src/shippingservice/src/main.rs @@ -10,7 +10,7 @@ use opentelemetry_sdk::{propagation::TraceContextPropagator, resource::{ }, runtime, trace as sdktrace}; use opentelemetry_otlp::{self, WithExportConfig}; -use tonic::transport::Server; +use tonic::transport::{Channel, Server}; use tracing_subscriber::Registry; use tracing_subscriber::layer::SubscriberExt; @@ -22,8 +22,15 @@ use std::env; use std::time::Duration; mod shipping_service; + use shipping_service::shop::shipping_service_server::ShippingServiceServer; use shipping_service::ShippingServer; +use shop::feature_flag_service_client::FeatureFlagServiceClient; + +pub mod shop { + // The string specified here must match the proto package name + tonic::include_proto!("oteldemo"); +} fn init_logger() -> Result<(), log::SetLoggerError> { CombinedLogger::init(vec![ @@ -67,6 +74,28 @@ fn init_reqwest_tracing(tracer: sdktrace::Tracer) -> Result<(), tracing::subscri tracing::subscriber::set_global_default(subscriber) } +async fn init_feature_flag_client() -> Option> { + let ffs_addr_env = env::var("FEATURE_FLAG_GRPC_SERVICE_ADDR"); + if ffs_addr_env.is_ok() { + let addr = ffs_addr_env.unwrap(); + let addr_with_scheme = "http://".to_owned() + addr.as_str(); + info!("Trying to connect to feature flag service at: {}", addr_with_scheme); + let result = Channel::from_shared(addr_with_scheme.clone()); + if result.is_ok() { + let ffs_channel = result.ok()?.connect().await; + if ffs_channel.is_ok() { + let ffc = FeatureFlagServiceClient::new(ffs_channel.ok()?); + info!("Connected to feature flag service at: {}", addr_with_scheme); + return Some(ffc); + } + warn!("Could not connect to feature flag service at: {}, simulated slowness will not be enabled.", addr_with_scheme); + } + } else { + warn!("FEATURE_FLAG_GRPC_SERVICE_ADDR is not set, simulated slowness will not be enabled."); + } + return None +} + #[tokio::main] async fn main() -> Result<(), Box> { let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); @@ -76,11 +105,13 @@ async fn main() -> Result<(), Box> { init_logger()?; init_reqwest_tracing(init_tracer()?)?; + let feature_flag_client = init_feature_flag_client().await; + info!("OTel pipeline created"); let port = env::var("SHIPPING_SERVICE_PORT").expect("$SHIPPING_SERVICE_PORT is not set"); let addr = format!("0.0.0.0:{}", port).parse()?; info!("listening on {}", addr); - let shipper = ShippingServer::default(); + let shipper = ShippingServer::new(feature_flag_client); Server::builder() .add_service(ShippingServiceServer::new(shipper)) @@ -89,4 +120,4 @@ async fn main() -> Result<(), Box> { .await?; Ok(()) -} +} \ No newline at end of file diff --git a/src/shippingservice/src/shipping_service.rs b/src/shippingservice/src/shipping_service.rs index 20b72f94ce..03a89d8446 100644 --- a/src/shippingservice/src/shipping_service.rs +++ b/src/shippingservice/src/shipping_service.rs @@ -1,14 +1,21 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +use std::{thread, time}; +use rand::Rng; + use opentelemetry::{global, propagation::Extractor, trace::Span, Context, KeyValue}; use opentelemetry::trace::{FutureExt, TraceContextExt, SpanKind, Tracer}; use opentelemetry_semantic_conventions as semcov; use shop::shipping_service_server::ShippingService; -use shop::{GetQuoteRequest, GetQuoteResponse, Money, ShipOrderRequest, ShipOrderResponse}; use tonic::{Request, Response, Status}; use log::*; +use tonic::transport::Channel; + +use shop::{GetQuoteRequest, GetQuoteResponse, Money, ShipOrderRequest, ShipOrderResponse}; +use crate::shop::feature_flag_service_client::FeatureFlagServiceClient; +use crate::shop::{RangeFeatureFlagRequest, RangeFeatureFlagResponse}; mod quote; use quote::create_quote_from_count; @@ -23,11 +30,14 @@ const RPC_GRPC_STATUS_CODE_OK: i64 = 0; const RPC_GRPC_STATUS_CODE_UNKNOWN: i64 = 2; pub mod shop { - tonic::include_proto!("oteldemo"); // The string specified here must match the proto package name + // The string specified here must match the proto package name + tonic::include_proto!("oteldemo"); } #[derive(Debug, Default)] -pub struct ShippingServer {} +pub struct ShippingServer { + feature_flag_client: Option>, +} struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); @@ -85,6 +95,8 @@ impl ShippingService for ShippingServer { Err(status) => {cx.span().set_attribute(semcov::trace::RPC_GRPC_STATUS_CODE.i64(RPC_GRPC_STATUS_CODE_UNKNOWN)); return Err(status)}, }; + self.simulate_slowness().await; + let reply = GetQuoteResponse { cost_usd: Some(Money { currency_code: "USD".into(), @@ -114,6 +126,8 @@ impl ShippingService for ShippingServer { span.add_event("Processing shipping order request".to_string(), vec![]); + self.simulate_slowness().await; + let tid = create_tracking_id(); span.set_attribute(KeyValue::new("app.shipping.tracking.id", tid.clone())); info!("Tracking ID Created: {}", tid); @@ -128,6 +142,42 @@ impl ShippingService for ShippingServer { } } +impl ShippingServer { + + pub fn new(feature_flag_client: Option>) -> Self { + Self { feature_flag_client } + } + + async fn simulate_slowness(&self) { + if self.feature_flag_client.is_none() { + return; + } + + let mut client = self.feature_flag_client.clone().unwrap(); + + let request = Request::new( + RangeFeatureFlagRequest { + name: String::from("shippingServiceSimulateSlowness"), + name_lower_bound: String::from("shippingServiceSimulateSlownessLowerBound"), + name_upper_bound: String::from("shippingServiceSimulateSlownessUpperBound"), + }, + ); + + + let response: Response = + client.get_range_feature_flag(request).await.unwrap(); + let range_response: RangeFeatureFlagResponse = response.into_inner(); + if range_response.enabled { + let minimum_delay = range_response.lower_bound.floor(); + let maximum_delay = range_response.upper_bound.floor(); + let delay_millis = rand::thread_rng().gen_range(minimum_delay..maximum_delay).floor(); + info!("Simulating shipping service slowness, waiting {}.", delay_millis); + let sleep_time = time::Duration::from_millis(delay_millis as u64); + thread::sleep(sleep_time); + } + } +} + #[cfg(test)] mod tests { use super::{ @@ -157,6 +207,7 @@ mod tests { fn make_empty_quote_request() -> Request { Request::new(GetQuoteRequest::default()) } + #[tokio::test] async fn empty_quote() { let server = ShippingServer::default();