Skip to content

Commit

Permalink
[shippingservice] add support for simulated slowness
Browse files Browse the repository at this point in the history
  • Loading branch information
basti1302 committed Feb 1, 2024
1 parent 36a0681 commit 4935d80
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 7 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ services:
environment:
- SHIPPING_SERVICE_PORT
- QUOTE_SERVICE_ADDR
- FEATURE_FLAG_GRPC_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:4317/v1/traces
- OTEL_RESOURCE_ATTRIBUTES
- OTEL_SERVICE_NAME=shippingservice
Expand Down
5 changes: 4 additions & 1 deletion src/ffspostgres/init-scripts/20-ffs_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ VALUES
('cartServiceFailure', 'Fail cart service requests', 0),
('paymentServiceSimulateSlowness', 'Simulate slow response times in the payment service', 0),
('paymentServiceSimulateSlownessLowerBound', 'Minimum simulated delay in milliseconds in payment service, if enabled', 200),
('paymentServiceSimulateSlownessUpperBound', 'Maximum simulated delay in milliseconds in payment service, if enabled', 600);
('paymentServiceSimulateSlownessUpperBound', 'Maximum simulated delay in milliseconds in payment service, if enabled', 600),
('shippingServiceSimulateSlowness', 'Simulate slow response times in the shipping service', 0),
('shippingServiceSimulateSlownessLowerBound', 'Minimum simulated delay in milliseconds in shipping service, if enabled', 250),
('shippingServiceSimulateSlownessUpperBound', 'Maximum simulated delay in milliseconds in shipping service, if enabled', 400);
2 changes: 2 additions & 0 deletions src/shippingservice/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/shippingservice/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ reqwest-tracing = { version = "0.4.6", features = ["opentelemetry_0_20"] }
tracing = { version = "0.1", features = ["max_level_debug", "release_max_level_info"] }
tracing-opentelemetry = "0.22.0"
tracing-subscriber = "0.3"
time = "0.3.17"
rand = "0.8.5"

[dependencies.uuid]
version = "1.5.0"
Expand Down
37 changes: 34 additions & 3 deletions src/shippingservice/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use opentelemetry_sdk::{propagation::TraceContextPropagator, resource::{
}, runtime, trace as sdktrace};
use opentelemetry_otlp::{self, WithExportConfig};

use tonic::transport::Server;
use tonic::transport::{Channel, Server};

use tracing_subscriber::Registry;
use tracing_subscriber::layer::SubscriberExt;
Expand All @@ -22,8 +22,15 @@ use std::env;
use std::time::Duration;

mod shipping_service;

use shipping_service::shop::shipping_service_server::ShippingServiceServer;
use shipping_service::ShippingServer;
use shop::feature_flag_service_client::FeatureFlagServiceClient;

pub mod shop {
// The string specified here must match the proto package name
tonic::include_proto!("oteldemo");
}

fn init_logger() -> Result<(), log::SetLoggerError> {
CombinedLogger::init(vec![
Expand Down Expand Up @@ -67,6 +74,28 @@ fn init_reqwest_tracing(tracer: sdktrace::Tracer) -> Result<(), tracing::subscri
tracing::subscriber::set_global_default(subscriber)
}

async fn init_feature_flag_client() -> Option<FeatureFlagServiceClient<Channel>> {
let ffs_addr_env = env::var("FEATURE_FLAG_GRPC_SERVICE_ADDR");
if ffs_addr_env.is_ok() {
let addr = ffs_addr_env.unwrap();
let addr_with_scheme = "http://".to_owned() + addr.as_str();
info!("Trying to connect to feature flag service at: {}", addr_with_scheme);
let result = Channel::from_shared(addr_with_scheme.clone());
if result.is_ok() {
let ffs_channel = result.ok()?.connect().await;
if ffs_channel.is_ok() {
let ffc = FeatureFlagServiceClient::new(ffs_channel.ok()?);
info!("Connected to feature flag service at: {}", addr_with_scheme);
return Some(ffc);
}
warn!("Could not connect to feature flag service at: {}, simulated slowness will not be enabled.", addr_with_scheme);
}
} else {
warn!("FEATURE_FLAG_GRPC_SERVICE_ADDR is not set, simulated slowness will not be enabled.");
}
return None
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
Expand All @@ -76,11 +105,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

init_logger()?;
init_reqwest_tracing(init_tracer()?)?;
let feature_flag_client = init_feature_flag_client().await;

info!("OTel pipeline created");
let port = env::var("SHIPPING_SERVICE_PORT").expect("$SHIPPING_SERVICE_PORT is not set");
let addr = format!("0.0.0.0:{}", port).parse()?;
info!("listening on {}", addr);
let shipper = ShippingServer::default();
let shipper = ShippingServer::new(feature_flag_client);

Server::builder()
.add_service(ShippingServiceServer::new(shipper))
Expand All @@ -89,4 +120,4 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

Ok(())
}
}
57 changes: 54 additions & 3 deletions src/shippingservice/src/shipping_service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

use std::{thread, time};
use rand::Rng;

use opentelemetry::{global, propagation::Extractor, trace::Span, Context, KeyValue};
use opentelemetry::trace::{FutureExt, TraceContextExt, SpanKind, Tracer};
use opentelemetry_semantic_conventions as semcov;
use shop::shipping_service_server::ShippingService;
use shop::{GetQuoteRequest, GetQuoteResponse, Money, ShipOrderRequest, ShipOrderResponse};
use tonic::{Request, Response, Status};

use log::*;
use tonic::transport::Channel;

use shop::{GetQuoteRequest, GetQuoteResponse, Money, ShipOrderRequest, ShipOrderResponse};
use crate::shop::feature_flag_service_client::FeatureFlagServiceClient;
use crate::shop::{RangeFeatureFlagRequest, RangeFeatureFlagResponse};

mod quote;
use quote::create_quote_from_count;
Expand All @@ -23,11 +30,14 @@ const RPC_GRPC_STATUS_CODE_OK: i64 = 0;
const RPC_GRPC_STATUS_CODE_UNKNOWN: i64 = 2;

pub mod shop {
tonic::include_proto!("oteldemo"); // The string specified here must match the proto package name
// The string specified here must match the proto package name
tonic::include_proto!("oteldemo");
}

#[derive(Debug, Default)]
pub struct ShippingServer {}
pub struct ShippingServer {
feature_flag_client: Option<FeatureFlagServiceClient<Channel>>,
}

struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);

Expand Down Expand Up @@ -85,6 +95,8 @@ impl ShippingService for ShippingServer {
Err(status) => {cx.span().set_attribute(semcov::trace::RPC_GRPC_STATUS_CODE.i64(RPC_GRPC_STATUS_CODE_UNKNOWN)); return Err(status)},
};

self.simulate_slowness().await;

let reply = GetQuoteResponse {
cost_usd: Some(Money {
currency_code: "USD".into(),
Expand Down Expand Up @@ -114,6 +126,8 @@ impl ShippingService for ShippingServer {

span.add_event("Processing shipping order request".to_string(), vec![]);

self.simulate_slowness().await;

let tid = create_tracking_id();
span.set_attribute(KeyValue::new("app.shipping.tracking.id", tid.clone()));
info!("Tracking ID Created: {}", tid);
Expand All @@ -128,6 +142,42 @@ impl ShippingService for ShippingServer {
}
}

impl ShippingServer {

pub fn new(feature_flag_client: Option<FeatureFlagServiceClient<Channel>>) -> Self {
Self { feature_flag_client }
}

async fn simulate_slowness(&self) {
if self.feature_flag_client.is_none() {
return;
}

let mut client = self.feature_flag_client.clone().unwrap();

let request = Request::new(
RangeFeatureFlagRequest {
name: String::from("shippingServiceSimulateSlowness"),
name_lower_bound: String::from("shippingServiceSimulateSlownessLowerBound"),
name_upper_bound: String::from("shippingServiceSimulateSlownessUpperBound"),
},
);


let response: Response<RangeFeatureFlagResponse> =
client.get_range_feature_flag(request).await.unwrap();
let range_response: RangeFeatureFlagResponse = response.into_inner();
if range_response.enabled {
let minimum_delay = range_response.lower_bound.floor();
let maximum_delay = range_response.upper_bound.floor();
let delay_millis = rand::thread_rng().gen_range(minimum_delay..maximum_delay).floor();
info!("Simulating shipping service slowness, waiting {}.", delay_millis);
let sleep_time = time::Duration::from_millis(delay_millis as u64);
thread::sleep(sleep_time);
}
}
}

#[cfg(test)]
mod tests {
use super::{
Expand Down Expand Up @@ -157,6 +207,7 @@ mod tests {
fn make_empty_quote_request() -> Request<GetQuoteRequest> {
Request::new(GetQuoteRequest::default())
}

#[tokio::test]
async fn empty_quote() {
let server = ShippingServer::default();
Expand Down

0 comments on commit 4935d80

Please sign in to comment.