Skip to content

Commit

Permalink
Update pause/resume logic for doc lambdas (microsoft#23757)
Browse files Browse the repository at this point in the history
## Description

Update pause/resume logic for doc lambdas to make it simpler:
(Previous PR for context:
microsoft#23138)

1. During a pause event, the kafka consumer will be paused at the lowest
offset of all doc's lastSuccessfulOffset to ensure no missing messages
during resume. Earlier we were using head, but in that case some
messages might get lost if they were not completely processed when
pausing.
2. Made some updates to account for the use case where there are
multiple doc partitions in a single kafka partition and only one of them
(or few of them, but not all) triggered pause.
- In this case, all the doc partitions will be updated to a paused state
during pause, so that all of them can be resumed and allowed to
reprocess ops during resume.
- Simplified the resume logic by removing resumeBackToOffset variable
and reprocessing range, and the doc partitions will now be resumed from
the first offset that is > its tail (checkpointed offset).
3. Updated the asserts in createContext and checkpointing logic, to
account for edge cases where the context manager's head is resumed but
tail is still pending to be updated.
4. Improved readability by changing headUpdatedAfterResume and
tailUpdatedAfterResume to headPaused and tailPaused to track whether
head/tail has been updated after a pause/resume event.
5. Some cleanup and logging udpates.
6. Changes to not emit error from document context if it is already
closed.
7. Changes to restart the service if the error.shouldRestart = true in
document partition, so that any error in setting up the circuit breaker
during lambda.create restarts the service instead of marking the doc as
corrupt.

## Reviewer Guidance

I tested with opbackup2 (draft PR:
https://msazure.visualstudio.com/One/_git/FRS/pullrequest/11660231) -
There are a lot of edge cases with document lambdas, and hence we will
enable it slowly in each ring.

---------

Co-authored-by: Shubhangi Agarwal <[email protected]>
  • Loading branch information
shubhi1092 and Shubhangi Agarwal authored Feb 5, 2025
1 parent 0aeab62 commit d63f674
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ export class DocumentContextManager extends EventEmitter {

private closed = false;

private headUpdatedAfterResume = false; // used to track whether the head has been updated after a resume event, so that we allow moving out of order only once during resume.
private tailUpdatedAfterResume = false; // used to track whether the tail has been updated after a resume event, so that we allow moving out of order only once during resume.
// Below flags are used to track whether head/tail has been updated after a pause/resume event.
// This is to allow moving out of order once during resume.
// Value = true means they are in a paused state and are waiting to be updated during resume.
private headPaused = false;
private tailPaused = false;

constructor(private readonly partitionContext: IContext) {
super();
Expand All @@ -50,15 +53,26 @@ export class DocumentContextManager extends EventEmitter {
* This class is responsible for the lifetime of the context
*/
public createContext(routingKey: IRoutingKey, head: IQueuedMessage): DocumentContext {
// Contexts should only be created within the processing range of the manager
assert(head.offset > this.tail.offset && head.offset <= this.head.offset);
if (!this.headPaused && this.tailPaused) {
// tail is resumed after head, so its possible to be in this state, but vice versa is not possible
// this means that tail is pending to be updated after resume, so it might be having an invalid value currently
assert(head.offset === this.head.offset);
} else {
// both head and tail are either paused or resumed
// Contexts should only be created within the processing range of the manager
assert(head.offset > this.tail.offset && head.offset <= this.head.offset);
}

// Create the new context and register for listeners on it
const context = new DocumentContext(
routingKey,
head,
this.partitionContext.log,
() => this.tail,
() => ({
headPaused: this.headPaused,
tailPaused: this.tailPaused,
}),
);
this.contexts.add(context);
context.addListener("checkpoint", (restartOnCheckpointFailure?: boolean) =>
Expand All @@ -68,18 +82,20 @@ export class DocumentContextManager extends EventEmitter {
Lumberjack.verbose("Emitting error from contextManager, context error event.");
this.emit("error", error, errorData);
});
context.addListener("pause", (offset: number, reason?: any) => {
// Find the lowest offset of all contexts and emit pause
let lowestOffset = offset;
context.addListener("pause", (offset?: number, reason?: any) => {
// Find the lowest offset of all doc contexts' lastSuccessfulOffset and emit pause at that offset to ensure we dont miss any messages during resume (reprocessing)
let lowestOffset = offset ?? Number.MAX_SAFE_INTEGER;
for (const docContext of this.contexts) {
if (docContext.head.offset < lowestOffset) {
lowestOffset = docContext.head.offset;
if (docContext.lastSuccessfulOffset < lowestOffset) {
lowestOffset = docContext.lastSuccessfulOffset;
}
}
this.headUpdatedAfterResume = false; // reset this flag when we pause
this.tailUpdatedAfterResume = false; // reset this flag when we pause
lowestOffset =
lowestOffset > -1 && lowestOffset < Number.MAX_SAFE_INTEGER ? lowestOffset : 0;
this.headPaused = true;
this.tailPaused = true;
Lumberjack.info("Emitting pause from contextManager", { lowestOffset, offset, reason });
this.emit("pause", lowestOffset, offset, reason);
this.emit("pause", lowestOffset, reason);
});
context.addListener("resume", () => {
this.emit("resume");
Expand All @@ -97,12 +113,11 @@ export class DocumentContextManager extends EventEmitter {
}

/**
* Updates the head to the new offset. The head offset will not be updated if it stays the same or moves backwards, except if the resumeBackToOffset is specified.
* resumeBackToOffset is specified during resume after a lambda pause (eg: circuit breaker)
* Updates the head to the new offset. The head offset will not be updated if it stays the same or moves backwards, unless headPaused is true.
* @returns True if the head was updated, false if it was not.
*/
public setHead(head: IQueuedMessage, resumeBackToOffset?: number | undefined) {
if (head.offset > this.head.offset || head.offset === resumeBackToOffset) {
public setHead(head: IQueuedMessage) {
if (head.offset > this.head.offset || this.headPaused) {
// If head is moving backwards
if (head.offset <= this.head.offset) {
if (head.offset <= this.lastCheckpoint.offset) {
Expand All @@ -112,57 +127,65 @@ export class DocumentContextManager extends EventEmitter {
newHeadOffset: head.offset,
currentHeadOffset: this.head.offset,
lastCheckpointOffset: this.lastCheckpoint.offset,
currentTailOffset: this.tail.offset,
},
);
return false;
}
if (this.headUpdatedAfterResume) {
Lumberjack.warning(
"ContextManager head is moving backwards again after a previous move backwards. This is unexpected.",
{ resumeBackToOffset, currentHeadOffset: this.head.offset },
);
return false;
}

// allow moving backwards
Lumberjack.info(
"Allowing the contextManager head to move to the specified offset",
{ resumeBackToOffset, currentHeadOffset: this.head.offset },
"Allowing the contextManager head to move backwards to the specified offset",
{
newHeadOffset: head.offset,
currentHeadOffset: this.head.offset,
currentTailOffset: this.tail.offset,
headPaused: this.headPaused,
},
);
}
if (!this.headUpdatedAfterResume && resumeBackToOffset !== undefined) {
Lumberjack.info("Setting headUpdatedAfterResume to true", {
resumeBackToOffset,

if (this.headPaused) {
Lumberjack.info("Setting headPaused to false", {
newHeadOffset: head.offset,
currentHeadOffset: this.head.offset,
currentTailOffset: this.tail.offset,
});
this.headUpdatedAfterResume = true;
this.headPaused = false;
}

this.head = head;
return true;
}

return false;
}

public setTail(tail: IQueuedMessage, resumeBackToOffset?: number | undefined) {
public setTail(tail: IQueuedMessage) {
assert(
(tail.offset > this.tail.offset ||
(tail.offset === resumeBackToOffset && !this.tailUpdatedAfterResume)) &&
tail.offset <= this.head.offset,
`Tail offset ${tail.offset} must be greater than the current tail offset ${this.tail.offset} or equal to the resume offset ${resumeBackToOffset} if not yet resumed (tailUpdatedAfterResume: ${this.tailUpdatedAfterResume}), and less than or equal to the head offset ${this.head.offset}.`,
(tail.offset > this.tail.offset || this.tailPaused) && tail.offset <= this.head.offset,
`Tail offset ${tail.offset} must be greater than the current tail offset ${this.tail.offset} or tailPaused should be true (${this.tailPaused}), and less than or equal to the head offset ${this.head.offset}.`,
);

if (tail.offset <= this.tail.offset) {
Lumberjack.info("Allowing the contextManager tail to move to the specified offset.", {
resumeBackToOffset,
currentTailOffset: this.tail.offset,
});
Lumberjack.info(
"Allowing the contextManager tail to move backwards to the specified offset",
{
newTailOffset: tail.offset,
currentTailOffset: this.tail.offset,
currentHeadOffset: this.head.offset,
tailPaused: this.tailPaused,
},
);
}

if (!this.tailUpdatedAfterResume && resumeBackToOffset !== undefined) {
Lumberjack.info("Setting tailUpdatedAfterResume to true", {
resumeBackToOffset,
if (this.tailPaused) {
Lumberjack.info("Setting tailPaused to false", {
newTailOffset: tail.offset,
currentTailOffset: this.tail.offset,
currentHeadOffset: this.head.offset,
});
this.tailUpdatedAfterResume = true;
this.tailPaused = false;
}

this.tail = tail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,33 @@ export class DocumentContext extends EventEmitter implements IContext {
private headInternal: IQueuedMessage;
private tailInternal: IQueuedMessage;

private lastSuccessfulOffsetInternal: number;

private closed = false;
private contextError = undefined;

public headUpdatedAfterResume = false; // used to track whether the head has been updated after a resume event, so that we allow moving out of order only once during resume.
// Below flag is used to track whether head has been updated after a pause/resume event.
// This is to allow moving out of order once during resume.
// Value = true means it is in a paused state and waiting to be updated during resume.
public headPaused = false;

constructor(
private readonly routingKey: IRoutingKey,
head: IQueuedMessage,
public readonly log: ILogger | undefined,
private readonly getLatestTail: () => IQueuedMessage,
private readonly getContextManagerPauseState: () => {
headPaused: boolean;
tailPaused: boolean;
},
) {
super();

// Head represents the largest offset related to the document that is not checkpointed.
// Tail will be set to the checkpoint offset of the previous head
this.headInternal = head;
this.tailInternal = this.getLatestTail();
this.lastSuccessfulOffsetInternal = this.tailInternal.offset; // will be -1 at creation
}

public get head(): IQueuedMessage {
Expand All @@ -51,42 +61,68 @@ export class DocumentContext extends EventEmitter implements IContext {
return this.tailInternal;
}

public get lastSuccessfulOffset(): number {
return this.lastSuccessfulOffsetInternal;
}

/**
* Returns whether or not there is pending work in flight - i.e. the head and tail are not equal
*/
public hasPendingWork(): boolean {
return this.headInternal !== this.tailInternal;
}

/**
* Sets the last successfully processed offset.
*/
public setLastSuccessfulOffset(offset: number) {
this.lastSuccessfulOffsetInternal = offset;
}

/**
* Sets the state to pause, i.e. headPaused = true, without emitting the pause event.
* It is different than pause() method which emits the pause event.
* This is used to set the state to pause when another doc in the same kafka partition triggered pause and we want to pause all the docs in that kafka partition.
*/
public setStateToPause() {
this.headPaused = true;
}

/**
* Updates the head offset for the context.
*/
public setHead(head: IQueuedMessage, resumeBackToOffset?: number | undefined) {
public setHead(head: IQueuedMessage) {
assert(
head.offset > this.head.offset ||
(head.offset === resumeBackToOffset && !this.headUpdatedAfterResume),
`Head offset ${head.offset} must be greater than the current head offset ${this.head.offset} or equal to the resume offset ${resumeBackToOffset} if not yet resumed (headUpdatedAfterResume: ${this.headUpdatedAfterResume}). Topic ${head.topic}, partition ${head.partition}, tenantId ${this.routingKey.tenantId}, documentId ${this.routingKey.documentId}.`,
head.offset > this.head.offset || this.headPaused,
`Head offset ${head.offset} must be greater than the current head offset ${this.head.offset} or headPaused should be true (${this.headPaused}). Topic ${head.topic}, partition ${head.partition}, tenantId ${this.routingKey.tenantId}, documentId ${this.routingKey.documentId}.`,
);

// If head is moving backwards
if (head.offset <= this.head.offset) {
if (head.offset <= this.tailInternal.offset) {
if (head.offset <= this.tail.offset) {
Lumberjack.info(
"Not updating documentContext head since new head's offset is <= last checkpoint offset (tailInternal), returning early",
"Not updating documentContext head since new head's offset is <= last checkpoint offset (tail), returning early",
{
newHeadOffset: head.offset,
currentHeadOffset: this.head.offset,
tailInternalOffset: this.tailInternal.offset,
currentTailOffset: this.tail.offset,
documentId: this.routingKey.documentId,
},
);
return false;
}
Lumberjack.info("Allowing the document context head to move to the specified offset", {
resumeBackToOffset,
currentHeadOffset: this.head.offset,
documentId: this.routingKey.documentId,
});

// allow moving backwards
Lumberjack.info(
"Allowing the document context head to move backwards to the specified offset",
{
newHeadOffset: head.offset,
currentHeadOffset: this.head.offset,
currentTailOffset: this.tail.offset,
headPaused: this.headPaused,
documentId: this.routingKey.documentId,
},
);
}

// When moving back to a state where head and tail differ we set the tail to be the old head, as in the
Expand All @@ -95,13 +131,14 @@ export class DocumentContext extends EventEmitter implements IContext {
this.tailInternal = this.getLatestTail();
}

if (!this.headUpdatedAfterResume && resumeBackToOffset !== undefined) {
Lumberjack.info("Setting headUpdatedAfterResume to true", {
resumeBackToOffset,
if (this.headPaused) {
Lumberjack.info("Setting headPaused to false", {
newHeadOffset: head.offset,
currentHeadOffset: this.head.offset,
currentTailOffset: this.tail.offset,
documentId: this.routingKey.documentId,
});
this.headUpdatedAfterResume = true;
this.headPaused = false;
}

this.headInternal = head;
Expand All @@ -113,6 +150,36 @@ export class DocumentContext extends EventEmitter implements IContext {
return;
}

// skip checkpoint in paused state
const contextManagerPauseState = this.getContextManagerPauseState();
if (
this.headPaused ||
(!contextManagerPauseState.headPaused && contextManagerPauseState.tailPaused)
) {
const telemetryProperties = {
documentId: this.routingKey.documentId,
tenantId: this.routingKey.tenantId,
headOffset: this.head.offset,
tailOffset: this.tail.offset,
checkpointOffset: message.offset,
headPaused: this.headPaused,
};
if (this.headPaused) {
Lumberjack.info(
"Skipping doc checkpoint since the documentContext is currently in paused state",
telemetryProperties,
);
} else {
// contextManager's tail is resumed after head, so its possible to be in this state, but vice versa is not possible
// but document shouldnt be checkpointing at this time, adding a log here to monitor if this happens
Lumberjack.warning(
"Skipping doc checkpoint since contextManager's tail is not yet updated after resume",
telemetryProperties,
);
}
return;
}

// Assert offset is between the current tail and head
const offset = message.offset;

Expand All @@ -127,6 +194,14 @@ export class DocumentContext extends EventEmitter implements IContext {
}

public error(error: any, errorData: IContextErrorData) {
if (this.closed) {
// don't emit errors after closing
Lumberjack.info("Skipping emitting error since the documentContext is already closed", {
documentId: this.routingKey.documentId,
tenantId: this.routingKey.tenantId,
});
return;
}
this.contextError = error;
Lumberjack.verbose("Emitting error from documentContext");
this.emit("error", error, errorData);
Expand All @@ -142,8 +217,8 @@ export class DocumentContext extends EventEmitter implements IContext {
return this.contextError;
}

public pause(offset: number, reason?: any) {
this.headUpdatedAfterResume = false; // reset this flag when we pause
public pause(offset?: number, reason?: any) {
this.headPaused = true;
this.emit("pause", offset, reason);
}

Expand Down
Loading

0 comments on commit d63f674

Please sign in to comment.