Skip to content

Commit

Permalink
Add the PartitionId value to the ProcessingMessage log event (#1186)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum authored Feb 13, 2025
1 parent 4d886da commit 3bc255c
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 6 deletions.
4 changes: 3 additions & 1 deletion src/DurableTask.AzureStorage/AnalyticsEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ public void DiscardingWorkItem(
ExtensionVersion);
}

[Event(EventIds.ProcessingMessage, Level = EventLevel.Informational, Task = Tasks.Processing, Opcode = EventOpcode.Receive, Version = 5)]
[Event(EventIds.ProcessingMessage, Level = EventLevel.Informational, Task = Tasks.Processing, Opcode = EventOpcode.Receive, Version = 6)]
public void ProcessingMessage(
Guid relatedActivityId,
string Account,
Expand All @@ -1030,6 +1030,7 @@ public void ProcessingMessage(
string ExecutionId,
string MessageId,
int Age,
string PartitionId,
long SequenceNumber,
int Episode,
bool IsExtendedSession,
Expand All @@ -1047,6 +1048,7 @@ public void ProcessingMessage(
ExecutionId ?? string.Empty,
MessageId,
Age,
PartitionId,
SequenceNumber,
Episode,
IsExtendedSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,10 @@ async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(boo
}
else
{
session.TraceProcessingMessage(message, isExtendedSession: false);
session.TraceProcessingMessage(
message,
isExtendedSession: false,
partitionId: session.ControlQueue.Name);
}
}

Expand Down Expand Up @@ -1525,7 +1528,7 @@ public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(
});

TraceMessageReceived(this.settings, session.MessageData, this.azureStorageClient.QueueAccountName);
session.TraceProcessingMessage(message, isExtendedSession: false);
session.TraceProcessingMessage(message, isExtendedSession: false, this.workItemQueue.Name);

if (!this.activeActivitySessions.TryAdd(message.Id, session))
{
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.AzureStorage/Logging/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2500,6 +2500,7 @@ public ProcessingMessage(
string executionId,
string messageId,
int age,
string partitionId,
long sequenceNumber,
int episode,
bool isExtendedSession)
Expand All @@ -2513,6 +2514,7 @@ public ProcessingMessage(
this.ExecutionId = executionId;
this.MessageId = messageId;
this.Age = age;
this.PartitionId = partitionId;
this.SequenceNumber = sequenceNumber;
this.Episode = episode;
this.IsExtendedSession = isExtendedSession;
Expand Down Expand Up @@ -2544,6 +2546,9 @@ public ProcessingMessage(
[StructuredLogField]
public int Age { get; }

[StructuredLogField]
public string PartitionId { get; }

[StructuredLogField]
public long SequenceNumber { get; }

Expand Down Expand Up @@ -2572,6 +2577,7 @@ void IEventSourceEvent.WriteEventSource() => AnalyticsEventSource.Log.Processing
this.ExecutionId,
this.MessageId,
this.Age,
this.PartitionId,
this.SequenceNumber,
this.Episode,
this.IsExtendedSession,
Expand Down
2 changes: 2 additions & 0 deletions src/DurableTask.AzureStorage/Logging/LogHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ internal void ProcessingMessage(
string executionId,
string messageId,
int age,
string partitionId,
long sequenceNumber,
int episode,
bool isExtendedSession)
Expand All @@ -837,6 +838,7 @@ internal void ProcessingMessage(
executionId,
messageId,
age,
partitionId,
sequenceNumber,
episode,
isExtendedSession);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
var messages = new List<TaskMessage>(this.CurrentMessageBatch.Count);
foreach (MessageData msg in this.CurrentMessageBatch)
{
this.TraceProcessingMessage(msg, isExtendedSession: true);
this.TraceProcessingMessage(msg, isExtendedSession: true, partitionId: this.ControlQueue.Name);
messages.Add(msg.TaskMessage);
}

Expand Down
5 changes: 3 additions & 2 deletions src/DurableTask.AzureStorage/Messaging/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void StartNewLogicalTraceScope()
AnalyticsEventSource.SetLogicalTraceActivityId(this.TraceActivityId);
}

public void TraceProcessingMessage(MessageData data, bool isExtendedSession)
public void TraceProcessingMessage(MessageData data, bool isExtendedSession, string partitionId)
{
if (data == null)
{
Expand All @@ -64,7 +64,8 @@ public void TraceProcessingMessage(MessageData data, bool isExtendedSession)
taskMessage.OrchestrationInstance.InstanceId,
taskMessage.OrchestrationInstance.ExecutionId,
queueMessage.MessageId,
Math.Max(0, (int)DateTimeOffset.UtcNow.Subtract(queueMessage.InsertedOn.Value).TotalMilliseconds),
age: Math.Max(0, (int)DateTimeOffset.UtcNow.Subtract(queueMessage.InsertedOn.Value).TotalMilliseconds),
partitionId,
data.SequenceNumber,
data.Episode.GetValueOrDefault(-1),
isExtendedSession);
Expand Down

0 comments on commit 3bc255c

Please sign in to comment.