Skip to content

Commit

Permalink
fix AsyncQueueSegmentDispatcher performance issue
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-xiaoshuang authored and luxiaoshuang committed Mar 29, 2024
1 parent 6900a0f commit 6b01726
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 71 deletions.
9 changes: 4 additions & 5 deletions src/SkyApm.Abstractions/Common/Tags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ namespace SkyApm.Common
public static class Tags
{
public static readonly string URL = "url";

public static readonly string PATH = "path";

public static readonly string PATH = "path";

public static readonly string HTTP_METHOD = "http.method";

Expand All @@ -40,9 +39,9 @@ public static class Tags
public static readonly string DB_TYPE = "db.type";

public static readonly string DB_INSTANCE = "db.instance";

public static readonly string DB_STATEMENT = "db.statement";

public static readonly string DB_BIND_VARIABLES = "db.bind_vars";

public static readonly string MQ_TOPIC = "mq.topic";
Expand All @@ -59,4 +58,4 @@ public static class Tags

public static readonly string CACHE_CMD = "cache.cmd";
}
}
}
29 changes: 18 additions & 11 deletions src/SkyApm.Abstractions/Config/TransportConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,33 @@

namespace SkyApm.Config
{
public static class ProtocolVersions
{
public static string V8 { get; } = "v8";
}

[Config("SkyWalking", "Transport")]
public class TransportConfig
{
public int QueueSize { get; set; } = 30000;
public string ProtocolVersion { get; set; } = ProtocolVersions.V8;

/// <summary>
/// Flush Interval Millisecond
/// TotalQueueSize = QueueSize * Parallel
/// Elements will be dropped if queues are full.
/// </summary>
public int Interval { get; set; } = 3000;
public int QueueSize { get; set; } = 10000;

/// <summary>
/// Data queued beyond this time will be discarded.
/// TotalBatchSize = BatchSize * Parallel
/// </summary>
public int BatchSize { get; set; } = 3000;
public int BatchSize { get; set; } = 2000;

public string ProtocolVersion { get; set; } = ProtocolVersions.V8;
}
public int Parallel { get; set; } = 5;

public static class ProtocolVersions
{
public static string V8 { get; } = "v8";
/// <summary>
/// max interval between each batch, in milliseconds
/// -1 - waits for previous batch to complete
/// </summary>
public int Interval { get; set; } = 50;
}
}
}
4 changes: 0 additions & 4 deletions src/SkyApm.Abstractions/Transport/ISegmentDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*
*/

using System.Threading;
using System.Threading.Tasks;
using SkyApm.Tracing.Segments;

namespace SkyApm.Transport
Expand All @@ -26,8 +24,6 @@ public interface ISegmentDispatcher
{
bool Dispatch(SegmentContext segmentContext);

Task Flush(CancellationToken token = default(CancellationToken));

void Close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private static IServiceCollection AddSkyAPMCore(this IServiceCollection services
services.AddSingleton<IExecutionService, RegisterService>();
services.AddSingleton<IExecutionService, LogReportService>();
services.AddSingleton<IExecutionService, PingService>();
services.AddSingleton<IExecutionService, SegmentReportService>();
services.AddSingleton<SegmentReportService>();
services.AddSingleton<IExecutionService, CLRStatsService>();
services.AddSingleton<IInstrumentStartup, InstrumentStartup>();
services.AddSingleton<IRuntimeEnvironment>(RuntimeEnvironment.Instance);
Expand Down Expand Up @@ -154,5 +154,5 @@ private static IServiceCollection AddSkyApmLogging(this IServiceCollection servi
return services;
}

}
}
}
}
24 changes: 8 additions & 16 deletions src/SkyApm.Core/Service/SegmentReportService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using SkyApm.Config;
Expand All @@ -25,33 +24,26 @@

namespace SkyApm.Service
{
public class SegmentReportService : ExecutionService
/// <summary>
/// deprecated
/// </summary>
public class SegmentReportService
{
protected readonly ILogger Logger;
private readonly TransportConfig _config;
private readonly ISegmentDispatcher _dispatcher;

public SegmentReportService(IConfigAccessor configAccessor, ISegmentDispatcher dispatcher,
IRuntimeEnvironment runtimeEnvironment, ILoggerFactory loggerFactory)
: base(runtimeEnvironment, loggerFactory)
{
_dispatcher = dispatcher;
_config = configAccessor.Get<TransportConfig>();
Period = TimeSpan.FromMilliseconds(_config.Interval);
}

protected override TimeSpan DueTime { get; } = TimeSpan.FromSeconds(3);

protected override TimeSpan Period { get; }

protected override Task ExecuteAsync(CancellationToken cancellationToken)
{
return _dispatcher.Flush(cancellationToken);
_dispatcher = dispatcher;
}

protected override Task Stopping(CancellationToken cancellationToke)
protected Task Stopping(CancellationToken cancellationToke)
{
_dispatcher.Close();
return Task.CompletedTask;
}
}
}
}
119 changes: 91 additions & 28 deletions src/SkyApm.Core/Transport/AsyncQueueSegmentDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*
*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
Expand All @@ -32,72 +33,134 @@ public class AsyncQueueSegmentDispatcher : ISegmentDispatcher
private readonly TransportConfig _config;
private readonly ISegmentReporter _segmentReporter;
private readonly ISegmentContextMapper _segmentContextMapper;
private readonly ConcurrentQueue<SegmentRequest> _segmentQueue;
private readonly IRuntimeEnvironment _runtimeEnvironment;
private readonly CancellationTokenSource _cancellation;
private int _offset;
private readonly Random _random;
private long _produceCount = 0L;
private long _consumeCount = 0L;
private readonly BlockingCollection<SegmentRequest>[] _queueArray;
private readonly long[] _countArray;

public AsyncQueueSegmentDispatcher(IConfigAccessor configAccessor,
ISegmentReporter segmentReporter, IRuntimeEnvironment runtimeEnvironment,
ISegmentContextMapper segmentContextMapper, ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger(typeof(AsyncQueueSegmentDispatcher));
_config = configAccessor.Get<TransportConfig>();
_segmentReporter = segmentReporter;
_segmentContextMapper = segmentContextMapper;
_runtimeEnvironment = runtimeEnvironment;
_logger = loggerFactory.CreateLogger(typeof(AsyncQueueSegmentDispatcher));
_config = configAccessor.Get<TransportConfig>();
_segmentQueue = new ConcurrentQueue<SegmentRequest>();
_cancellation = new CancellationTokenSource();
_random = new Random();
_queueArray = new BlockingCollection<SegmentRequest>[_config.Parallel];
_countArray = new long[_config.Parallel];
for (int i = 0; i < _config.Parallel; ++ i)
{
_queueArray[i] = new BlockingCollection<SegmentRequest>(_config.QueueSize);
_countArray[i] = 0;
}
for (int i = 0; i < _config.Parallel; ++ i)
{
int taskId = i;
Task.Run(() => Flush(taskId));
}
Task.Run(() => Statistics());
}

public bool Dispatch(SegmentContext segmentContext)
{
if (!_runtimeEnvironment.Initialized || segmentContext == null || !segmentContext.Sampled)
return false;

// todo performance optimization for ConcurrentQueue
if (_config.QueueSize < _offset || _cancellation.IsCancellationRequested)
if (_cancellation.IsCancellationRequested)
return false;

var segment = _segmentContextMapper.Map(segmentContext);

if (segment == null)
return false;

_segmentQueue.Enqueue(segment);
int queueId = _random.Next(_config.Parallel);

bool result = _queueArray[queueId].TryAdd(segment, 0);

Interlocked.Increment(ref _offset);
if (result)
{
Interlocked.Add(ref _produceCount, 1);
Interlocked.Add(ref _countArray[queueId], 1);
}

_logger.Debug($"Dispatch trace segment. [SegmentId]={segmentContext.SegmentId}.");
return true;
_logger.Debug($"Dispatch trace segment. [SegmentId]={segmentContext.SegmentId},[result=]{result}.");

return result;
}

public Task Flush(CancellationToken token = default(CancellationToken))
private void Flush(int taskId)
{
// todo performance optimization for ConcurrentQueue
//var queued = _segmentQueue.Count;
//var limit = queued <= _config.PendingSegmentLimit ? queued : _config.PendingSegmentLimit;
var limit = _config.BatchSize;
var index = 0;
var segments = new List<SegmentRequest>(limit);
while (index++ < limit && _segmentQueue.TryDequeue(out var request))
while (!_cancellation.IsCancellationRequested)
{
segments.Add(request);
Interlocked.Decrement(ref _offset);
// handle dedicated queue
{
int count = DoFlush(taskId, taskId, 2000);
if (count > 0)
{
continue;
}
}
// handle other queue
{
int queueId = _random.Next(_config.Parallel);
if (queueId == taskId)
{
continue;
}
DoFlush(taskId, queueId, 0);
}
}
}

// send async
private int DoFlush(int taskId, int queueId, int timeout)
{
var segments = new List<SegmentRequest>(_config.BatchSize);
for (int i = 0; i < _config.BatchSize; ++ i)
{
if (!_queueArray[queueId].TryTake(out var request, timeout))
{
// segments is not full
break;
}
segments.Add(request);
}
if (segments.Count > 0)
_segmentReporter.ReportAsync(segments, token);

Interlocked.Exchange(ref _offset, _segmentQueue.Count);

return Task.CompletedTask;
{
try
{
Task[] task = new Task[1];
task[0] = _segmentReporter.ReportAsync(segments, new CancellationToken());
Task.WaitAll(task, _config.Interval);
}
catch (Exception e)
{
_logger.Error("Task.WaitAll failed." + taskId + "," + queueId + "," + segments.Count, e);
}
Interlocked.Add(ref _consumeCount, segments.Count);
Interlocked.Add(ref _countArray[queueId], 0 - segments.Count);
}
return segments.Count;
}

public void Close()
{
_cancellation.Cancel();
}

private void Statistics()
{
while (!_cancellation.IsCancellationRequested)
{
_logger.Information("statistics: produce=" + _produceCount + ",consume=" + _consumeCount + ",detail=[" + String.Join(",", _countArray) + "],");
Thread.Sleep(1000 * 60);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public static IConfigurationBuilder AddSkyWalkingDefaultConfig(this IConfigurati
{ "SkyWalking:Logging:RollOnFileSizeLimit", configuration?.GetSection("SkyWalking:Logging:RollOnFileSizeLimit").Value ?? "false" },
{ "SkyWalking:Logging:RetainedFileCountLimit", configuration?.GetSection("SkyWalking:Logging:RetainedFileCountLimit").Value ?? "10" },
{ "SkyWalking:Logging:RetainedFileTimeLimit", configuration?.GetSection("SkyWalking:Logging:RetainedFileTimeLimit").Value ?? "864000000" },
{ "SkyWalking:Transport:Interval", configuration?.GetSection("SkyWalking:Transport:Interval").Value ?? "3000" },
{ "SkyWalking:Transport:ProtocolVersion", configuration?.GetSection("SkyWalking:Transport:ProtocolVersion").Value ?? ProtocolVersions.V8 },
{ "SkyWalking:Transport:QueueSize", configuration?.GetSection("SkyWalking:Transport:QueueSize").Value ?? "30000" },
{ "SkyWalking:Transport:BatchSize", configuration?.GetSection("SkyWalking:Transport:BatchSize").Value ?? "3000" },
{ "SkyWalking:Transport:ProtocolVersion", configuration?.GetSection("SkyWalking:Transport:ProtocolVersion").Value ?? ProtocolVersions.V8 },
{ "SkyWalking:Transport:QueueSize", configuration?.GetSection("SkyWalking:Transport:QueueSize").Value ?? "10000" },
{ "SkyWalking:Transport:BatchSize", configuration?.GetSection("SkyWalking:Transport:BatchSize").Value ?? "2000" },
{ "SkyWalking:Transport:Parallel", configuration?.GetSection("SkyWalking:Transport:Parallel").Value ?? "5" },
{ "SkyWalking:Transport:Interval", configuration?.GetSection("SkyWalking:Transport:Interval").Value ?? "50" },
{ "SkyWalking:Transport:gRPC:Servers", configuration?.GetSection("SkyWalking:Transport:gRPC:Servers").Value ?? "localhost:11800" },
{ "SkyWalking:Transport:gRPC:Timeout", configuration?.GetSection("SkyWalking:Transport:gRPC:Timeout").Value ?? "10000" },
{ "SkyWalking:Transport:gRPC:ReportTimeout", configuration?.GetSection("SkyWalking:Transport:gRPC:ReportTimeout").Value ?? "600000" },
Expand Down Expand Up @@ -84,3 +85,4 @@ private static string BuildDefaultServiceInstanceName()
}
}
}

0 comments on commit 6b01726

Please sign in to comment.