Skip to content

Commit

Permalink
8t more failed precondition handling (#433)
Browse files Browse the repository at this point in the history
* Attempt to handle FAILED_PRECONDITION in response stream(s)

* Add support for specifying `flaky_code` in 8T connection metadata

* Explicitly catch Grpc.RpcException when handling server responses

* Improved logging of internal/stream reset response from trace observer

* Add unit test for INTERNAL status code response when sending spans

* Flip logging priority of expected vs. unexpected gRPC response exceptions

* Tweak log messages and log levels

* Fix typo
  • Loading branch information
nr-ahemsath authored Jan 26, 2021
1 parent b149609 commit 7ca245b
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,15 @@ private void GetInfiniteTracingFlakyAndDelayTestSettings()
_infiniteTracingObserverTestFlaky = TryGetAppSettingAsFloat("InfiniteTracingSpanEventsTestFlaky");
}

if (int.TryParse(_environment.GetEnvironmentVariable("NEW_RELIC_INFINITE_TRACING_SPAN_EVENTS_TEST_FLAKY_CODE"), out var flakyCodeVal))
{
_infiniteTracingObserverTestFlakyCode = flakyCodeVal;
}
else
{
_infiniteTracingObserverTestFlakyCode = TryGetAppSettingAsInt("InfiniteTracingSpanEventsTestFlakyCode");
}

if (int.TryParse(_environment.GetEnvironmentVariable("NEW_RELIC_INFINITE_TRACING_SPAN_EVENTS_TEST_DELAY"), out var delayVal))
{
_infiniteTracingObserverTestDelayMs = delayVal;
Expand All @@ -1077,6 +1086,20 @@ public float? InfiniteTracingTraceObserverTestFlaky
}
}

private int? _infiniteTracingObserverTestFlakyCode;
public int? InfiniteTracingTraceObserverTestFlakyCode
{
get
{
if (!_infiniteTracingObtainedSettingsForTest)
{
GetInfiniteTracingFlakyAndDelayTestSettings();
}

return _infiniteTracingObserverTestFlakyCode;
}
}

private int? _infiniteTracingObserverTestDelayMs;
public int? InfiniteTracingTraceObserverTestDelayMs
{
Expand Down
105 changes: 63 additions & 42 deletions src/Agent/NewRelic/Agent/Core/DataTransport/DataStreamingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class ResponseStreamWrapper<TResponse>
private bool _isInvalid = false;
public bool IsInvalid => _streamCancellationToken.IsCancellationRequested || _isInvalid || (_task?.IsFaulted).GetValueOrDefault(false);

public RpcException ResponseRpcException = null;


private readonly IAsyncStreamReader<TResponse> _responseStream;
private readonly CancellationToken _streamCancellationToken;
Expand All @@ -40,30 +42,29 @@ public ResponseStreamWrapper(int consumerID, IAsyncStreamReader<TResponse> respo

private async Task<int> WaitForResponse()
{
bool success;
var success = false;
try
{
success = await _responseStream.MoveNext(_streamCancellationToken);
}
catch (Exception ex)
catch (RpcException rpcEx)
{
success = false;

var logLevel = LogLevel.Finest;

var aggEx = ex as AggregateException;
if (aggEx != null && aggEx.InnerException != null)
{
var rpcEx = aggEx.InnerException as RpcException;
ResponseRpcException = rpcEx;

logLevel = (rpcEx != null && rpcEx.StatusCode == StatusCode.Cancelled)
? LogLevel.Finest
: LogLevel.Debug;
if (Log.IsEnabledFor(logLevel))
{
Log.LogMessage(logLevel, $"GRPC RpcException encountered while handling gRPC server responses: {rpcEx.Status}");
}
}
catch (Exception ex)
{
var logLevel = LogLevel.Debug;

if (Log.IsEnabledFor(logLevel))
{
Log.LogMessage(logLevel, $"Exception encountered while handling gRPC server responses: {ex}");
Log.LogMessage(logLevel, $"Unknown exception encountered while handling gRPC server responses: {ex}");
}
}

Expand Down Expand Up @@ -111,6 +112,7 @@ public abstract class DataStreamingService<TRequest, TRequestBatch, TResponse> :
private const string UnimplementedStatus = "UNIMPLEMENTED";
private const string UnavailableStatus = "UNAVAILABLE";
private const string FailedPreconditionStatus = "FAILED_PRECONDITION";
private const string InternalStatus = "INTERNAL";
private const string OkStatus = "OK";
private readonly IGrpcWrapper<TRequestBatch, TResponse> _grpcWrapper;
private readonly IDelayer _delayer;
Expand Down Expand Up @@ -178,6 +180,11 @@ private Metadata CreateMetadataHeaders()
headers.Add(new Metadata.Entry("flaky", EndpointTestFlaky.ToString()));
}

if (EndpointTestFlakyCode.HasValue)
{
headers.Add(new Metadata.Entry("flaky_code", EndpointTestFlakyCode.ToString()));
}

if (Log.IsFinestEnabled)
{
var parametersString = string.Join(",", headers.Select(x => $"{x.Key}={x.Value}"));
Expand All @@ -195,13 +202,15 @@ private Metadata CreateMetadataHeaders()
public int EndpointPort { get; private set; }
public bool EndpointSsl { get; private set; }
public float? EndpointTestFlaky { get; private set; }
public int? EndpointTestFlakyCode { get; private set; }
public int? EndpointTestDelayMs { get; private set; }
public abstract int BatchSizeConfigValue { get; }

protected abstract string EndpointHostConfigValue { get; }
protected abstract string EndpointPortConfigValue { get; }
protected abstract string EndpointSslConfigValue { get; }
protected abstract float? EndpointTestFlakyConfigValue { get; }
protected abstract int? EndpointTestFlakyCodeConfigValue { get; }
protected abstract int? EndpointTestDelayMsConfigValue { get; }


Expand Down Expand Up @@ -266,6 +275,7 @@ public bool ReadAndValidateConfiguration()
EndpointPort = -1;
EndpointSsl = true;
EndpointTestFlaky = null;
EndpointTestFlakyCode = null;
EndpointTestDelayMs = null;

return false;
Expand All @@ -275,17 +285,19 @@ public bool ReadAndValidateConfiguration()
var isValidPort = int.TryParse(configPortStr, out var configPort) && configPort > 0 && configPort <= 65535;
var isValidSsl = bool.TryParse(configSslStr, out var configSsl);
var isValidFlaky = EndpointTestFlakyConfigValue == null || (EndpointTestFlakyConfigValue >= 0 && EndpointTestFlakyConfigValue <= 100);
var isValidFlakyCode = EndpointTestFlakyCodeConfigValue == null || (EndpointTestFlakyCodeConfigValue >= 0 && EndpointTestFlakyCodeConfigValue <= 16); // See https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
var isValidDelay = EndpointTestDelayMsConfigValue == null || (EndpointTestDelayMsConfigValue >= 0);
var isValidTimeoutConnect = TimeoutConnectMs > 0;
var isValidTimeoutSend = TimeoutSendDataMs > 0;
var isValidBatchSize = BatchSizeConfigValue > 0;

if (isValidHost && isValidPort && isValidSsl && isValidFlaky && isValidDelay && isValidTimeoutConnect && isValidTimeoutSend && isValidBatchSize)
if (isValidHost && isValidPort && isValidSsl && isValidFlaky && isValidFlakyCode && isValidDelay && isValidTimeoutConnect && isValidTimeoutSend && isValidBatchSize)
{
EndpointHost = configHost;
EndpointPort = configPort;
EndpointSsl = configSsl;
EndpointTestFlaky = EndpointTestFlakyConfigValue;
EndpointTestFlakyCode = EndpointTestFlakyCodeConfigValue;
EndpointTestDelayMs = EndpointTestDelayMsConfigValue;

return true;
Expand Down Expand Up @@ -317,6 +329,11 @@ public bool ReadAndValidateConfiguration()
LogMessage(LogLevel.Info, $"Invalid Configuration For Test. Flaky % '{EndpointTestFlakyConfigValue}' is not valid. Infinite Tracing will NOT be started.");
}

if (!isValidFlakyCode)
{
LogMessage(LogLevel.Info, $"Invalid Configuration For Test. Flaky response code '{EndpointTestFlakyCodeConfigValue}' is not valid. Infinite Tracing will NOT be started.");
}

if (!isValidDelay)
{
LogMessage(LogLevel.Info, $"Invalid Configuration For Test. Delay Ms '{EndpointTestDelayMsConfigValue}' is not valid. Infinite Tracing will NOT be started.");
Expand Down Expand Up @@ -370,6 +387,11 @@ private void ManageResponseStreams(CancellationToken serviceCancellationToken)
{
LogMessage(LogLevel.Finest, x.ConsumerID, "Response Stream Manager - Removing Stream");
_responseStreamsDic.TryRemove(x.ConsumerID, out _);
if ((x.ResponseRpcException != null) && (x.ResponseRpcException.StatusCode == StatusCode.FailedPrecondition))
{
LogMessage(LogLevel.Debug, $"The gRPC endpoint defined at {EndpointHost}:{EndpointPort} returned {FailedPreconditionStatus}, indicating the traffic is being redirected to a new host. Restarting service.");
Shutdown(true);
}
}

var tasksToWaitFor = _responseStreamsDic.Values
Expand Down Expand Up @@ -478,6 +500,7 @@ private void LogConfigurationSettings()
LogMessage(LogLevel.Finest, $"Configuration Setting - SSL - {EndpointSsl}");
LogMessage(LogLevel.Info, $"Configuration Setting - Consumers - {_configuration.InfiniteTracingTraceCountConsumers}");
LogMessage(LogLevel.Finest, $"Configuration Setting - Test Flaky - {EndpointTestFlaky?.ToString() ?? "NULL"}");
LogMessage(LogLevel.Finest, $"Configuration Setting - Test Flaky Code - {EndpointTestFlakyCode?.ToString() ?? "NULL"}");
LogMessage(LogLevel.Finest, $"Configuration Setting - Test Delay (ms) - {EndpointTestDelayMs?.ToString() ?? "NULL"}");
}

Expand Down Expand Up @@ -592,7 +615,7 @@ public void Dispose()

public void Shutdown(bool withRestart)
{
LogMessage(LogLevel.Debug, "Shutdown Request Received");
LogMessage(LogLevel.Debug, $"Shutdown Request Received, restart = {withRestart}");

_shouldRestart = withRestart;
_cancellationTokenSource.Cancel();
Expand Down Expand Up @@ -648,14 +671,14 @@ private bool GetRequestStreamWithRetry(int consumerId, CancellationToken cancell

if (grpcWrapperEx.Status == UnavailableStatus)
{
LogMessage(LogLevel.Error, consumerId, $"The gRPC request stream could not be created because the gRPC endpoint defined at {EndpointHost}:{EndpointPort} is no longer available so we will restart this service.");
LogMessage(LogLevel.Error, consumerId, $"The gRPC request stream could not be created because the gRPC endpoint defined at {EndpointHost}:{EndpointPort} is temporarily unavailable, so we will restart this service.");
Shutdown(true);
return false;
}

if (grpcWrapperEx.Status == FailedPreconditionStatus)
{
LogMessage(LogLevel.Error, consumerId, $"The gRPC request stream could not be created because the gRPC endpoint defined at {EndpointHost}:{EndpointPort} has been moved to a different host so we will restart this service.");
LogMessage(LogLevel.Error, consumerId, $"The gRPC request stream could not be created because the gRPC endpoint defined at {EndpointHost}:{EndpointPort} has been moved to a different host, so we will restart this service.");
Shutdown(true);
return false;
}
Expand Down Expand Up @@ -865,40 +888,38 @@ private TrySendStatus TrySend(int consumerId, IClientStreamWriter<TRequestBatch>
}
catch (GrpcWrapperException grpcEx) when (!string.IsNullOrWhiteSpace(grpcEx.Status))
{
LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - gRPC Exception: {grpcEx.Status}", grpcEx);

RecordResponseError();
RecordGrpcError(grpcEx.Status);

if (grpcEx.Status == UnimplementedStatus)
{
Shutdown(false);
return TrySendStatus.Error;
}

if (grpcEx.Status == UnavailableStatus)
switch (grpcEx.Status)
{
LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - Channel not available, requesting restart");
Shutdown(true);
return TrySendStatus.Error;
}

if (grpcEx.Status == FailedPreconditionStatus)
{
LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - Channel has been moved, requesting restart");
Shutdown(true);
return TrySendStatus.Error;
}

if (grpcEx.Status == OkStatus)
{
LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - A stream was closed due to connection rebalance. New stream requested and data will be resent immediately.");
return TrySendStatus.ErrorWithImmediateRetry;
case UnimplementedStatus:
LogMessage(LogLevel.Error, consumerId, $"Attempting to send {items.Count} item(s) - Trace observer is no longer available, shutting down infinite tracing service.");
Shutdown(false);
return TrySendStatus.Error;
case UnavailableStatus:
LogMessage(LogLevel.Info, consumerId, $"Attempting to send {items.Count} item(s) - Channel not available, requesting restart");
Shutdown(true);
return TrySendStatus.Error;
case FailedPreconditionStatus:
LogMessage(LogLevel.Debug, consumerId, $"Attempting to send {items.Count} item(s) - Channel has been moved, requesting restart");
Shutdown(true);
return TrySendStatus.Error;
case InternalStatus:
LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - A stream was reset due to inactivity. New stream requested and data will be resent immediately.");
return TrySendStatus.ErrorWithImmediateRetry;
case OkStatus:
LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - A stream was closed due to connection rebalance. New stream requested and data will be resent immediately.");
return TrySendStatus.ErrorWithImmediateRetry;
default:
var rpcEx = grpcEx.InnerException as RpcException;
LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s) - gRPC Exception: {rpcEx?.Status.ToString() ?? grpcEx.Status}");
break;
}
}
catch (Exception ex)
{
LogMessage(LogLevel.Finest, consumerId, $"Attempting to send {items.Count} item(s)", ex);
LogMessage(LogLevel.Debug, consumerId, $"Unknown exception attempting to send {items.Count} item(s)", ex);
RecordResponseError();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public SpanStreamingService(IGrpcWrapper<SpanBatch, RecordStatus> grpcWrapper, I
protected override string EndpointPortConfigValue => _configuration?.InfiniteTracingTraceObserverPort;
protected override string EndpointSslConfigValue => _configuration?.InfiniteTracingTraceObserverSsl;
protected override float? EndpointTestFlakyConfigValue => _configuration?.InfiniteTracingTraceObserverTestFlaky;
protected override int? EndpointTestFlakyCodeConfigValue => _configuration?.InfiniteTracingTraceObserverTestFlakyCode;
protected override int? EndpointTestDelayMsConfigValue => _configuration?.InfiniteTracingTraceObserverTestDelayMs;
public override int BatchSizeConfigValue => (_configuration?.InfiniteTracingBatchSizeSpans).GetValueOrDefault(0);

Expand All @@ -29,7 +30,7 @@ protected override void HandleServerResponse(RecordStatus responseModel, int con
LogMessage(LogLevel.Finest, consumerId, $"Received gRPC Server response: {responseModel.MessagesSeen}");

RecordReceived(responseModel.MessagesSeen);

}

private void RecordReceived(ulong countItems)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public interface IConfiguration
string InfiniteTracingTraceObserverPort { get; }
string InfiniteTracingTraceObserverSsl { get; }
float? InfiniteTracingTraceObserverTestFlaky { get; }
int? InfiniteTracingTraceObserverTestFlakyCode { get; }
int? InfiniteTracingTraceObserverTestDelayMs { get; }
int InfiniteTracingQueueSizeSpans { get; }
int InfiniteTracingPartitionCountSpans { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1552,8 +1552,9 @@ public void GrpcUnavailableOrFailedPreconditionDuringCreateStreamRestartsService
);
}

[Test]
public void GrpcOkDuringTrySendDataCreatesNewStreamImmediately()
[TestCase(StatusCode.OK)]
[TestCase(StatusCode.Internal)]
public void GrpcOkOrInternalDuringTrySendDataCreatesNewStreamImmediately(StatusCode statusCode)
{
var actualCountGrpcErrors = 0;
var actualCountGeneralErrors = 0;
Expand Down Expand Up @@ -1600,13 +1601,13 @@ public void GrpcOkDuringTrySendDataCreatesNewStreamImmediately()

if (localInvocationId < 2 || localInvocationId == 3)
{
MockGrpcWrapper<TRequest, TResponse>.ThrowGrpcWrapperException(StatusCode.Internal, "Test gRPC Exception");
MockGrpcWrapper<TRequest, TResponse>.ThrowGrpcWrapperException(StatusCode.Unknown, "Test gRPC Exception");
return false;
}

if (localInvocationId == 2)
{
MockGrpcWrapper<TRequest, TResponse>.ThrowGrpcWrapperException(StatusCode.OK, "Test gRPC Exception");
MockGrpcWrapper<TRequest, TResponse>.ThrowGrpcWrapperException(statusCode, "Test gRPC Exception");
return false;
}

Expand Down Expand Up @@ -1726,7 +1727,7 @@ public void GrpcOkDuringCreateStreamRetriesImmediately()

if (countCreateStreamsCalls < 4 || countCreateStreamsCalls == 5)
{
MockGrpcWrapper<TRequest, TResponse>.ThrowGrpcWrapperException(StatusCode.Internal, "Test gRPC Exception");
MockGrpcWrapper<TRequest, TResponse>.ThrowGrpcWrapperException(StatusCode.Unknown, "Test gRPC Exception");
return null;
}

Expand Down

0 comments on commit 7ca245b

Please sign in to comment.