From b8db630fc637ea7b7d396f5795dc03e79a88c906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Kr=C3=A4mer?= Date: Mon, 5 Dec 2022 15:46:10 +0100 Subject: [PATCH] Add support for StreamingOutputSpans (#23) --- README.md | 6 ++- .../ClientTracingConfiguration.cs | 4 +- .../ServerTracingConfiguration.cs | 4 +- .../Configuration/TracingConfiguration.cs | 4 +- .../GrpcTraceLogger.cs | 38 +++++++++++++-- .../Handler/InterceptedClientHandler.cs | 18 +++++--- .../Handler/InterceptedServerHandler.cs | 26 +++++++---- .../Interceptors/ClientTracingInterceptor.cs | 13 +++++- .../Interceptors/ServerTracingInterceptor.cs | 13 +++++- .../OpenTracing.Contrib.Grpc.csproj | 2 +- .../Streaming/StreamActions.cs | 35 ++++++++++++++ .../TracingAsyncStreamReader.StreamActions.cs | 38 --------------- .../Streaming/TracingAsyncStreamReader.cs | 6 +-- .../Streaming/TracingClientStreamWriter.cs | 20 ++++---- .../Streaming/TracingServerStreamWriter.cs | 14 +++--- .../OpenTracing.Contrib.Grpc.Test.csproj | 2 +- test/OpenTracing.Contrib.Grpc.Test/Program.cs | 46 ++++++++++++++----- 17 files changed, 192 insertions(+), 97 deletions(-) create mode 100644 src/OpenTracing.Contrib.Grpc/Streaming/StreamActions.cs delete mode 100644 src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.StreamActions.cs diff --git a/README.md b/README.md index 39c2d31..b7d1748 100644 --- a/README.md +++ b/README.md @@ -76,8 +76,9 @@ using OpenTracing.Contrib.Grpc; A `ServerTracingInterceptor` uses default settings, which you can override by creating it using a `ServerTracingInterceptor.Builder`. - `WithOperationName(IOperationNameConstructor operationName)`: Define how the operation name is constructed for all spans created for this intercepted server. Default is the name of the RPC method. More details in the `Operation Name` section. -- `WithStreaming()`: Logs to the server span whenever a message is is received or a response sent. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know. +- `WithStreaming()`: Logs to the server span whenever a message is received or a response sent. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know. - `WithStreamingInputSpans()`: Creates a child span for each incoming message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed. +- `WithStreamingOutputSpans()`: Creates a child span for each outgoing message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed. - `WithVerbosity()`: Logs to the server span additional events, such as message received, headers received and call complete. Default only logs if a call is cancelled. - `WithTracedAttributes(params ServerRequestAttribute[] attrs)`: Sets tags on the server span in case you want to track information about the RPC call. @@ -88,6 +89,7 @@ ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor .Builder(tracer) .WithStreaming() .WithStreamingInputSpans() + .WithStreamingOutputSpans() .WithVerbosity() .WithOperationName(new PrefixOperationNameConstructor("Server")) .WithTracedAttributes(ServerTracingConfiguration.RequestAttribute.Headers, @@ -102,6 +104,7 @@ A `ClientTracingInterceptor` also has default settings, which you can override b - `WithOperationName(IOperationNameConstructor operationName)`: Define how the operation name is constructed for all spans created for this intercepted client. Default is the name of the RPC method. More details in the `Operation Name` section. - `WithStreaming()`: Logs to the client span whenever a message is sent or a response is received. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know. - `WithStreamingInputSpans()`: Creates a child span for each incoming message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed. +- `WithStreamingOutputSpans()`: Creates a child span for each outgoing message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed. - `WithVerbosity()`: Logs to the client span additional events, such as call started, message sent, headers received, response received, and call complete. Default only logs if a call is cancelled. - `WithTracedAttributes(params ClientRequestAttribute[] attrs)`: Sets tags on the client span in case you want to track information about the RPC call. - `WithWaitForReady()`: Enables WaitForReady on all RPC calls. @@ -121,6 +124,7 @@ ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor .Builder(tracer) .WithStreaming() .WithStreamingInputSpans() + .WithStreamingOutputSpans() .WithVerbosity() .WithOperationName(new CustomOperationNameConstructor()) .WithTracingAttributes(ClientTracingConfiguration.RequestAttribute.AllCallOptions, diff --git a/src/OpenTracing.Contrib.Grpc/Configuration/ClientTracingConfiguration.cs b/src/OpenTracing.Contrib.Grpc/Configuration/ClientTracingConfiguration.cs index 9d58371..73d4598 100644 --- a/src/OpenTracing.Contrib.Grpc/Configuration/ClientTracingConfiguration.cs +++ b/src/OpenTracing.Contrib.Grpc/Configuration/ClientTracingConfiguration.cs @@ -26,8 +26,8 @@ internal ClientTracingConfiguration(ITracer tracer) : base(tracer) TracedAttributes = new HashSet(); } - internal ClientTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool verbose, ISet tracedAttributes, bool waitForReady, CancellationToken fallbackCancellationToken) - : base(tracer, operationNameConstructor, streaming, streamingInputSpans, verbose) + internal ClientTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool streamingOutputSpans, bool verbose, ISet tracedAttributes, bool waitForReady, CancellationToken fallbackCancellationToken) + : base(tracer, operationNameConstructor, streaming, streamingInputSpans, streamingOutputSpans, verbose) { TracedAttributes = tracedAttributes ?? new HashSet(); WaitForReady = waitForReady; diff --git a/src/OpenTracing.Contrib.Grpc/Configuration/ServerTracingConfiguration.cs b/src/OpenTracing.Contrib.Grpc/Configuration/ServerTracingConfiguration.cs index 6fca0c7..ebfd904 100644 --- a/src/OpenTracing.Contrib.Grpc/Configuration/ServerTracingConfiguration.cs +++ b/src/OpenTracing.Contrib.Grpc/Configuration/ServerTracingConfiguration.cs @@ -20,8 +20,8 @@ internal ServerTracingConfiguration(ITracer tracer) : base(tracer) TracedAttributes = new HashSet(); } - internal ServerTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool verbose, ISet tracedAttributes) - : base(tracer, operationNameConstructor, streaming, streamingInputSpans, verbose) + internal ServerTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool streamingOutputSpans, bool verbose, ISet tracedAttributes) + : base(tracer, operationNameConstructor, streaming, streamingInputSpans, streamingOutputSpans, verbose) { TracedAttributes = tracedAttributes ?? new HashSet(); } diff --git a/src/OpenTracing.Contrib.Grpc/Configuration/TracingConfiguration.cs b/src/OpenTracing.Contrib.Grpc/Configuration/TracingConfiguration.cs index d611541..a24c876 100644 --- a/src/OpenTracing.Contrib.Grpc/Configuration/TracingConfiguration.cs +++ b/src/OpenTracing.Contrib.Grpc/Configuration/TracingConfiguration.cs @@ -8,14 +8,16 @@ public abstract class TracingConfiguration public IOperationNameConstructor OperationNameConstructor { get; } public bool Streaming { get; } public bool StreamingInputSpans { get; } + public bool StreamingOutputSpans { get; } public bool Verbose { get; } - protected TracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor = null, bool streaming = false, bool streamingInputSpans = false, bool verbose = false) + protected TracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor = null, bool streaming = false, bool streamingInputSpans = false, bool streamingOutputSpans = false, bool verbose = false) { Tracer = tracer; OperationNameConstructor = operationNameConstructor ?? new DefaultOperationNameConstructor(); Streaming = streaming; StreamingInputSpans = streamingInputSpans; + StreamingOutputSpans = streamingOutputSpans; Verbose = verbose; } } diff --git a/src/OpenTracing.Contrib.Grpc/GrpcTraceLogger.cs b/src/OpenTracing.Contrib.Grpc/GrpcTraceLogger.cs index ad0cc8b..59337dc 100644 --- a/src/OpenTracing.Contrib.Grpc/GrpcTraceLogger.cs +++ b/src/OpenTracing.Contrib.Grpc/GrpcTraceLogger.cs @@ -39,16 +39,46 @@ public void ResponseHeader(Metadata metadata) }); } - public void BeginScope(string operationName) + public void BeginInputScope(string operationName) { if (!(_configuration.StreamingInputSpans || _configuration.Verbose)) return; - _scope = _configuration.Tracer.BuildSpan(operationName).StartActive(false); + BeginScope(operationName); } - public void EndScope() + public void BeginOutputScope(string operationName) { - if (_scope == null || !(_configuration.StreamingInputSpans || _configuration.Verbose)) return; + if (!(_configuration.StreamingOutputSpans || _configuration.Verbose)) return; + + BeginScope(operationName); + } + + private void BeginScope(string operationName) + { + if (_scope != null) EndScope(); + + _scope = _configuration.Tracer.BuildSpan(operationName) + .AsChildOf(_span.Context) + .StartActive(false); + } + + public void EndInputScope() + { + if (!(_configuration.StreamingInputSpans || _configuration.Verbose)) return; + + EndScope(); + } + + public void EndOutputScope() + { + if (!(_configuration.StreamingOutputSpans || _configuration.Verbose)) return; + + EndScope(); + } + + private void EndScope() + { + if (_scope == null) return; _scope.Span.Finish(); _scope.Dispose(); diff --git a/src/OpenTracing.Contrib.Grpc/Handler/InterceptedClientHandler.cs b/src/OpenTracing.Contrib.Grpc/Handler/InterceptedClientHandler.cs index 77fb7d6..50f2266 100644 --- a/src/OpenTracing.Contrib.Grpc/Handler/InterceptedClientHandler.cs +++ b/src/OpenTracing.Contrib.Grpc/Handler/InterceptedClientHandler.cs @@ -18,7 +18,8 @@ internal class InterceptedClientHandler private readonly ClientTracingConfiguration _configuration; private readonly ClientInterceptorContext _context; private readonly GrpcTraceLogger _logger; - private readonly TracingAsyncStreamReader.StreamActions _streamActions; + private readonly StreamActions _inputStreamActions; + private readonly StreamActions _outputStreamActions; public InterceptedClientHandler(ClientTracingConfiguration configuration, ClientInterceptorContext context) { @@ -35,8 +36,11 @@ public InterceptedClientHandler(ClientTracingConfiguration configuration, Client _logger = new GrpcTraceLogger(span, configuration); _configuration.Tracer.Inject(span.Context, BuiltinFormats.HttpHeaders, new MetadataCarrier(_context.Options.Headers)); - var scopeActions = new ScopeActions("new_response", _logger.BeginScope, _logger.EndScope); - _streamActions = new TracingAsyncStreamReader.StreamActions(scopeActions, _logger.Response, _logger.FinishSuccess, _logger.FinishException); + var inputScopeActions = new ScopeActions("new_response", _logger.BeginInputScope, _logger.EndInputScope); + _inputStreamActions = new StreamActions(inputScopeActions, _logger.Response, _logger.FinishSuccess, _logger.FinishException); + + var outputScopeActions = new ScopeActions("new_request", _logger.BeginOutputScope, _logger.EndOutputScope); + _outputStreamActions = new StreamActions(outputScopeActions, _logger.Request); } private CallOptions ApplyConfigToCallOptions(CallOptions callOptions) @@ -147,7 +151,7 @@ public AsyncServerStreamingCall AsyncServerStreamingCall(TRequest req { _logger.Request(request); var rspCnt = continuation(request, _context); - var tracingResponseStream = new TracingAsyncStreamReader(rspCnt.ResponseStream, _streamActions); + var tracingResponseStream = new TracingAsyncStreamReader(rspCnt.ResponseStream, _inputStreamActions); var rspHeaderAsync = rspCnt.ResponseHeadersAsync.ContinueWith(LogResponseHeader); return new AsyncServerStreamingCall(tracingResponseStream, rspHeaderAsync, rspCnt.GetStatus, rspCnt.GetTrailers, rspCnt.Dispose); } @@ -155,7 +159,7 @@ public AsyncServerStreamingCall AsyncServerStreamingCall(TRequest req public AsyncClientStreamingCall AsyncClientStreamingCall(Interceptor.AsyncClientStreamingCallContinuation continuation) { var rspCnt = continuation(_context); - var tracingRequestStream = new TracingClientStreamWriter(rspCnt.RequestStream, _logger.Request); + var tracingRequestStream = new TracingClientStreamWriter(rspCnt.RequestStream, _outputStreamActions); var rspAsync = rspCnt.ResponseAsync.ContinueWith(rspTask => { try @@ -178,8 +182,8 @@ public AsyncClientStreamingCall AsyncClientStreamingCall(In public AsyncDuplexStreamingCall AsyncDuplexStreamingCall(Interceptor.AsyncDuplexStreamingCallContinuation continuation) { var rspCnt = continuation(_context); - var tracingRequestStream = new TracingClientStreamWriter(rspCnt.RequestStream, _logger.Request); - var tracingResponseStream = new TracingAsyncStreamReader(rspCnt.ResponseStream, _streamActions); + var tracingRequestStream = new TracingClientStreamWriter(rspCnt.RequestStream, _outputStreamActions); + var tracingResponseStream = new TracingAsyncStreamReader(rspCnt.ResponseStream, _inputStreamActions); var rspHeaderAsync = rspCnt.ResponseHeadersAsync.ContinueWith(LogResponseHeader); return new AsyncDuplexStreamingCall(tracingRequestStream, tracingResponseStream, rspHeaderAsync, rspCnt.GetStatus, rspCnt.GetTrailers, rspCnt.Dispose); } diff --git a/src/OpenTracing.Contrib.Grpc/Handler/InterceptedServerHandler.cs b/src/OpenTracing.Contrib.Grpc/Handler/InterceptedServerHandler.cs index 1df587d..5fd29d5 100644 --- a/src/OpenTracing.Contrib.Grpc/Handler/InterceptedServerHandler.cs +++ b/src/OpenTracing.Contrib.Grpc/Handler/InterceptedServerHandler.cs @@ -16,7 +16,8 @@ internal class InterceptedServerHandler private readonly ServerTracingConfiguration _configuration; private readonly ServerCallContext _context; private readonly GrpcTraceLogger _logger; - private readonly TracingAsyncStreamReader.StreamActions _streamActions; + private readonly StreamActions _inputStreamActions; + private readonly StreamActions _outputStreamActions; public InterceptedServerHandler(ServerTracingConfiguration configuration, ServerCallContext context) { @@ -26,8 +27,11 @@ public InterceptedServerHandler(ServerTracingConfiguration configuration, Server var span = GetSpanFromContext(); _logger = new GrpcTraceLogger(span, configuration); - var scopeActions = new ScopeActions("new_request", _logger.BeginScope, _logger.EndScope); - _streamActions = new TracingAsyncStreamReader.StreamActions(scopeActions, _logger.Request); + var inputScopeActions = new ScopeActions("new_request", _logger.BeginInputScope, _logger.EndInputScope); + _inputStreamActions = new StreamActions(inputScopeActions, _logger.Request); + + var outputScopeActions = new ScopeActions("new_response", _logger.BeginOutputScope, _logger.EndOutputScope); + _outputStreamActions = new StreamActions(outputScopeActions, _logger.Response); } private ISpan GetSpanFromContext() @@ -87,9 +91,11 @@ public async Task ClientStreamingServerHandler(IAsyncStreamReader(requestStream, _streamActions); + var tracingRequestStream = new TracingAsyncStreamReader(requestStream, _inputStreamActions); var response = await continuation(tracingRequestStream, _context).ConfigureAwait(false); - _logger.Response(response); + _outputStreamActions.ScopeActions.BeginScope(); + _outputStreamActions.Message(response); + _outputStreamActions.ScopeActions.EndScope(); _logger.FinishSuccess(); return response; } @@ -104,8 +110,10 @@ public async Task ServerStreamingServerHandler(TRequest request, IServerStreamWr { try { - var tracingResponseStream = new TracingServerStreamWriter(responseStream, _logger.Response); - _logger.Request(request); + var tracingResponseStream = new TracingServerStreamWriter(responseStream, _outputStreamActions); + _inputStreamActions.ScopeActions.BeginScope(); + _inputStreamActions.Message(request); + _inputStreamActions.ScopeActions.EndScope(); await continuation(request, tracingResponseStream, _context).ConfigureAwait(false); _logger.FinishSuccess(); } @@ -120,8 +128,8 @@ public async Task DuplexStreamingServerHandler(IAsyncStreamReader requ { try { - var tracingRequestStream = new TracingAsyncStreamReader(requestStream, _streamActions); - var tracingResponseStream = new TracingServerStreamWriter(responseStream, _logger.Response); + var tracingRequestStream = new TracingAsyncStreamReader(requestStream, _inputStreamActions); + var tracingResponseStream = new TracingServerStreamWriter(responseStream, _outputStreamActions); await continuation(tracingRequestStream, tracingResponseStream, _context).ConfigureAwait(false); _logger.FinishSuccess(); } diff --git a/src/OpenTracing.Contrib.Grpc/Interceptors/ClientTracingInterceptor.cs b/src/OpenTracing.Contrib.Grpc/Interceptors/ClientTracingInterceptor.cs index eaf2f46..5dae0d1 100644 --- a/src/OpenTracing.Contrib.Grpc/Interceptors/ClientTracingInterceptor.cs +++ b/src/OpenTracing.Contrib.Grpc/Interceptors/ClientTracingInterceptor.cs @@ -61,6 +61,7 @@ public class Builder private IOperationNameConstructor _operationNameConstructor; private bool _streaming; private bool _streamingInputSpans; + private bool _streamingOutputSpans; private bool _verbose; private ISet _tracedAttributes; private bool _waitForReady; @@ -99,6 +100,16 @@ public Builder WithStreamingInputSpans() return this; } + /// + /// Creates a child span for each output message sent. + /// + /// this Builder configured to create child spans + public Builder WithStreamingOutputSpans() + { + _streamingOutputSpans = true; + return this; + } + /// /// Logs all request life-cycle events to client spans. /// @@ -137,7 +148,7 @@ public Builder WithFallbackCancellationToken(CancellationToken cancellationToken public ClientTracingInterceptor Build() { - var configuration = new ClientTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _verbose, _tracedAttributes, _waitForReady, _cancellationToken); + var configuration = new ClientTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _streamingOutputSpans, _verbose, _tracedAttributes, _waitForReady, _cancellationToken); return new ClientTracingInterceptor(configuration); } } diff --git a/src/OpenTracing.Contrib.Grpc/Interceptors/ServerTracingInterceptor.cs b/src/OpenTracing.Contrib.Grpc/Interceptors/ServerTracingInterceptor.cs index cbae014..f0d0da2 100644 --- a/src/OpenTracing.Contrib.Grpc/Interceptors/ServerTracingInterceptor.cs +++ b/src/OpenTracing.Contrib.Grpc/Interceptors/ServerTracingInterceptor.cs @@ -54,6 +54,7 @@ public class Builder private IOperationNameConstructor _operationNameConstructor; private bool _streaming; private bool _streamingInputSpans; + private bool _streamingOutputSpans; private bool _verbose; private ISet _tracedAttributes; @@ -90,6 +91,16 @@ public Builder WithStreamingInputSpans() return this; } + /// + /// Creates a child span for each output message sent. + /// + /// this Builder configured to create child spans + public Builder WithStreamingOutputSpans() + { + _streamingOutputSpans = true; + return this; + } + /// /// Logs all request life-cycle events to server spans. /// @@ -110,7 +121,7 @@ public Builder WithTracedAttributes(params ServerTracingConfiguration.RequestAtt public ServerTracingInterceptor Build() { - var configuration = new ServerTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _verbose, _tracedAttributes); + var configuration = new ServerTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _streamingOutputSpans, _verbose, _tracedAttributes); return new ServerTracingInterceptor(configuration); } } diff --git a/src/OpenTracing.Contrib.Grpc/OpenTracing.Contrib.Grpc.csproj b/src/OpenTracing.Contrib.Grpc/OpenTracing.Contrib.Grpc.csproj index 8e25099..fc26366 100644 --- a/src/OpenTracing.Contrib.Grpc/OpenTracing.Contrib.Grpc.csproj +++ b/src/OpenTracing.Contrib.Grpc/OpenTracing.Contrib.Grpc.csproj @@ -1,7 +1,7 @@  - net45;netstandard1.5;netstandard2.0 + net462;netstandard2.1 true Adds OpenTracing instrumentation for .NET Standard apps that use GRPC. opentracing;distributed-tracing;tracing;grpc;netstandard diff --git a/src/OpenTracing.Contrib.Grpc/Streaming/StreamActions.cs b/src/OpenTracing.Contrib.Grpc/Streaming/StreamActions.cs new file mode 100644 index 0000000..ffc502f --- /dev/null +++ b/src/OpenTracing.Contrib.Grpc/Streaming/StreamActions.cs @@ -0,0 +1,35 @@ +using System; + +namespace OpenTracing.Contrib.Grpc.Streaming +{ + internal readonly struct StreamActions + { + public ScopeActions ScopeActions { get; } + public Action OnMessage { get; } + public Action OnStreamEnd { get; } + public Action OnException { get; } + + public StreamActions(ScopeActions scopeActions, Action onMessage, Action onStreamEnd = null, Action onException = null) + { + ScopeActions = scopeActions; + OnMessage = onMessage; + OnStreamEnd = onStreamEnd; + OnException = onException; + } + + public void Message(T msg) + { + OnMessage?.Invoke(msg); + } + + public void Exception(Exception ex) + { + OnException?.Invoke(ex); + } + + public void StreamEnd() + { + OnStreamEnd?.Invoke(); + } + } +} \ No newline at end of file diff --git a/src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.StreamActions.cs b/src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.StreamActions.cs deleted file mode 100644 index 519cfad..0000000 --- a/src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.StreamActions.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System; - -namespace OpenTracing.Contrib.Grpc.Streaming -{ - internal partial class TracingAsyncStreamReader - { - internal readonly struct StreamActions - { - public ScopeActions ScopeActions { get; } - public Action OnMessage { get; } - public Action OnStreamEnd { get; } - public Action OnException { get; } - - public StreamActions(ScopeActions scopeActions, Action onMessage, Action onStreamEnd = null, Action onException = null) - { - ScopeActions = scopeActions; - OnMessage = onMessage; - OnStreamEnd = onStreamEnd; - OnException = onException; - } - - public void Message(T msg) - { - OnMessage?.Invoke(msg); - } - - public void Exception(Exception ex) - { - OnException?.Invoke(ex); - } - - public void StreamEnd() - { - OnStreamEnd?.Invoke(); - } - } - } -} \ No newline at end of file diff --git a/src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.cs b/src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.cs index aba71b9..a387cc1 100644 --- a/src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.cs +++ b/src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.cs @@ -5,14 +5,14 @@ namespace OpenTracing.Contrib.Grpc.Streaming { - internal partial class TracingAsyncStreamReader : IAsyncStreamReader + internal class TracingAsyncStreamReader : IAsyncStreamReader { private readonly IAsyncStreamReader _reader; - private readonly StreamActions _streamActions; + private readonly StreamActions _streamActions; public T Current => _reader.Current; - public TracingAsyncStreamReader(IAsyncStreamReader reader, StreamActions streamActions) + public TracingAsyncStreamReader(IAsyncStreamReader reader, StreamActions streamActions) { _reader = reader; _streamActions = streamActions; diff --git a/src/OpenTracing.Contrib.Grpc/Streaming/TracingClientStreamWriter.cs b/src/OpenTracing.Contrib.Grpc/Streaming/TracingClientStreamWriter.cs index 2939219..0d2e639 100644 --- a/src/OpenTracing.Contrib.Grpc/Streaming/TracingClientStreamWriter.cs +++ b/src/OpenTracing.Contrib.Grpc/Streaming/TracingClientStreamWriter.cs @@ -1,5 +1,4 @@ -using System; -using System.Threading.Tasks; +using System.Threading.Tasks; using Grpc.Core; namespace OpenTracing.Contrib.Grpc.Streaming @@ -7,14 +6,12 @@ namespace OpenTracing.Contrib.Grpc.Streaming internal class TracingClientStreamWriter : IClientStreamWriter { private readonly IClientStreamWriter _writer; - private readonly Action _onWrite; - private readonly Action _onComplete; + private readonly StreamActions _streamActions; - public TracingClientStreamWriter(IClientStreamWriter writer, Action onWrite, Action onComplete = null) + public TracingClientStreamWriter(IClientStreamWriter writer, StreamActions streamActions) { _writer = writer; - _onWrite = onWrite; - _onComplete = onComplete; + _streamActions = streamActions; } public WriteOptions WriteOptions @@ -25,13 +22,18 @@ public WriteOptions WriteOptions public Task WriteAsync(T message) { - _onWrite(message); + _streamActions.ScopeActions.EndScope(); + _streamActions.ScopeActions.BeginScope(); + _streamActions.Message(message); + return _writer.WriteAsync(message); } public Task CompleteAsync() { - _onComplete?.Invoke(); + _streamActions.ScopeActions.EndScope(); + _streamActions.StreamEnd(); + return _writer.CompleteAsync(); } } diff --git a/src/OpenTracing.Contrib.Grpc/Streaming/TracingServerStreamWriter.cs b/src/OpenTracing.Contrib.Grpc/Streaming/TracingServerStreamWriter.cs index 711c0a1..9bbe3d3 100644 --- a/src/OpenTracing.Contrib.Grpc/Streaming/TracingServerStreamWriter.cs +++ b/src/OpenTracing.Contrib.Grpc/Streaming/TracingServerStreamWriter.cs @@ -1,5 +1,4 @@ -using System; -using System.Threading.Tasks; +using System.Threading.Tasks; using Grpc.Core; namespace OpenTracing.Contrib.Grpc.Streaming @@ -7,12 +6,12 @@ namespace OpenTracing.Contrib.Grpc.Streaming internal class TracingServerStreamWriter : IServerStreamWriter { private readonly IServerStreamWriter _writer; - private readonly Action _onWrite; + private readonly StreamActions _streamActions; - public TracingServerStreamWriter(IServerStreamWriter writer, Action onWrite) + public TracingServerStreamWriter(IServerStreamWriter writer, StreamActions streamActions) { _writer = writer; - _onWrite = onWrite; + _streamActions = streamActions; } public WriteOptions WriteOptions @@ -23,7 +22,10 @@ public WriteOptions WriteOptions public Task WriteAsync(T message) { - _onWrite(message); + _streamActions.ScopeActions.EndScope(); + _streamActions.ScopeActions.BeginScope(); + _streamActions.Message(message); + return _writer.WriteAsync(message); } } diff --git a/test/OpenTracing.Contrib.Grpc.Test/OpenTracing.Contrib.Grpc.Test.csproj b/test/OpenTracing.Contrib.Grpc.Test/OpenTracing.Contrib.Grpc.Test.csproj index 1a3a9e2..3f3b738 100644 --- a/test/OpenTracing.Contrib.Grpc.Test/OpenTracing.Contrib.Grpc.Test.csproj +++ b/test/OpenTracing.Contrib.Grpc.Test/OpenTracing.Contrib.Grpc.Test.csproj @@ -2,7 +2,7 @@ Exe - net45;netcoreapp2.1;netcoreapp3.1 + net462;netstandard2.1 OpenTracing.Contrib.Grpc.Test OpenTracing.Contrib.Grpc.Test 7.3 diff --git a/test/OpenTracing.Contrib.Grpc.Test/Program.cs b/test/OpenTracing.Contrib.Grpc.Test/Program.cs index 229af6f..70ee8ad 100644 --- a/test/OpenTracing.Contrib.Grpc.Test/Program.cs +++ b/test/OpenTracing.Contrib.Grpc.Test/Program.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Grpc.Core; @@ -15,20 +14,38 @@ internal class Program { private class ConsoleMockTracer : MockTracer { + private volatile object _syncRoot; + + public ConsoleMockTracer(object syncRoot) + { + _syncRoot = syncRoot; + } + protected override void OnSpanFinished(MockSpan span) { - Console.WriteLine(span); - Console.WriteLine("Tags:"); - Console.WriteLine(string.Join("; ", span.Tags.Select(e => $"{e.Key} = {e.Value}"))); - Console.WriteLine("Logs:"); - span.LogEntries.ForEach(entry => - Console.WriteLine($"Timestamp: {entry.Timestamp}, Fields: " - + string.Join("; ", entry.Fields.Select(e => $"{e.Key} = {e.Value}")))); - Console.WriteLine(); + lock (_syncRoot) + { + Console.WriteLine(span); + Console.WriteLine("Tags:"); + Console.WriteLine(string.Join("; ", span.Tags.Select(e => $"{e.Key} = {e.Value}"))); + Console.WriteLine("Logs:"); + span.LogEntries.ForEach(entry => + Console.WriteLine($"Timestamp: {entry.Timestamp}, Fields: " + + string.Join("; ", entry.Fields.Select(e => $"{e.Key} = {e.Value}")))); + Console.WriteLine(); + } } } - private static readonly ServerTracingInterceptor ServerTracingInterceptor = new ServerTracingInterceptor(new ConsoleMockTracer()); + private static readonly object SyncRoot = new object(); + private static readonly ServerTracingInterceptor ServerTracingInterceptor = new ServerTracingInterceptor + .Builder(new ConsoleMockTracer(SyncRoot)) + .WithStreaming() + .WithStreamingInputSpans() + .WithStreamingOutputSpans() + .WithVerbosity() + .WithTracedAttributes(ServerTracingConfiguration.RequestAttribute.Headers, ServerTracingConfiguration.RequestAttribute.MethodName) + .Build(); private static Task Main() { @@ -45,17 +62,21 @@ private static async Task MainAsync() server.Start(); var tracingInterceptor = new ClientTracingInterceptor - .Builder(new ConsoleMockTracer()) + .Builder(new ConsoleMockTracer(SyncRoot)) .WithStreaming() + .WithStreamingInputSpans() + .WithStreamingOutputSpans() .WithVerbosity() .WithTracedAttributes(ClientTracingConfiguration.RequestAttribute.AllCallOptions, ClientTracingConfiguration.RequestAttribute.Headers) .WithWaitForReady() .Build(); + Console.WriteLine("Calling unary:"); var client = new Phone.PhoneClient(new Channel("localhost:8011", ChannelCredentials.Insecure).Intercept(tracingInterceptor)); var request = new Person { Name = "Karl Heinz" }; var _ = await client.GetNameAsync(request); + Console.WriteLine("Calling client stream:"); var response2 = client.GetNameRequestStream(); await response2.RequestStream.WriteAsync(request); await response2.RequestStream.WriteAsync(request); @@ -63,12 +84,14 @@ private static async Task MainAsync() await response2.RequestStream.CompleteAsync(); await response2.ResponseAsync; + Console.WriteLine("Calling server stream:"); var response3 = client.GetNameResponseStream(request); while (await response3.ResponseStream.MoveNext()) { // Ignore } + Console.WriteLine("Calling bi-di stream:"); var response4 = client.GetNameBiDiStream(new Metadata()); await response4.RequestStream.WriteAsync(request); await response4.RequestStream.WriteAsync(request); @@ -88,6 +111,7 @@ private static async Task MainAsync() .WithWaitForReady(true) .WithWriteOptions(new WriteOptions(WriteFlags.NoCompress)); + Console.WriteLine("Calling unary with options:"); var response5 = await client.GetNameAsync( new Person { Name = "Test" }, options);