Skip to content

Commit

Permalink
feat: Replace Zipkin pipeline with exporter builders (#2565)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
gruebel and cijothomas authored Jan 29, 2025
1 parent 144fdd9 commit 17cce83
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 151 deletions.
20 changes: 20 additions & 0 deletions opentelemetry-zipkin/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,26 @@
## vNext

- Bump msrv to 1.75.0.
- **Breaking** The `opentelemetry_zipkin::new_pipeline()` interface is now replaced with `opentelemetry_zipkin::ZipkinExporter::builder()`.

Previous Signature:
```rust
let tracer = opentelemetry_zipkin::new_pipeline()
.with_service_name("trace-demo")
.install_simple()?;
```
Updated Signature:
```rust
let exporter = ZipkinExporter::builder()
.with_service_name("trace-demo")
.build()?;
let provider = TracerProvider::builder()
.with_simple_exporter(exporter)
.build();
global::set_tracer_provider(provider.clone());

let tracer = global::tracer("zipkin-tracer");
```

## 0.27.0

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-zipkin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"]

[dependencies]
async-trait = { workspace = true }
once_cell = { workspace = true }
opentelemetry = { version = "0.27", path = "../opentelemetry" }
opentelemetry_sdk = { version = "0.27", path = "../opentelemetry-sdk", features = ["trace"] }
Expand All @@ -41,6 +40,7 @@ thiserror = { workspace = true }
futures-core = { workspace = true }

[dev-dependencies]
async-trait = { workspace = true }
bytes = { workspace = true }
futures-util = { workspace = true, features = ["io"] }
http-body-util = { workspace = true }
Expand Down
27 changes: 23 additions & 4 deletions opentelemetry-zipkin/examples/zipkin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use opentelemetry::{
global::{self},
trace::{Span, Tracer},
trace::{Span, TraceError, Tracer},
InstrumentationScope, KeyValue,
};
use opentelemetry_sdk::trace::TracerProvider;
use opentelemetry_zipkin::ZipkinExporter;
use std::thread;
use std::time::Duration;

Expand All @@ -12,10 +15,26 @@ fn bar() {
span.end()
}

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let (tracer, provider) = opentelemetry_zipkin::new_pipeline()
fn init_traces() -> Result<TracerProvider, TraceError> {
let exporter = ZipkinExporter::builder()
.with_service_name("trace-demo")
.install_simple()?;
.build()?;

Ok(TracerProvider::builder()
.with_simple_exporter(exporter)
.build())
}

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let provider = init_traces()?;
global::set_tracer_provider(provider.clone());

let common_scope_attributes = vec![KeyValue::new("scope-key", "scope-value")];
let scope = InstrumentationScope::builder("opentelemetry-zipkin")
.with_version(env!("CARGO_PKG_VERSION"))
.with_attributes(common_scope_attributes)
.build();
let tracer = global::tracer_with_scope(scope.clone());

tracer.in_span("foo", |_cx| {
thread::sleep(Duration::from_millis(6));
Expand Down
140 changes: 32 additions & 108 deletions opentelemetry-zipkin/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,54 @@ mod env;
mod model;
mod uploader;

use async_trait::async_trait;
use futures_core::future::BoxFuture;
use http::Uri;
use model::endpoint::Endpoint;
use opentelemetry::{global, trace::TraceError, InstrumentationScope, KeyValue};
use opentelemetry::trace::TraceError;
use opentelemetry_http::HttpClient;
use opentelemetry_sdk::{
resource::{ResourceDetector, SdkProvidedResourceDetector},
trace,
trace::{Config, Tracer, TracerProvider},
ExportError, Resource,
trace, ExportError,
};
use opentelemetry_semantic_conventions as semcov;
use std::borrow::Cow;
use std::net::SocketAddr;
use std::net::{AddrParseError, SocketAddr};
use std::sync::Arc;

/// Zipkin span exporter
#[derive(Debug)]
pub struct Exporter {
pub struct ZipkinExporter {
local_endpoint: Endpoint,
uploader: uploader::Uploader,
}

impl Exporter {
impl ZipkinExporter {
/// Get a builder to configure a [ZipkinExporter].
pub fn builder() -> ZipkinExporterBuilder {
ZipkinExporterBuilder::default()
}

fn new(local_endpoint: Endpoint, client: Arc<dyn HttpClient>, collector_endpoint: Uri) -> Self {
Exporter {
ZipkinExporter {
local_endpoint,
uploader: uploader::Uploader::new(client, collector_endpoint),
}
}
}

/// Create a new Zipkin exporter pipeline builder.
pub fn new_pipeline() -> ZipkinPipelineBuilder {
ZipkinPipelineBuilder::default()
}

/// Builder for `ExporterConfig` struct.
/// Builder for `ZipkinExporter` struct.
#[derive(Debug)]
pub struct ZipkinPipelineBuilder {
pub struct ZipkinExporterBuilder {
service_name: Option<String>,
service_addr: Option<SocketAddr>,
collector_endpoint: String,
trace_config: Option<Config>,
client: Option<Arc<dyn HttpClient>>,
}

impl Default for ZipkinPipelineBuilder {
impl Default for ZipkinExporterBuilder {
fn default() -> Self {
let timeout = env::get_timeout();

ZipkinPipelineBuilder {
ZipkinExporterBuilder {
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Arc::new(
reqwest::blocking::Client::builder()
Expand All @@ -78,58 +73,29 @@ impl Default for ZipkinPipelineBuilder {
service_name: None,
service_addr: None,
collector_endpoint: env::get_endpoint(),
trace_config: None,
}
}
}

impl ZipkinPipelineBuilder {
/// Initial a Zipkin span exporter.
impl ZipkinExporterBuilder {
/// Creates a new [ZipkinExporter] from this configuration.
///
/// Returns error if the endpoint is not valid or if no http client is provided.
pub fn init_exporter(mut self) -> Result<Exporter, TraceError> {
let (_, endpoint) = self.init_config_and_endpoint();
self.init_exporter_with_endpoint(endpoint)
}

fn init_config_and_endpoint(&mut self) -> (Config, Endpoint) {
let service_name = self.service_name.take();
if let Some(service_name) = service_name {
let config = if let Some(mut cfg) = self.trace_config.take() {
cfg.resource = Cow::Owned(
Resource::builder_empty()
.with_attributes(
cfg.resource
.iter()
.filter(|(k, _v)| k.as_str() != semcov::resource::SERVICE_NAME)
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<KeyValue>>(),
)
.build(),
);
cfg
} else {
#[allow(deprecated)]
Config::default().with_resource(Resource::builder_empty().build())
};
(config, Endpoint::new(service_name, self.service_addr))
pub fn build(self) -> Result<ZipkinExporter, TraceError> {
let service_name = if let Some(service_name) = self.service_name {
service_name
} else {
let service_name = SdkProvidedResourceDetector
SdkProvidedResourceDetector
.detect()
.get(&semcov::resource::SERVICE_NAME.into())
.unwrap()
.to_string();
(
#[allow(deprecated)]
Config::default().with_resource(Resource::builder_empty().build()),
Endpoint::new(service_name, self.service_addr),
)
}
}
.to_string()
};

let endpoint = Endpoint::new(service_name, self.service_addr);

fn init_exporter_with_endpoint(self, endpoint: Endpoint) -> Result<Exporter, TraceError> {
if let Some(client) = self.client {
let exporter = Exporter::new(
let exporter = ZipkinExporter::new(
endpoint,
client,
self.collector_endpoint
Expand All @@ -142,45 +108,6 @@ impl ZipkinPipelineBuilder {
}
}

/// Install the Zipkin trace exporter pipeline with a simple span processor.
#[allow(deprecated)]
pub fn install_simple(
mut self,
) -> Result<(Tracer, opentelemetry_sdk::trace::TracerProvider), TraceError> {
let (config, endpoint) = self.init_config_and_endpoint();
let exporter = self.init_exporter_with_endpoint(endpoint)?;
let mut provider_builder = TracerProvider::builder().with_simple_exporter(exporter);
provider_builder = provider_builder.with_config(config);
let provider = provider_builder.build();
let scope = InstrumentationScope::builder("opentelemetry-zipkin")
.with_version(env!("CARGO_PKG_VERSION"))
.with_schema_url(semcov::SCHEMA_URL)
.build();
let tracer = opentelemetry::trace::TracerProvider::tracer_with_scope(&provider, scope);
let _ = global::set_tracer_provider(provider.clone());
Ok((tracer, provider))
}

/// Install the Zipkin trace exporter pipeline with a batch span processor using the specified
/// runtime.
#[allow(deprecated)]
pub fn install_batch(
mut self,
) -> Result<(Tracer, opentelemetry_sdk::trace::TracerProvider), TraceError> {
let (config, endpoint) = self.init_config_and_endpoint();
let exporter = self.init_exporter_with_endpoint(endpoint)?;
let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter);
provider_builder = provider_builder.with_config(config);
let provider = provider_builder.build();
let scope = InstrumentationScope::builder("opentelemetry-zipkin")
.with_version(env!("CARGO_PKG_VERSION"))
.with_schema_url(semcov::SCHEMA_URL)
.build();
let tracer = opentelemetry::trace::TracerProvider::tracer_with_scope(&provider, scope);
let _ = global::set_tracer_provider(provider.clone());
Ok((tracer, provider))
}

/// Assign the service name under which to group traces.
pub fn with_service_name<T: Into<String>>(mut self, name: T) -> Self {
self.service_name = Some(name.into());
Expand All @@ -193,7 +120,7 @@ impl ZipkinPipelineBuilder {
self
}

/// Assign the service name under which to group traces.
/// Assign the service address.
pub fn with_service_address(mut self, addr: SocketAddr) -> Self {
self.service_addr = Some(addr);
self
Expand All @@ -204,12 +131,6 @@ impl ZipkinPipelineBuilder {
self.collector_endpoint = endpoint.into();
self
}

/// Assign the SDK trace configuration.
pub fn with_trace_config(mut self, config: Config) -> Self {
self.trace_config = Some(config);
self
}
}

async fn zipkin_export(
Expand All @@ -225,8 +146,7 @@ async fn zipkin_export(
uploader.upload(zipkin_spans).await
}

#[async_trait]
impl trace::SpanExporter for Exporter {
impl trace::SpanExporter for ZipkinExporter {
/// Export spans to Zipkin collector.
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
Box::pin(zipkin_export(
Expand All @@ -253,6 +173,10 @@ pub enum Error {
#[error("invalid uri")]
InvalidUri(#[from] http::uri::InvalidUri),

/// The IP/socket address provided is invalid
#[error("invalid address")]
InvalidAddress(#[from] AddrParseError),

/// Other errors
#[error("export error: {0}")]
Other(String),
Expand Down
Loading

0 comments on commit 17cce83

Please sign in to comment.