diff --git a/opentelemetry-zipkin/CHANGELOG.md b/opentelemetry-zipkin/CHANGELOG.md index 2c92453ce2..3787035b15 100644 --- a/opentelemetry-zipkin/CHANGELOG.md +++ b/opentelemetry-zipkin/CHANGELOG.md @@ -3,6 +3,26 @@ ## vNext - Bump msrv to 1.75.0. +- **Breaking** The `opentelemetry_zipkin::new_pipeline()` interface is now replaced with `opentelemetry_zipkin::ZipkinExporter::builder()`. + + Previous Signature: + ```rust + let tracer = opentelemetry_zipkin::new_pipeline() + .with_service_name("trace-demo") + .install_simple()?; + ``` + Updated Signature: + ```rust + let exporter = ZipkinExporter::builder() + .with_service_name("trace-demo") + .build()?; + let provider = TracerProvider::builder() + .with_simple_exporter(exporter) + .build(); + global::set_tracer_provider(provider.clone()); + + let tracer = global::tracer("zipkin-tracer"); + ``` ## 0.27.0 diff --git a/opentelemetry-zipkin/Cargo.toml b/opentelemetry-zipkin/Cargo.toml index dba2fbc480..75f7029bc8 100644 --- a/opentelemetry-zipkin/Cargo.toml +++ b/opentelemetry-zipkin/Cargo.toml @@ -26,7 +26,6 @@ reqwest-client = ["reqwest", "opentelemetry-http/reqwest"] reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"] [dependencies] -async-trait = { workspace = true } once_cell = { workspace = true } opentelemetry = { version = "0.27", path = "../opentelemetry" } opentelemetry_sdk = { version = "0.27", path = "../opentelemetry-sdk", features = ["trace"] } @@ -41,6 +40,7 @@ thiserror = { workspace = true } futures-core = { workspace = true } [dev-dependencies] +async-trait = { workspace = true } bytes = { workspace = true } futures-util = { workspace = true, features = ["io"] } http-body-util = { workspace = true } diff --git a/opentelemetry-zipkin/examples/zipkin.rs b/opentelemetry-zipkin/examples/zipkin.rs index 09b755f3fa..0b29f828ca 100644 --- a/opentelemetry-zipkin/examples/zipkin.rs +++ b/opentelemetry-zipkin/examples/zipkin.rs @@ -1,7 +1,10 @@ use opentelemetry::{ global::{self}, - trace::{Span, Tracer}, + trace::{Span, TraceError, Tracer}, + InstrumentationScope, KeyValue, }; +use opentelemetry_sdk::trace::TracerProvider; +use opentelemetry_zipkin::ZipkinExporter; use std::thread; use std::time::Duration; @@ -12,10 +15,26 @@ fn bar() { span.end() } -fn main() -> Result<(), Box> { - let (tracer, provider) = opentelemetry_zipkin::new_pipeline() +fn init_traces() -> Result { + let exporter = ZipkinExporter::builder() .with_service_name("trace-demo") - .install_simple()?; + .build()?; + + Ok(TracerProvider::builder() + .with_simple_exporter(exporter) + .build()) +} + +fn main() -> Result<(), Box> { + let provider = init_traces()?; + global::set_tracer_provider(provider.clone()); + + let common_scope_attributes = vec![KeyValue::new("scope-key", "scope-value")]; + let scope = InstrumentationScope::builder("opentelemetry-zipkin") + .with_version(env!("CARGO_PKG_VERSION")) + .with_attributes(common_scope_attributes) + .build(); + let tracer = global::tracer_with_scope(scope.clone()); tracer.in_span("foo", |_cx| { thread::sleep(Duration::from_millis(6)); diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index dbfd549076..f7e4a29489 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -2,59 +2,54 @@ mod env; mod model; mod uploader; -use async_trait::async_trait; use futures_core::future::BoxFuture; use http::Uri; use model::endpoint::Endpoint; -use opentelemetry::{global, trace::TraceError, InstrumentationScope, KeyValue}; +use opentelemetry::trace::TraceError; use opentelemetry_http::HttpClient; use opentelemetry_sdk::{ resource::{ResourceDetector, SdkProvidedResourceDetector}, - trace, - trace::{Config, Tracer, TracerProvider}, - ExportError, Resource, + trace, ExportError, }; use opentelemetry_semantic_conventions as semcov; -use std::borrow::Cow; -use std::net::SocketAddr; +use std::net::{AddrParseError, SocketAddr}; use std::sync::Arc; /// Zipkin span exporter #[derive(Debug)] -pub struct Exporter { +pub struct ZipkinExporter { local_endpoint: Endpoint, uploader: uploader::Uploader, } -impl Exporter { +impl ZipkinExporter { + /// Get a builder to configure a [ZipkinExporter]. + pub fn builder() -> ZipkinExporterBuilder { + ZipkinExporterBuilder::default() + } + fn new(local_endpoint: Endpoint, client: Arc, collector_endpoint: Uri) -> Self { - Exporter { + ZipkinExporter { local_endpoint, uploader: uploader::Uploader::new(client, collector_endpoint), } } } -/// Create a new Zipkin exporter pipeline builder. -pub fn new_pipeline() -> ZipkinPipelineBuilder { - ZipkinPipelineBuilder::default() -} - -/// Builder for `ExporterConfig` struct. +/// Builder for `ZipkinExporter` struct. #[derive(Debug)] -pub struct ZipkinPipelineBuilder { +pub struct ZipkinExporterBuilder { service_name: Option, service_addr: Option, collector_endpoint: String, - trace_config: Option, client: Option>, } -impl Default for ZipkinPipelineBuilder { +impl Default for ZipkinExporterBuilder { fn default() -> Self { let timeout = env::get_timeout(); - ZipkinPipelineBuilder { + ZipkinExporterBuilder { #[cfg(feature = "reqwest-blocking-client")] client: Some(Arc::new( reqwest::blocking::Client::builder() @@ -78,58 +73,29 @@ impl Default for ZipkinPipelineBuilder { service_name: None, service_addr: None, collector_endpoint: env::get_endpoint(), - trace_config: None, } } } -impl ZipkinPipelineBuilder { - /// Initial a Zipkin span exporter. +impl ZipkinExporterBuilder { + /// Creates a new [ZipkinExporter] from this configuration. /// /// Returns error if the endpoint is not valid or if no http client is provided. - pub fn init_exporter(mut self) -> Result { - let (_, endpoint) = self.init_config_and_endpoint(); - self.init_exporter_with_endpoint(endpoint) - } - - fn init_config_and_endpoint(&mut self) -> (Config, Endpoint) { - let service_name = self.service_name.take(); - if let Some(service_name) = service_name { - let config = if let Some(mut cfg) = self.trace_config.take() { - cfg.resource = Cow::Owned( - Resource::builder_empty() - .with_attributes( - cfg.resource - .iter() - .filter(|(k, _v)| k.as_str() != semcov::resource::SERVICE_NAME) - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) - .collect::>(), - ) - .build(), - ); - cfg - } else { - #[allow(deprecated)] - Config::default().with_resource(Resource::builder_empty().build()) - }; - (config, Endpoint::new(service_name, self.service_addr)) + pub fn build(self) -> Result { + let service_name = if let Some(service_name) = self.service_name { + service_name } else { - let service_name = SdkProvidedResourceDetector + SdkProvidedResourceDetector .detect() .get(&semcov::resource::SERVICE_NAME.into()) .unwrap() - .to_string(); - ( - #[allow(deprecated)] - Config::default().with_resource(Resource::builder_empty().build()), - Endpoint::new(service_name, self.service_addr), - ) - } - } + .to_string() + }; + + let endpoint = Endpoint::new(service_name, self.service_addr); - fn init_exporter_with_endpoint(self, endpoint: Endpoint) -> Result { if let Some(client) = self.client { - let exporter = Exporter::new( + let exporter = ZipkinExporter::new( endpoint, client, self.collector_endpoint @@ -142,45 +108,6 @@ impl ZipkinPipelineBuilder { } } - /// Install the Zipkin trace exporter pipeline with a simple span processor. - #[allow(deprecated)] - pub fn install_simple( - mut self, - ) -> Result<(Tracer, opentelemetry_sdk::trace::TracerProvider), TraceError> { - let (config, endpoint) = self.init_config_and_endpoint(); - let exporter = self.init_exporter_with_endpoint(endpoint)?; - let mut provider_builder = TracerProvider::builder().with_simple_exporter(exporter); - provider_builder = provider_builder.with_config(config); - let provider = provider_builder.build(); - let scope = InstrumentationScope::builder("opentelemetry-zipkin") - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(semcov::SCHEMA_URL) - .build(); - let tracer = opentelemetry::trace::TracerProvider::tracer_with_scope(&provider, scope); - let _ = global::set_tracer_provider(provider.clone()); - Ok((tracer, provider)) - } - - /// Install the Zipkin trace exporter pipeline with a batch span processor using the specified - /// runtime. - #[allow(deprecated)] - pub fn install_batch( - mut self, - ) -> Result<(Tracer, opentelemetry_sdk::trace::TracerProvider), TraceError> { - let (config, endpoint) = self.init_config_and_endpoint(); - let exporter = self.init_exporter_with_endpoint(endpoint)?; - let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter); - provider_builder = provider_builder.with_config(config); - let provider = provider_builder.build(); - let scope = InstrumentationScope::builder("opentelemetry-zipkin") - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(semcov::SCHEMA_URL) - .build(); - let tracer = opentelemetry::trace::TracerProvider::tracer_with_scope(&provider, scope); - let _ = global::set_tracer_provider(provider.clone()); - Ok((tracer, provider)) - } - /// Assign the service name under which to group traces. pub fn with_service_name>(mut self, name: T) -> Self { self.service_name = Some(name.into()); @@ -193,7 +120,7 @@ impl ZipkinPipelineBuilder { self } - /// Assign the service name under which to group traces. + /// Assign the service address. pub fn with_service_address(mut self, addr: SocketAddr) -> Self { self.service_addr = Some(addr); self @@ -204,12 +131,6 @@ impl ZipkinPipelineBuilder { self.collector_endpoint = endpoint.into(); self } - - /// Assign the SDK trace configuration. - pub fn with_trace_config(mut self, config: Config) -> Self { - self.trace_config = Some(config); - self - } } async fn zipkin_export( @@ -225,8 +146,7 @@ async fn zipkin_export( uploader.upload(zipkin_spans).await } -#[async_trait] -impl trace::SpanExporter for Exporter { +impl trace::SpanExporter for ZipkinExporter { /// Export spans to Zipkin collector. fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { Box::pin(zipkin_export( @@ -253,6 +173,10 @@ pub enum Error { #[error("invalid uri")] InvalidUri(#[from] http::uri::InvalidUri), + /// The IP/socket address provided is invalid + #[error("invalid address")] + InvalidAddress(#[from] AddrParseError), + /// Other errors #[error("export error: {0}")] Other(String), diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index 1bc9d3ecc9..f19cc09659 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -21,12 +21,23 @@ //! telemetry: //! //! ```no_run -//! use opentelemetry::trace::{Tracer, TraceError}; //! use opentelemetry::global; +//! use opentelemetry::trace::{Tracer, TraceError}; +//! use opentelemetry_sdk::trace::TracerProvider; +//! use opentelemetry_zipkin::ZipkinExporter; //! //! fn main() -> Result<(), TraceError> { //! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); -//! let (tracer, provider) = opentelemetry_zipkin::new_pipeline().install_simple()?; +//! +//! let exporter = ZipkinExporter::builder() +//! .with_service_name("trace-demo") +//! .build()?; +//! let provider = TracerProvider::builder() +//! .with_simple_exporter(exporter) +//! .build(); +//! global::set_tracer_provider(provider.clone()); +//! +//! let tracer = global::tracer("zipkin-tracer"); //! //! tracer.in_span("doing_work", |cx| { //! // Traced app logic here... @@ -41,27 +52,35 @@ //! ## Performance //! //! For optimal performance, a batch exporter is recommended as the simple exporter -//! will export each span synchronously on drop. You can enable the [`rt-tokio`], -//! [`rt-tokio-current-thread`] or [`rt-async-std`] features and specify a runtime -//! on the pipeline builder to have a batch exporter configured for you -//! automatically. -//! -//! ```toml -//! [dependencies] -//! opentelemetry = { version = "*", features = ["rt-tokio"] } -//! opentelemetry-zipkin = { version = "*", features = ["reqwest-client"], default-features = false } -//! ``` +//! will export each span synchronously on drop. You can achieve this by creating a +//! `BatchSpanProcessor` and passing it to the trace provider. //! //! ```no_run -//! # fn main() -> Result<(), opentelemetry::trace::TraceError> { -//! let tracer = opentelemetry_zipkin::new_pipeline() -//! .install_batch(opentelemetry_sdk::runtime::Tokio)?; -//! # Ok(()) -//! # } -//! ``` +//! use opentelemetry_sdk::{ +//! trace::{ +//! BatchSpanProcessor, +//! BatchConfigBuilder, +//! TracerProvider, +//! } +//! }; +//! use opentelemetry_zipkin::ZipkinExporter; +//! +//! fn main() -> Result<(), opentelemetry::trace::TraceError> { +//! let exporter = ZipkinExporter::builder() +//! .with_service_name("runtime-demo") +//! .build()?; +//! +//! let batch = BatchSpanProcessor::builder(exporter) +//! .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build()) +//! .build(); +//! +//! let provider = TracerProvider::builder() +//! .with_span_processor(batch) +//! .build(); //! -//! [`rt-tokio`]: https://tokio.rs -//! [`async-std`]: https://async.rs +//! Ok(()) +//! } +//! ``` //! //! ## Choosing an HTTP client //! @@ -81,14 +100,14 @@ //! ## Kitchen Sink Full Configuration //! //! Example showing how to override all configuration options. See the -//! [`ZipkinPipelineBuilder`] docs for details of each option. +//! [`ZipkinExporterBuilder`] docs for details of each option. //! //! //! ```no_run -//! use opentelemetry::{global, KeyValue, trace::Tracer}; -//! use opentelemetry_sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource}; -//! use opentelemetry_sdk::trace::ExportResult; +//! use opentelemetry::{global, InstrumentationScope, KeyValue, trace::{Tracer, TraceError}}; +//! use opentelemetry_sdk::{trace::{self, ExportResult, RandomIdGenerator, Sampler}, Resource}; //! use opentelemetry_http::{HttpClient, HttpError}; +//! use opentelemetry_zipkin::{Error as ZipkinError, ZipkinExporter}; //! use async_trait::async_trait; //! use bytes::Bytes; //! use futures_util::io::AsyncReadExt as _; @@ -129,9 +148,8 @@ //! } //! } //! -//! fn main() -> Result<(), Box> { -//! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); -//! let (tracer, provider) = opentelemetry_zipkin::new_pipeline() +//! fn init_traces() -> Result { +//! let exporter = ZipkinExporter::builder() //! .with_http_client( //! HyperClient( //! Client::builder(TokioExecutor::new()) @@ -139,18 +157,40 @@ //! ) //! ) //! .with_service_name("my_app") -//! .with_service_address("127.0.0.1:8080".parse()?) +//! .with_service_address( +//! "127.0.0.1:8080" +//! .parse() +//! .map_err::(Into::into)? +//! ) //! .with_collector_endpoint("http://localhost:9411/api/v2/spans") -//! .with_trace_config( -//! trace::config() -//! .with_sampler(Sampler::AlwaysOn) -//! .with_id_generator(RandomIdGenerator::default()) -//! .with_max_events_per_span(64) -//! .with_max_attributes_per_span(16) -//! .with_max_events_per_span(16) -//! .with_resource(Resource::builder_empty().with_attribute(KeyValue::new("key", "value")).build()), +//! .build()?; +//! +//! Ok(trace::TracerProvider::builder() +//! .with_sampler(Sampler::AlwaysOn) +//! .with_batch_exporter(exporter) +//! .with_id_generator(RandomIdGenerator::default()) +//! .with_max_events_per_span(64) +//! .with_max_attributes_per_span(16) +//! .with_max_events_per_span(16) +//! .with_resource( +//! Resource::builder_empty() +//! .with_attribute(KeyValue::new("key", "value")) +//! .build() //! ) -//! .install_batch(opentelemetry_sdk::runtime::Tokio)?; +//! .build()) +//! } +//! +//! fn main() -> Result<(), Box> { +//! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); +//! let provider = init_traces()?; +//! global::set_tracer_provider(provider.clone()); +//! +//! let common_scope_attributes = vec![KeyValue::new("scope-key", "scope-value")]; +//! let scope = InstrumentationScope::builder("opentelemetry-zipkin") +//! .with_version(env!("CARGO_PKG_VERSION")) +//! .with_attributes(common_scope_attributes) +//! .build(); +//! let tracer = global::tracer_with_scope(scope.clone()); //! //! tracer.in_span("doing_work", |cx| { //! // Traced app logic here... @@ -208,5 +248,5 @@ extern crate typed_builder; mod exporter; mod propagator; -pub use exporter::{new_pipeline, Error, Exporter, ZipkinPipelineBuilder}; +pub use exporter::{Error, ZipkinExporter, ZipkinExporterBuilder}; pub use propagator::{B3Encoding, Propagator};