Skip to content

Commit

Permalink
Add support for StreamingOutputSpans (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
Falco20019 authored Dec 5, 2022
1 parent 0b7fc9c commit b8db630
Show file tree
Hide file tree
Showing 17 changed files with 192 additions and 97 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ using OpenTracing.Contrib.Grpc;
A `ServerTracingInterceptor` uses default settings, which you can override by creating it using a `ServerTracingInterceptor.Builder`.

- `WithOperationName(IOperationNameConstructor operationName)`: Define how the operation name is constructed for all spans created for this intercepted server. Default is the name of the RPC method. More details in the `Operation Name` section.
- `WithStreaming()`: Logs to the server span whenever a message is is received or a response sent. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know.
- `WithStreaming()`: Logs to the server span whenever a message is received or a response sent. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know.
- `WithStreamingInputSpans()`: Creates a child span for each incoming message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed.
- `WithStreamingOutputSpans()`: Creates a child span for each outgoing message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed.
- `WithVerbosity()`: Logs to the server span additional events, such as message received, headers received and call complete. Default only logs if a call is cancelled.
- `WithTracedAttributes(params ServerRequestAttribute[] attrs)`: Sets tags on the server span in case you want to track information about the RPC call.

Expand All @@ -88,6 +89,7 @@ ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor
.Builder(tracer)
.WithStreaming()
.WithStreamingInputSpans()
.WithStreamingOutputSpans()
.WithVerbosity()
.WithOperationName(new PrefixOperationNameConstructor("Server"))
.WithTracedAttributes(ServerTracingConfiguration.RequestAttribute.Headers,
Expand All @@ -102,6 +104,7 @@ A `ClientTracingInterceptor` also has default settings, which you can override b
- `WithOperationName(IOperationNameConstructor operationName)`: Define how the operation name is constructed for all spans created for this intercepted client. Default is the name of the RPC method. More details in the `Operation Name` section.
- `WithStreaming()`: Logs to the client span whenever a message is sent or a response is received. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know.
- `WithStreamingInputSpans()`: Creates a child span for each incoming message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed.
- `WithStreamingOutputSpans()`: Creates a child span for each outgoing message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed.
- `WithVerbosity()`: Logs to the client span additional events, such as call started, message sent, headers received, response received, and call complete. Default only logs if a call is cancelled.
- `WithTracedAttributes(params ClientRequestAttribute[] attrs)`: Sets tags on the client span in case you want to track information about the RPC call.
- `WithWaitForReady()`: Enables WaitForReady on all RPC calls.
Expand All @@ -121,6 +124,7 @@ ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor
.Builder(tracer)
.WithStreaming()
.WithStreamingInputSpans()
.WithStreamingOutputSpans()
.WithVerbosity()
.WithOperationName(new CustomOperationNameConstructor())
.WithTracingAttributes(ClientTracingConfiguration.RequestAttribute.AllCallOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ internal ClientTracingConfiguration(ITracer tracer) : base(tracer)
TracedAttributes = new HashSet<RequestAttribute>();
}

internal ClientTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool verbose, ISet<RequestAttribute> tracedAttributes, bool waitForReady, CancellationToken fallbackCancellationToken)
: base(tracer, operationNameConstructor, streaming, streamingInputSpans, verbose)
internal ClientTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool streamingOutputSpans, bool verbose, ISet<RequestAttribute> tracedAttributes, bool waitForReady, CancellationToken fallbackCancellationToken)
: base(tracer, operationNameConstructor, streaming, streamingInputSpans, streamingOutputSpans, verbose)
{
TracedAttributes = tracedAttributes ?? new HashSet<RequestAttribute>();
WaitForReady = waitForReady;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ internal ServerTracingConfiguration(ITracer tracer) : base(tracer)
TracedAttributes = new HashSet<RequestAttribute>();
}

internal ServerTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool verbose, ISet<RequestAttribute> tracedAttributes)
: base(tracer, operationNameConstructor, streaming, streamingInputSpans, verbose)
internal ServerTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool streamingOutputSpans, bool verbose, ISet<RequestAttribute> tracedAttributes)
: base(tracer, operationNameConstructor, streaming, streamingInputSpans, streamingOutputSpans, verbose)
{
TracedAttributes = tracedAttributes ?? new HashSet<RequestAttribute>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ public abstract class TracingConfiguration
public IOperationNameConstructor OperationNameConstructor { get; }
public bool Streaming { get; }
public bool StreamingInputSpans { get; }
public bool StreamingOutputSpans { get; }
public bool Verbose { get; }

protected TracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor = null, bool streaming = false, bool streamingInputSpans = false, bool verbose = false)
protected TracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor = null, bool streaming = false, bool streamingInputSpans = false, bool streamingOutputSpans = false, bool verbose = false)
{
Tracer = tracer;
OperationNameConstructor = operationNameConstructor ?? new DefaultOperationNameConstructor();
Streaming = streaming;
StreamingInputSpans = streamingInputSpans;
StreamingOutputSpans = streamingOutputSpans;
Verbose = verbose;
}
}
Expand Down
38 changes: 34 additions & 4 deletions src/OpenTracing.Contrib.Grpc/GrpcTraceLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,46 @@ public void ResponseHeader(Metadata metadata)
});
}

public void BeginScope(string operationName)
public void BeginInputScope(string operationName)
{
if (!(_configuration.StreamingInputSpans || _configuration.Verbose)) return;

_scope = _configuration.Tracer.BuildSpan(operationName).StartActive(false);
BeginScope(operationName);
}

public void EndScope()
public void BeginOutputScope(string operationName)
{
if (_scope == null || !(_configuration.StreamingInputSpans || _configuration.Verbose)) return;
if (!(_configuration.StreamingOutputSpans || _configuration.Verbose)) return;

BeginScope(operationName);
}

private void BeginScope(string operationName)
{
if (_scope != null) EndScope();

_scope = _configuration.Tracer.BuildSpan(operationName)
.AsChildOf(_span.Context)
.StartActive(false);
}

public void EndInputScope()
{
if (!(_configuration.StreamingInputSpans || _configuration.Verbose)) return;

EndScope();
}

public void EndOutputScope()
{
if (!(_configuration.StreamingOutputSpans || _configuration.Verbose)) return;

EndScope();
}

private void EndScope()
{
if (_scope == null) return;

_scope.Span.Finish();
_scope.Dispose();
Expand Down
18 changes: 11 additions & 7 deletions src/OpenTracing.Contrib.Grpc/Handler/InterceptedClientHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ internal class InterceptedClientHandler<TRequest, TResponse>
private readonly ClientTracingConfiguration _configuration;
private readonly ClientInterceptorContext<TRequest, TResponse> _context;
private readonly GrpcTraceLogger<TRequest, TResponse> _logger;
private readonly TracingAsyncStreamReader<TResponse>.StreamActions _streamActions;
private readonly StreamActions<TResponse> _inputStreamActions;
private readonly StreamActions<TRequest> _outputStreamActions;

public InterceptedClientHandler(ClientTracingConfiguration configuration, ClientInterceptorContext<TRequest, TResponse> context)
{
Expand All @@ -35,8 +36,11 @@ public InterceptedClientHandler(ClientTracingConfiguration configuration, Client
_logger = new GrpcTraceLogger<TRequest, TResponse>(span, configuration);
_configuration.Tracer.Inject(span.Context, BuiltinFormats.HttpHeaders, new MetadataCarrier(_context.Options.Headers));

var scopeActions = new ScopeActions("new_response", _logger.BeginScope, _logger.EndScope);
_streamActions = new TracingAsyncStreamReader<TResponse>.StreamActions(scopeActions, _logger.Response, _logger.FinishSuccess, _logger.FinishException);
var inputScopeActions = new ScopeActions("new_response", _logger.BeginInputScope, _logger.EndInputScope);
_inputStreamActions = new StreamActions<TResponse>(inputScopeActions, _logger.Response, _logger.FinishSuccess, _logger.FinishException);

var outputScopeActions = new ScopeActions("new_request", _logger.BeginOutputScope, _logger.EndOutputScope);
_outputStreamActions = new StreamActions<TRequest>(outputScopeActions, _logger.Request);
}

private CallOptions ApplyConfigToCallOptions(CallOptions callOptions)
Expand Down Expand Up @@ -147,15 +151,15 @@ public AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall(TRequest req
{
_logger.Request(request);
var rspCnt = continuation(request, _context);
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _streamActions);
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _inputStreamActions);
var rspHeaderAsync = rspCnt.ResponseHeadersAsync.ContinueWith(LogResponseHeader);
return new AsyncServerStreamingCall<TResponse>(tracingResponseStream, rspHeaderAsync, rspCnt.GetStatus, rspCnt.GetTrailers, rspCnt.Dispose);
}

public AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall(Interceptor.AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
{
var rspCnt = continuation(_context);
var tracingRequestStream = new TracingClientStreamWriter<TRequest>(rspCnt.RequestStream, _logger.Request);
var tracingRequestStream = new TracingClientStreamWriter<TRequest>(rspCnt.RequestStream, _outputStreamActions);
var rspAsync = rspCnt.ResponseAsync.ContinueWith(rspTask =>
{
try
Expand All @@ -178,8 +182,8 @@ public AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall(In
public AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall(Interceptor.AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation)
{
var rspCnt = continuation(_context);
var tracingRequestStream = new TracingClientStreamWriter<TRequest>(rspCnt.RequestStream, _logger.Request);
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _streamActions);
var tracingRequestStream = new TracingClientStreamWriter<TRequest>(rspCnt.RequestStream, _outputStreamActions);
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _inputStreamActions);
var rspHeaderAsync = rspCnt.ResponseHeadersAsync.ContinueWith(LogResponseHeader);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(tracingRequestStream, tracingResponseStream, rspHeaderAsync, rspCnt.GetStatus, rspCnt.GetTrailers, rspCnt.Dispose);
}
Expand Down
26 changes: 17 additions & 9 deletions src/OpenTracing.Contrib.Grpc/Handler/InterceptedServerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ internal class InterceptedServerHandler<TRequest, TResponse>
private readonly ServerTracingConfiguration _configuration;
private readonly ServerCallContext _context;
private readonly GrpcTraceLogger<TRequest, TResponse> _logger;
private readonly TracingAsyncStreamReader<TRequest>.StreamActions _streamActions;
private readonly StreamActions<TRequest> _inputStreamActions;
private readonly StreamActions<TResponse> _outputStreamActions;

public InterceptedServerHandler(ServerTracingConfiguration configuration, ServerCallContext context)
{
Expand All @@ -26,8 +27,11 @@ public InterceptedServerHandler(ServerTracingConfiguration configuration, Server
var span = GetSpanFromContext();
_logger = new GrpcTraceLogger<TRequest, TResponse>(span, configuration);

var scopeActions = new ScopeActions("new_request", _logger.BeginScope, _logger.EndScope);
_streamActions = new TracingAsyncStreamReader<TRequest>.StreamActions(scopeActions, _logger.Request);
var inputScopeActions = new ScopeActions("new_request", _logger.BeginInputScope, _logger.EndInputScope);
_inputStreamActions = new StreamActions<TRequest>(inputScopeActions, _logger.Request);

var outputScopeActions = new ScopeActions("new_response", _logger.BeginOutputScope, _logger.EndOutputScope);
_outputStreamActions = new StreamActions<TResponse>(outputScopeActions, _logger.Response);
}

private ISpan GetSpanFromContext()
Expand Down Expand Up @@ -87,9 +91,11 @@ public async Task<TResponse> ClientStreamingServerHandler(IAsyncStreamReader<TRe
{
try
{
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _streamActions);
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _inputStreamActions);
var response = await continuation(tracingRequestStream, _context).ConfigureAwait(false);
_logger.Response(response);
_outputStreamActions.ScopeActions.BeginScope();
_outputStreamActions.Message(response);
_outputStreamActions.ScopeActions.EndScope();
_logger.FinishSuccess();
return response;
}
Expand All @@ -104,8 +110,10 @@ public async Task ServerStreamingServerHandler(TRequest request, IServerStreamWr
{
try
{
var tracingResponseStream = new TracingServerStreamWriter<TResponse>(responseStream, _logger.Response);
_logger.Request(request);
var tracingResponseStream = new TracingServerStreamWriter<TResponse>(responseStream, _outputStreamActions);
_inputStreamActions.ScopeActions.BeginScope();
_inputStreamActions.Message(request);
_inputStreamActions.ScopeActions.EndScope();
await continuation(request, tracingResponseStream, _context).ConfigureAwait(false);
_logger.FinishSuccess();
}
Expand All @@ -120,8 +128,8 @@ public async Task DuplexStreamingServerHandler(IAsyncStreamReader<TRequest> requ
{
try
{
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _streamActions);
var tracingResponseStream = new TracingServerStreamWriter<TResponse>(responseStream, _logger.Response);
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _inputStreamActions);
var tracingResponseStream = new TracingServerStreamWriter<TResponse>(responseStream, _outputStreamActions);
await continuation(tracingRequestStream, tracingResponseStream, _context).ConfigureAwait(false);
_logger.FinishSuccess();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class Builder
private IOperationNameConstructor _operationNameConstructor;
private bool _streaming;
private bool _streamingInputSpans;
private bool _streamingOutputSpans;
private bool _verbose;
private ISet<ClientTracingConfiguration.RequestAttribute> _tracedAttributes;
private bool _waitForReady;
Expand Down Expand Up @@ -99,6 +100,16 @@ public Builder WithStreamingInputSpans()
return this;
}

/// <summary>
/// Creates a child span for each output message sent.
/// </summary>
/// <returns>this Builder configured to create child spans</returns>
public Builder WithStreamingOutputSpans()
{
_streamingOutputSpans = true;
return this;
}

/// <summary>
/// Logs all request life-cycle events to client spans.
/// </summary>
Expand Down Expand Up @@ -137,7 +148,7 @@ public Builder WithFallbackCancellationToken(CancellationToken cancellationToken

public ClientTracingInterceptor Build()
{
var configuration = new ClientTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _verbose, _tracedAttributes, _waitForReady, _cancellationToken);
var configuration = new ClientTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _streamingOutputSpans, _verbose, _tracedAttributes, _waitForReady, _cancellationToken);
return new ClientTracingInterceptor(configuration);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class Builder
private IOperationNameConstructor _operationNameConstructor;
private bool _streaming;
private bool _streamingInputSpans;
private bool _streamingOutputSpans;
private bool _verbose;
private ISet<ServerTracingConfiguration.RequestAttribute> _tracedAttributes;

Expand Down Expand Up @@ -90,6 +91,16 @@ public Builder WithStreamingInputSpans()
return this;
}

/// <summary>
/// Creates a child span for each output message sent.
/// </summary>
/// <returns>this Builder configured to create child spans</returns>
public Builder WithStreamingOutputSpans()
{
_streamingOutputSpans = true;
return this;
}

/// <summary>
/// Logs all request life-cycle events to server spans.
/// </summary>
Expand All @@ -110,7 +121,7 @@ public Builder WithTracedAttributes(params ServerTracingConfiguration.RequestAtt

public ServerTracingInterceptor Build()
{
var configuration = new ServerTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _verbose, _tracedAttributes);
var configuration = new ServerTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _streamingOutputSpans, _verbose, _tracedAttributes);
return new ServerTracingInterceptor(configuration);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net45;netstandard1.5;netstandard2.0</TargetFrameworks>
<TargetFrameworks>net462;netstandard2.1</TargetFrameworks>
<IsPackable>true</IsPackable>
<Description>Adds OpenTracing instrumentation for .NET Standard apps that use GRPC.</Description>
<PackageTags>opentracing;distributed-tracing;tracing;grpc;netstandard</PackageTags>
Expand Down
Loading

0 comments on commit b8db630

Please sign in to comment.