From 7ca245bc94a318d5f021988049eb65f3c9d5e0fc Mon Sep 17 00:00:00 2001 From: Alex Hemsath <57361211+nr-ahemsath@users.noreply.github.com> Date: Mon, 25 Jan 2021 17:29:04 -0800 Subject: [PATCH] 8t more failed precondition handling (#433) * Attempt to handle FAILED_PRECONDITION in response stream(s) * Add support for specifying `flaky_code` in 8T connection metadata * Explicitly catch Grpc.RpcException when handling server responses * Improved logging of internal/stream reset response from trace observer * Add unit test for INTERNAL status code response when sending spans * Flip logging priority of expected vs. unexpected gRPC response exceptions * Tweak log messages and log levels * Fix typo --- .../Configuration/DefaultConfiguration.cs | 23 ++++ .../DataTransport/DataStreamingService.cs | 105 +++++++++++------- .../DataTransport/SpanStreamingService.cs | 3 +- .../Configuration/IConfiguration.cs | 1 + .../Spans/SpanStreamingServiceTests.cs | 11 +- 5 files changed, 95 insertions(+), 48 deletions(-) diff --git a/src/Agent/NewRelic/Agent/Core/Configuration/DefaultConfiguration.cs b/src/Agent/NewRelic/Agent/Core/Configuration/DefaultConfiguration.cs index b686e211a1..183641d659 100644 --- a/src/Agent/NewRelic/Agent/Core/Configuration/DefaultConfiguration.cs +++ b/src/Agent/NewRelic/Agent/Core/Configuration/DefaultConfiguration.cs @@ -1051,6 +1051,15 @@ private void GetInfiniteTracingFlakyAndDelayTestSettings() _infiniteTracingObserverTestFlaky = TryGetAppSettingAsFloat("InfiniteTracingSpanEventsTestFlaky"); } + if (int.TryParse(_environment.GetEnvironmentVariable("NEW_RELIC_INFINITE_TRACING_SPAN_EVENTS_TEST_FLAKY_CODE"), out var flakyCodeVal)) + { + _infiniteTracingObserverTestFlakyCode = flakyCodeVal; + } + else + { + _infiniteTracingObserverTestFlakyCode = TryGetAppSettingAsInt("InfiniteTracingSpanEventsTestFlakyCode"); + } + if (int.TryParse(_environment.GetEnvironmentVariable("NEW_RELIC_INFINITE_TRACING_SPAN_EVENTS_TEST_DELAY"), out var delayVal)) { _infiniteTracingObserverTestDelayMs = delayVal; @@ -1077,6 +1086,20 @@ public float? InfiniteTracingTraceObserverTestFlaky } } + private int? _infiniteTracingObserverTestFlakyCode; + public int? InfiniteTracingTraceObserverTestFlakyCode + { + get + { + if (!_infiniteTracingObtainedSettingsForTest) + { + GetInfiniteTracingFlakyAndDelayTestSettings(); + } + + return _infiniteTracingObserverTestFlakyCode; + } + } + private int? _infiniteTracingObserverTestDelayMs; public int? InfiniteTracingTraceObserverTestDelayMs { diff --git a/src/Agent/NewRelic/Agent/Core/DataTransport/DataStreamingService.cs b/src/Agent/NewRelic/Agent/Core/DataTransport/DataStreamingService.cs index 1d25ec97f5..9aedd268f1 100644 --- a/src/Agent/NewRelic/Agent/Core/DataTransport/DataStreamingService.cs +++ b/src/Agent/NewRelic/Agent/Core/DataTransport/DataStreamingService.cs @@ -25,6 +25,8 @@ public class ResponseStreamWrapper private bool _isInvalid = false; public bool IsInvalid => _streamCancellationToken.IsCancellationRequested || _isInvalid || (_task?.IsFaulted).GetValueOrDefault(false); + public RpcException ResponseRpcException = null; + private readonly IAsyncStreamReader _responseStream; private readonly CancellationToken _streamCancellationToken; @@ -40,30 +42,29 @@ public ResponseStreamWrapper(int consumerID, IAsyncStreamReader respo private async Task WaitForResponse() { - bool success; + var success = false; try { success = await _responseStream.MoveNext(_streamCancellationToken); } - catch (Exception ex) + catch (RpcException rpcEx) { - success = false; - var logLevel = LogLevel.Finest; - var aggEx = ex as AggregateException; - if (aggEx != null && aggEx.InnerException != null) - { - var rpcEx = aggEx.InnerException as RpcException; + ResponseRpcException = rpcEx; - logLevel = (rpcEx != null && rpcEx.StatusCode == StatusCode.Cancelled) - ? LogLevel.Finest - : LogLevel.Debug; + if (Log.IsEnabledFor(logLevel)) + { + Log.LogMessage(logLevel, $"GRPC RpcException encountered while handling gRPC server responses: {rpcEx.Status}"); } + } + catch (Exception ex) + { + var logLevel = LogLevel.Debug; if (Log.IsEnabledFor(logLevel)) { - Log.LogMessage(logLevel, $"Exception encountered while handling gRPC server responses: {ex}"); + Log.LogMessage(logLevel, $"Unknown exception encountered while handling gRPC server responses: {ex}"); } } @@ -111,6 +112,7 @@ public abstract class DataStreamingService : private const string UnimplementedStatus = "UNIMPLEMENTED"; private const string UnavailableStatus = "UNAVAILABLE"; private const string FailedPreconditionStatus = "FAILED_PRECONDITION"; + private const string InternalStatus = "INTERNAL"; private const string OkStatus = "OK"; private readonly IGrpcWrapper _grpcWrapper; private readonly IDelayer _delayer; @@ -178,6 +180,11 @@ private Metadata CreateMetadataHeaders() headers.Add(new Metadata.Entry("flaky", EndpointTestFlaky.ToString())); } + if (EndpointTestFlakyCode.HasValue) + { + headers.Add(new Metadata.Entry("flaky_code", EndpointTestFlakyCode.ToString())); + } + if (Log.IsFinestEnabled) { var parametersString = string.Join(",", headers.Select(x => $"{x.Key}={x.Value}")); @@ -195,6 +202,7 @@ private Metadata CreateMetadataHeaders() public int EndpointPort { get; private set; } public bool EndpointSsl { get; private set; } public float? EndpointTestFlaky { get; private set; } + public int? EndpointTestFlakyCode { get; private set; } public int? EndpointTestDelayMs { get; private set; } public abstract int BatchSizeConfigValue { get; } @@ -202,6 +210,7 @@ private Metadata CreateMetadataHeaders() protected abstract string EndpointPortConfigValue { get; } protected abstract string EndpointSslConfigValue { get; } protected abstract float? EndpointTestFlakyConfigValue { get; } + protected abstract int? EndpointTestFlakyCodeConfigValue { get; } protected abstract int? EndpointTestDelayMsConfigValue { get; } @@ -266,6 +275,7 @@ public bool ReadAndValidateConfiguration() EndpointPort = -1; EndpointSsl = true; EndpointTestFlaky = null; + EndpointTestFlakyCode = null; EndpointTestDelayMs = null; return false; @@ -275,17 +285,19 @@ public bool ReadAndValidateConfiguration() var isValidPort = int.TryParse(configPortStr, out var configPort) && configPort > 0 && configPort <= 65535; var isValidSsl = bool.TryParse(configSslStr, out var configSsl); var isValidFlaky = EndpointTestFlakyConfigValue == null || (EndpointTestFlakyConfigValue >= 0 && EndpointTestFlakyConfigValue <= 100); + var isValidFlakyCode = EndpointTestFlakyCodeConfigValue == null || (EndpointTestFlakyCodeConfigValue >= 0 && EndpointTestFlakyCodeConfigValue <= 16); // See https://github.com/grpc/grpc/blob/master/doc/statuscodes.md var isValidDelay = EndpointTestDelayMsConfigValue == null || (EndpointTestDelayMsConfigValue >= 0); var isValidTimeoutConnect = TimeoutConnectMs > 0; var isValidTimeoutSend = TimeoutSendDataMs > 0; var isValidBatchSize = BatchSizeConfigValue > 0; - if (isValidHost && isValidPort && isValidSsl && isValidFlaky && isValidDelay && isValidTimeoutConnect && isValidTimeoutSend && isValidBatchSize) + if (isValidHost && isValidPort && isValidSsl && isValidFlaky && isValidFlakyCode && isValidDelay && isValidTimeoutConnect && isValidTimeoutSend && isValidBatchSize) { EndpointHost = configHost; EndpointPort = configPort; EndpointSsl = configSsl; EndpointTestFlaky = EndpointTestFlakyConfigValue; + EndpointTestFlakyCode = EndpointTestFlakyCodeConfigValue; EndpointTestDelayMs = EndpointTestDelayMsConfigValue; return true; @@ -317,6 +329,11 @@ public bool ReadAndValidateConfiguration() LogMessage(LogLevel.Info, $"Invalid Configuration For Test. Flaky % '{EndpointTestFlakyConfigValue}' is not valid. Infinite Tracing will NOT be started."); } + if (!isValidFlakyCode) + { + LogMessage(LogLevel.Info, $"Invalid Configuration For Test. Flaky response code '{EndpointTestFlakyCodeConfigValue}' is not valid. Infinite Tracing will NOT be started."); + } + if (!isValidDelay) { LogMessage(LogLevel.Info, $"Invalid Configuration For Test. Delay Ms '{EndpointTestDelayMsConfigValue}' is not valid. Infinite Tracing will NOT be started."); @@ -370,6 +387,11 @@ private void ManageResponseStreams(CancellationToken serviceCancellationToken) { LogMessage(LogLevel.Finest, x.ConsumerID, "Response Stream Manager - Removing Stream"); _responseStreamsDic.TryRemove(x.ConsumerID, out _); + if ((x.ResponseRpcException != null) && (x.ResponseRpcException.StatusCode == StatusCode.FailedPrecondition)) + { + LogMessage(LogLevel.Debug, $"The gRPC endpoint defined at {EndpointHost}:{EndpointPort} returned {FailedPreconditionStatus}, indicating the traffic is being redirected to a new host. Restarting service."); + Shutdown(true); + } } var tasksToWaitFor = _responseStreamsDic.Values @@ -478,6 +500,7 @@ private void LogConfigurationSettings() LogMessage(LogLevel.Finest, $"Configuration Setting - SSL - {EndpointSsl}"); LogMessage(LogLevel.Info, $"Configuration Setting - Consumers - {_configuration.InfiniteTracingTraceCountConsumers}"); LogMessage(LogLevel.Finest, $"Configuration Setting - Test Flaky - {EndpointTestFlaky?.ToString() ?? "NULL"}"); + LogMessage(LogLevel.Finest, $"Configuration Setting - Test Flaky Code - {EndpointTestFlakyCode?.ToString() ?? "NULL"}"); LogMessage(LogLevel.Finest, $"Configuration Setting - Test Delay (ms) - {EndpointTestDelayMs?.ToString() ?? "NULL"}"); } @@ -592,7 +615,7 @@ public void Dispose() public void Shutdown(bool withRestart) { - LogMessage(LogLevel.Debug, "Shutdown Request Received"); + LogMessage(LogLevel.Debug, $"Shutdown Request Received, restart = {withRestart}"); _shouldRestart = withRestart; _cancellationTokenSource.Cancel(); @@ -648,14 +671,14 @@ private bool GetRequestStreamWithRetry(int consumerId, CancellationToken cancell if (grpcWrapperEx.Status == UnavailableStatus) { - LogMessage(LogLevel.Error, consumerId, $"The gRPC request stream could not be created because the gRPC endpoint defined at {EndpointHost}:{EndpointPort} is no longer available so we will restart this service."); + LogMessage(LogLevel.Error, consumerId, $"The gRPC request stream could not be created because the gRPC endpoint defined at {EndpointHost}:{EndpointPort} is temporarily unavailable, so we will restart this service."); Shutdown(true); return false; } if (grpcWrapperEx.Status == FailedPreconditionStatus) { - LogMessage(LogLevel.Error, consumerId, $"The gRPC request stream could not be created because the gRPC endpoint defined at {EndpointHost}:{EndpointPort} has been moved to a different host so we will restart this service."); + LogMessage(LogLevel.Error, consumerId, $"The gRPC request stream could not be created because the gRPC endpoint defined at {EndpointHost}:{EndpointPort} has been moved to a different host, so we will restart this service."); Shutdown(true); return false; } @@ -865,40 +888,38 @@ private TrySendStatus TrySend(int consumerId, IClientStreamWriter } catch (GrpcWrapperException grpcEx) when (!string.IsNullOrWhiteSpace(grpcEx.Status)) { - LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - gRPC Exception: {grpcEx.Status}", grpcEx); - RecordResponseError(); RecordGrpcError(grpcEx.Status); - if (grpcEx.Status == UnimplementedStatus) - { - Shutdown(false); - return TrySendStatus.Error; - } - - if (grpcEx.Status == UnavailableStatus) + switch (grpcEx.Status) { - LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - Channel not available, requesting restart"); - Shutdown(true); - return TrySendStatus.Error; - } - - if (grpcEx.Status == FailedPreconditionStatus) - { - LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - Channel has been moved, requesting restart"); - Shutdown(true); - return TrySendStatus.Error; - } - - if (grpcEx.Status == OkStatus) - { - LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - A stream was closed due to connection rebalance. New stream requested and data will be resent immediately."); - return TrySendStatus.ErrorWithImmediateRetry; + case UnimplementedStatus: + LogMessage(LogLevel.Error, consumerId, $"Attempting to send {items.Count} item(s) - Trace observer is no longer available, shutting down infinite tracing service."); + Shutdown(false); + return TrySendStatus.Error; + case UnavailableStatus: + LogMessage(LogLevel.Info, consumerId, $"Attempting to send {items.Count} item(s) - Channel not available, requesting restart"); + Shutdown(true); + return TrySendStatus.Error; + case FailedPreconditionStatus: + LogMessage(LogLevel.Debug, consumerId, $"Attempting to send {items.Count} item(s) - Channel has been moved, requesting restart"); + Shutdown(true); + return TrySendStatus.Error; + case InternalStatus: + LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - A stream was reset due to inactivity. New stream requested and data will be resent immediately."); + return TrySendStatus.ErrorWithImmediateRetry; + case OkStatus: + LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - A stream was closed due to connection rebalance. New stream requested and data will be resent immediately."); + return TrySendStatus.ErrorWithImmediateRetry; + default: + var rpcEx = grpcEx.InnerException as RpcException; + LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - gRPC Exception: {rpcEx?.Status.ToString() ?? grpcEx.Status}"); + break; } } catch (Exception ex) { - LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s)", ex); + LogMessage(LogLevel.Debug, consumerId, $"Unknown exception attempting to send {items.Count} item(s)", ex); RecordResponseError(); } diff --git a/src/Agent/NewRelic/Agent/Core/DataTransport/SpanStreamingService.cs b/src/Agent/NewRelic/Agent/Core/DataTransport/SpanStreamingService.cs index 1e3e4fb243..ad7ea39a26 100644 --- a/src/Agent/NewRelic/Agent/Core/DataTransport/SpanStreamingService.cs +++ b/src/Agent/NewRelic/Agent/Core/DataTransport/SpanStreamingService.cs @@ -21,6 +21,7 @@ public SpanStreamingService(IGrpcWrapper grpcWrapper, I protected override string EndpointPortConfigValue => _configuration?.InfiniteTracingTraceObserverPort; protected override string EndpointSslConfigValue => _configuration?.InfiniteTracingTraceObserverSsl; protected override float? EndpointTestFlakyConfigValue => _configuration?.InfiniteTracingTraceObserverTestFlaky; + protected override int? EndpointTestFlakyCodeConfigValue => _configuration?.InfiniteTracingTraceObserverTestFlakyCode; protected override int? EndpointTestDelayMsConfigValue => _configuration?.InfiniteTracingTraceObserverTestDelayMs; public override int BatchSizeConfigValue => (_configuration?.InfiniteTracingBatchSizeSpans).GetValueOrDefault(0); @@ -29,7 +30,7 @@ protected override void HandleServerResponse(RecordStatus responseModel, int con LogMessage(LogLevel.Finest, consumerId, $"Received gRPC Server response: {responseModel.MessagesSeen}"); RecordReceived(responseModel.MessagesSeen); - + } private void RecordReceived(ulong countItems) diff --git a/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Configuration/IConfiguration.cs b/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Configuration/IConfiguration.cs index f44b54b015..46ba85f699 100644 --- a/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Configuration/IConfiguration.cs +++ b/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Configuration/IConfiguration.cs @@ -71,6 +71,7 @@ public interface IConfiguration string InfiniteTracingTraceObserverPort { get; } string InfiniteTracingTraceObserverSsl { get; } float? InfiniteTracingTraceObserverTestFlaky { get; } + int? InfiniteTracingTraceObserverTestFlakyCode { get; } int? InfiniteTracingTraceObserverTestDelayMs { get; } int InfiniteTracingQueueSizeSpans { get; } int InfiniteTracingPartitionCountSpans { get; } diff --git a/tests/Agent/UnitTests/Core.UnitTest/Spans/SpanStreamingServiceTests.cs b/tests/Agent/UnitTests/Core.UnitTest/Spans/SpanStreamingServiceTests.cs index b1e00193c1..e40519935e 100644 --- a/tests/Agent/UnitTests/Core.UnitTest/Spans/SpanStreamingServiceTests.cs +++ b/tests/Agent/UnitTests/Core.UnitTest/Spans/SpanStreamingServiceTests.cs @@ -1552,8 +1552,9 @@ public void GrpcUnavailableOrFailedPreconditionDuringCreateStreamRestartsService ); } - [Test] - public void GrpcOkDuringTrySendDataCreatesNewStreamImmediately() + [TestCase(StatusCode.OK)] + [TestCase(StatusCode.Internal)] + public void GrpcOkOrInternalDuringTrySendDataCreatesNewStreamImmediately(StatusCode statusCode) { var actualCountGrpcErrors = 0; var actualCountGeneralErrors = 0; @@ -1600,13 +1601,13 @@ public void GrpcOkDuringTrySendDataCreatesNewStreamImmediately() if (localInvocationId < 2 || localInvocationId == 3) { - MockGrpcWrapper.ThrowGrpcWrapperException(StatusCode.Internal, "Test gRPC Exception"); + MockGrpcWrapper.ThrowGrpcWrapperException(StatusCode.Unknown, "Test gRPC Exception"); return false; } if (localInvocationId == 2) { - MockGrpcWrapper.ThrowGrpcWrapperException(StatusCode.OK, "Test gRPC Exception"); + MockGrpcWrapper.ThrowGrpcWrapperException(statusCode, "Test gRPC Exception"); return false; } @@ -1726,7 +1727,7 @@ public void GrpcOkDuringCreateStreamRetriesImmediately() if (countCreateStreamsCalls < 4 || countCreateStreamsCalls == 5) { - MockGrpcWrapper.ThrowGrpcWrapperException(StatusCode.Internal, "Test gRPC Exception"); + MockGrpcWrapper.ThrowGrpcWrapperException(StatusCode.Unknown, "Test gRPC Exception"); return null; }