Skip to content

Commit

Permalink
Add support for WithStreamingInputSpans (#20)
Browse files Browse the repository at this point in the history
* Add support for WithStreamingInputSpans
* Fix deprecated fields
* Fix typo
  • Loading branch information
Falco20019 authored Oct 30, 2020
1 parent 75604ab commit 298c697
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 35 deletions.
8 changes: 6 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
<Import Project="version.props" />
<PropertyGroup>
<Authors>Benjamin Krämer</Authors>
<PackageIconUrl>https://avatars0.githubusercontent.com/u/15482765</PackageIconUrl>
<PackageIcon>opentracing-icon.png</PackageIcon>
<PackageProjectUrl>https://github.com/opentracing-contrib/csharp-grpc</PackageProjectUrl>
<PackageLicenseUrl>https://raw.githubusercontent.com/opentracing-contrib/csharp-grpc/master/LICENSE</PackageLicenseUrl>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<PackageReleaseNotes Condition="'$(Version)' != ''">https://github.com/opentracing-contrib/csharp-grpc/releases/tag/v$(Version)</PackageReleaseNotes>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>git://github.com/opentracing-contrib/csharp-grpc</RepositoryUrl>
Expand All @@ -16,4 +16,8 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>

<ItemGroup>
<None Include="$(SolutionDir)/images/opentracing-icon.png" Pack="true" PackagePath="" />
</ItemGroup>

</Project>
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ using OpenTracing.Contrib.Grpc;
A `ServerTracingInterceptor` uses default settings, which you can override by creating it using a `ServerTracingInterceptor.Builder`.

- `WithOperationName(IOperationNameConstructor operationName)`: Define how the operation name is constructed for all spans created for this intercepted server. Default is the name of the RPC method. More details in the `Operation Name` section.
- `WithStreaming()`: Logs to the server span whenever a message is received. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know.
- `WithStreaming()`: Logs to the server span whenever a message is is received or a response sent. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know.
- `WithStreamingInputSpans()`: Creates a child span for each incoming message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed.
- `WithVerbosity()`: Logs to the server span additional events, such as message received, headers received and call complete. Default only logs if a call is cancelled.
- `WithTracedAttributes(params ServerRequestAttribute[] attrs)`: Sets tags on the server span in case you want to track information about the RPC call.

Expand All @@ -86,6 +87,7 @@ A `ServerTracingInterceptor` uses default settings, which you can override by cr
ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor
.Builder(tracer)
.WithStreaming()
.WithStreamingInputSpans()
.WithVerbosity()
.WithOperationName(new PrefixOperationNameConstructor("Server"))
.WithTracedAttributes(ServerTracingConfiguration.RequestAttribute.Headers,
Expand All @@ -99,6 +101,7 @@ A `ClientTracingInterceptor` also has default settings, which you can override b

- `WithOperationName(IOperationNameConstructor operationName)`: Define how the operation name is constructed for all spans created for this intercepted client. Default is the name of the RPC method. More details in the `Operation Name` section.
- `WithStreaming()`: Logs to the client span whenever a message is sent or a response is received. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know.
- `WithStreamingInputSpans()`: Creates a child span for each incoming message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed.
- `WithVerbosity()`: Logs to the client span additional events, such as call started, message sent, headers received, response received, and call complete. Default only logs if a call is cancelled.
- `WithTracedAttributes(params ClientRequestAttribute[] attrs)`: Sets tags on the client span in case you want to track information about the RPC call.
- `WithWaitForReady()`: Enables WaitForReady on all RPC calls.
Expand All @@ -117,6 +120,7 @@ public class CustomOperationNameConstructor : IOperationNameConstructor
ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor
.Builder(tracer)
.WithStreaming()
.WithStreamingInputSpans()
.WithVerbosity()
.WithOperationName(new CustomOperationNameConstructor())
.WithTracingAttributes(ClientTracingConfiguration.RequestAttribute.AllCallOptions,
Expand Down
Binary file added images/opentracing-icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ internal ClientTracingConfiguration(ITracer tracer) : base(tracer)
TracedAttributes = new HashSet<RequestAttribute>();
}

internal ClientTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool verbose, ISet<RequestAttribute> tracedAttributes, bool waitForReady, CancellationToken fallbackCancellationToken)
: base(tracer, operationNameConstructor, streaming, verbose)
internal ClientTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool verbose, ISet<RequestAttribute> tracedAttributes, bool waitForReady, CancellationToken fallbackCancellationToken)
: base(tracer, operationNameConstructor, streaming, streamingInputSpans, verbose)
{
TracedAttributes = tracedAttributes ?? new HashSet<RequestAttribute>();
WaitForReady = waitForReady;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ internal ServerTracingConfiguration(ITracer tracer) : base(tracer)
TracedAttributes = new HashSet<RequestAttribute>();
}

internal ServerTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool verbose, ISet<RequestAttribute> tracedAttributes)
: base(tracer, operationNameConstructor, streaming, verbose)
internal ServerTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool verbose, ISet<RequestAttribute> tracedAttributes)
: base(tracer, operationNameConstructor, streaming, streamingInputSpans, verbose)
{
TracedAttributes = tracedAttributes ?? new HashSet<RequestAttribute>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ public abstract class TracingConfiguration
public ITracer Tracer { get; }
public IOperationNameConstructor OperationNameConstructor { get; }
public bool Streaming { get; }
public bool StreamingInputSpans { get; }
public bool Verbose { get; }

protected TracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor = null, bool streaming = false, bool verbose = false)
protected TracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor = null, bool streaming = false, bool streamingInputSpans = false, bool verbose = false)
{
Tracer = tracer;
OperationNameConstructor = operationNameConstructor ?? new DefaultOperationNameConstructor();
Streaming = streaming;
StreamingInputSpans = streamingInputSpans;
Verbose = verbose;
}
}
Expand Down
40 changes: 35 additions & 5 deletions src/OpenTracing.Contrib.Grpc/GrpcTraceLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ internal class GrpcTraceLogger<TRequest, TResponse>
private readonly ISpan _span;
private readonly TracingConfiguration _configuration;

private IScope _scope;
private bool _isFinished;

private ISpan ScopeSpan => _scope?.Span ?? _span;

public GrpcTraceLogger(ISpan span, TracingConfiguration configuration)
{
_span = span;
Expand All @@ -34,11 +39,27 @@ public void ResponseHeader(Metadata metadata)
});
}

public void BeginScope(string operationName)
{
if (!(_configuration.StreamingInputSpans || _configuration.Verbose)) return;

_scope = _configuration.Tracer.BuildSpan(operationName).StartActive(false);
}

public void EndScope()
{
if (_scope == null || !(_configuration.StreamingInputSpans || _configuration.Verbose)) return;

_scope.Span.Finish();
_scope.Dispose();
_scope = null;
}

public void Request(TRequest req)
{
if (!(_configuration.Streaming || _configuration.Verbose)) return;

_span.Log(new Dictionary<string, object>
ScopeSpan.Log(new Dictionary<string, object>
{
{ LogFields.Event, "gRPC request" },
{ "data", req }
Expand All @@ -49,7 +70,7 @@ public void Response(TResponse rsp)
{
if (!(_configuration.Streaming || _configuration.Verbose)) return;

_span.Log(new Dictionary<string, object>
ScopeSpan.Log(new Dictionary<string, object>
{
{ LogFields.Event, "gRPC response" },
{ "data", rsp }
Expand All @@ -62,7 +83,7 @@ public void FinishSuccess()
{
_span.Log("Call completed");
}
_span.Finish();
Finish();
}

public void FinishException(Exception ex)
Expand All @@ -71,8 +92,17 @@ public void FinishException(Exception ex)
{
_span.Log("Call failed");
}
_span.SetException(ex)
.Finish();
_span.SetException(ex);
Finish();
}

private void Finish()
{
if (_isFinished) return;

EndScope();
_span.Finish();
_isFinished = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal class InterceptedClientHandler<TRequest, TResponse>
private readonly ClientTracingConfiguration _configuration;
private readonly ClientInterceptorContext<TRequest, TResponse> _context;
private readonly GrpcTraceLogger<TRequest, TResponse> _logger;
private readonly TracingAsyncStreamReader<TResponse>.StreamActions _streamActions;

public InterceptedClientHandler(ClientTracingConfiguration configuration, ClientInterceptorContext<TRequest, TResponse> context)
{
Expand All @@ -33,6 +34,9 @@ public InterceptedClientHandler(ClientTracingConfiguration configuration, Client
var span = InitializeSpanWithHeaders();
_logger = new GrpcTraceLogger<TRequest, TResponse>(span, configuration);
_configuration.Tracer.Inject(span.Context, BuiltinFormats.HttpHeaders, new MetadataCarrier(_context.Options.Headers));

var scopeActions = new ScopeActions("new_response", _logger.BeginScope, _logger.EndScope);
_streamActions = new TracingAsyncStreamReader<TResponse>.StreamActions(scopeActions, _logger.Response, _logger.FinishSuccess, _logger.FinishException);
}

private CallOptions ApplyConfigToCallOptions(CallOptions callOptions)
Expand Down Expand Up @@ -143,7 +147,7 @@ public AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall(TRequest req
{
_logger.Request(request);
var rspCnt = continuation(request, _context);
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _logger.Response, _logger.FinishSuccess, _logger.FinishException);
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _streamActions);
var rspHeaderAsync = rspCnt.ResponseHeadersAsync.ContinueWith(LogResponseHeader);
return new AsyncServerStreamingCall<TResponse>(tracingResponseStream, rspHeaderAsync, rspCnt.GetStatus, rspCnt.GetTrailers, rspCnt.Dispose);
}
Expand Down Expand Up @@ -175,7 +179,7 @@ public AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall(In
{
var rspCnt = continuation(_context);
var tracingRequestStream = new TracingClientStreamWriter<TRequest>(rspCnt.RequestStream, _logger.Request);
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _logger.Response, _logger.FinishSuccess, _logger.FinishException);
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _streamActions);
var rspHeaderAsync = rspCnt.ResponseHeadersAsync.ContinueWith(LogResponseHeader);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(tracingRequestStream, tracingResponseStream, rspHeaderAsync, rspCnt.GetStatus, rspCnt.GetTrailers, rspCnt.Dispose);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class InterceptedServerHandler<TRequest, TResponse>
private readonly ServerTracingConfiguration _configuration;
private readonly ServerCallContext _context;
private readonly GrpcTraceLogger<TRequest, TResponse> _logger;
private readonly TracingAsyncStreamReader<TRequest>.StreamActions _streamActions;

public InterceptedServerHandler(ServerTracingConfiguration configuration, ServerCallContext context)
{
Expand All @@ -24,6 +25,9 @@ public InterceptedServerHandler(ServerTracingConfiguration configuration, Server

var span = GetSpanFromContext();
_logger = new GrpcTraceLogger<TRequest, TResponse>(span, configuration);

var scopeActions = new ScopeActions("new_request", _logger.BeginScope, _logger.EndScope);
_streamActions = new TracingAsyncStreamReader<TRequest>.StreamActions(scopeActions, _logger.Request);
}

private ISpan GetSpanFromContext()
Expand Down Expand Up @@ -83,7 +87,7 @@ public async Task<TResponse> ClientStreamingServerHandler(IAsyncStreamReader<TRe
{
try
{
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _logger.Request);
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _streamActions);
var response = await continuation(tracingRequestStream, _context).ConfigureAwait(false);
_logger.Response(response);
_logger.FinishSuccess();
Expand Down Expand Up @@ -116,7 +120,7 @@ public async Task DuplexStreamingServerHandler(IAsyncStreamReader<TRequest> requ
{
try
{
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _logger.Request);
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _streamActions);
var tracingResponseStream = new TracingServerStreamWriter<TResponse>(responseStream, _logger.Response);
await continuation(tracingRequestStream, tracingResponseStream, _context).ConfigureAwait(false);
_logger.FinishSuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class Builder
private readonly ITracer _tracer;
private IOperationNameConstructor _operationNameConstructor;
private bool _streaming;
private bool _streamingInputSpans;
private bool _verbose;
private ISet<ClientTracingConfiguration.RequestAttribute> _tracedAttributes;
private bool _waitForReady;
Expand Down Expand Up @@ -88,6 +89,16 @@ public Builder WithStreaming()
return this;
}

/// <summary>
/// Creates a child span for each input message received.
/// </summary>
/// <returns>this Builder configured to create child spans</returns>
public Builder WithStreamingInputSpans()
{
_streamingInputSpans = true;
return this;
}

/// <summary>
/// Logs all request life-cycle events to client spans.
/// </summary>
Expand Down Expand Up @@ -126,7 +137,7 @@ public Builder WithFallbackCancellationToken(CancellationToken cancellationToken

public ClientTracingInterceptor Build()
{
var configuration = new ClientTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _verbose, _tracedAttributes, _waitForReady, _cancellationToken);
var configuration = new ClientTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _verbose, _tracedAttributes, _waitForReady, _cancellationToken);
return new ClientTracingInterceptor(configuration);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class Builder
private readonly ITracer _tracer;
private IOperationNameConstructor _operationNameConstructor;
private bool _streaming;
private bool _streamingInputSpans;
private bool _verbose;
private ISet<ServerTracingConfiguration.RequestAttribute> _tracedAttributes;

Expand All @@ -70,7 +71,7 @@ public Builder WithOperationName(IOperationNameConstructor operationNameConstruc
}

/// <summary>
/// Logs streaming events to client spans.
/// Logs streaming events to server spans.
/// </summary>
/// <returns>this Builder configured to log streaming events</returns>
public Builder WithStreaming()
Expand All @@ -80,7 +81,17 @@ public Builder WithStreaming()
}

/// <summary>
/// Logs all request life-cycle events to client spans.
/// Creates a child span for each input message received.
/// </summary>
/// <returns>this Builder configured to create child spans</returns>
public Builder WithStreamingInputSpans()
{
_streamingInputSpans = true;
return this;
}

/// <summary>
/// Logs all request life-cycle events to server spans.
/// </summary>
/// <returns>this Builder configured to be verbose</returns>
public Builder WithVerbosity()
Expand All @@ -89,7 +100,7 @@ public Builder WithVerbosity()
return this;
}

/// <param name="tracedAttributes">to set as tags on client spans created by this intercepter</param>
/// <param name="tracedAttributes">to set as tags on server spans created by this intercepter</param>
/// <returns>this Builder configured to trace attributes</returns>
public Builder WithTracedAttributes(params ServerTracingConfiguration.RequestAttribute[] tracedAttributes)
{
Expand All @@ -99,7 +110,7 @@ public Builder WithTracedAttributes(params ServerTracingConfiguration.RequestAtt

public ServerTracingInterceptor Build()
{
var configuration = new ServerTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _verbose, _tracedAttributes);
var configuration = new ServerTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _verbose, _tracedAttributes);
return new ServerTracingInterceptor(configuration);
}
}
Expand Down
28 changes: 28 additions & 0 deletions src/OpenTracing.Contrib.Grpc/Streaming/ScopeActions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;

namespace OpenTracing.Contrib.Grpc.Streaming
{
internal readonly struct ScopeActions
{
public string ScopeOperationName { get; }
public Action<string> OnBeginScope { get; }
public Action OnEndScope { get; }

public ScopeActions(string scopeOperationName, Action<string> onBeginScope, Action onEndScope)
{
ScopeOperationName = scopeOperationName;
OnBeginScope = onBeginScope;
OnEndScope = onEndScope;
}

public void BeginScope()
{
OnBeginScope?.Invoke(ScopeOperationName);
}

public void EndScope()
{
OnEndScope?.Invoke();
}
}
}
Loading

0 comments on commit 298c697

Please sign in to comment.