Op bunching 3: Bunch contiguous ops for DDS in a batch - DDS part (microsoft#22841)

## Reviewer guidance
This is part 3 of 3 of the op bunching feature. This part focuses on the
changes in the DDSes. [Part
1](microsoft#22839) and [part
2](microsoft#22840).

## Problem
During op processing, the container runtime sends ops one at a time to data
stores, which forward them one at a time to DDSes. If a DDS has received M
contiguous ops as part of a batch, the DDS is called M times to process them
individually. This has performance implications for some DDSes, and they would
benefit from receiving and processing these M ops together.

Take shared tree for example:
For each op received that contains a sequenced commit, all the pending
commits are processed by the rebaser. So, as the number of ops received
grows, so does the work of processing pending commits. The following example
describes this clearly:
Currently, if a shared tree client has N pending commits which have yet
to be sequenced, each time that client receives a sequenced commit
authored by another client (an op), it updates each of its pending
commits, which takes at least O(N) work.
Instead, if it receives M commits at once, it can do a single update pass
over each pending commit instead of M passes per pending commit.
It can compose the M commits together into a single change to update
over, so it can potentially go from something like O(N * M) work to
O(N + M) work with bunching.
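The complexity argument above can be sketched as follows. This is a toy model, not the shared-tree rebaser: `Change`, `compose`, and `rebaseAllPending` are hypothetical stand-ins that only count update passes.

```typescript
// Hypothetical model of shared-tree commits: a change is a list of edits.
type Change = string[];

// Compose M sequenced changes into a single change (illustrative only).
function compose(changes: readonly Change[]): Change {
	return changes.flat();
}

// One update pass over the N pending commits costs O(N) work.
let updatePasses = 0;
function rebaseAllPending(pending: Change[], over: Change): void {
	updatePasses++;
	// ...update each of the N pending commits over `over` (elided)
}

const pending: Change[] = [["p1"], ["p2"], ["p3"]]; // N = 3 pending commits
const incoming: Change[] = [["a"], ["b"], ["c"], ["d"]]; // M = 4 sequenced ops

// Without bunching: one pass per incoming op, i.e. M passes => O(N * M).
for (const commit of incoming) {
	rebaseAllPending(pending, commit);
}
const passesWithoutBunching = updatePasses;

// With bunching: compose the M commits first, then a single pass => O(N + M).
updatePasses = 0;
rebaseAllPending(pending, compose(incoming));
const passesWithBunching = updatePasses;
```

With M = 4 incoming ops, the per-op path does 4 passes while the bunched path does 1, which is the O(N * M) vs. O(N + M) difference in miniature.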

## Solution - op bunching
The solution implemented here is a feature called "op bunching".
With this feature, contiguous ops in a grouped op batch that belong to a
data store / DDS will be bunched and sent to it in an array: the grouped op
is sent as an `ISequencedMessageEnvelope`, and the individual message
`contents` in it are sent as an array along with each message's
`clientSequenceNumber` and `localOpMetadata`.
The container runtime sends each data store the bunch of contiguous ops that
belong to it. The data store, in turn, sends each DDS its bunch of contiguous
ops. The DDS can choose how to process these ops; shared tree, for instance,
would compose the commits in all these ops and update its pending commits
with the result.
Bunching only contiguous ops for a data store / DDS within a batch preserves
the behavior of processing ops in the sequence in which they were received.
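The bunched delivery described above can be sketched roughly like this. The field names follow the `IRuntimeMessageCollection` / `IRuntimeMessagesContent` types added by this PR, but the interfaces and handlers here are simplified illustrations, not the actual runtime code.

```typescript
// Simplified shape of a bunch delivered to a data store / DDS.
interface MessagesContent {
	contents: unknown;
	localOpMetadata: unknown;
	clientSequenceNumber: number;
}
interface MessageCollection {
	// Common envelope fields shared by every op in the bunch (simplified).
	envelope: { sequenceNumber: number; type: string };
	messagesContent: readonly MessagesContent[];
	local: boolean;
}

// A DDS may process the whole bunch at once, e.g. composing all contents.
function processBunch(collection: MessageCollection): number {
	return collection.messagesContent.length;
}

// Or it can fall back to a per-op path by re-expanding each message,
// combining the shared envelope with each message's own fields.
function expandToPerOpMessages(collection: MessageCollection) {
	return collection.messagesContent.map(
		({ contents, clientSequenceNumber }) => ({
			...collection.envelope,
			contents,
			clientSequenceNumber,
		}),
	);
}

const bunch: MessageCollection = {
	envelope: { sequenceNumber: 42, type: "op" },
	messagesContent: [
		{ contents: "c1", localOpMetadata: undefined, clientSequenceNumber: 1 },
		{ contents: "c2", localOpMetadata: undefined, clientSequenceNumber: 2 },
		{ contents: "c3", localOpMetadata: undefined, clientSequenceNumber: 3 },
	],
	local: false,
};
```

The per-op fallback mirrors what `ChannelDeltaConnection.processMessages` in this PR does when a handler has not implemented `processMessages`.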

A couple of behavior changes to note:
1. Op events - An implication of this change is that the timing of the "op"
events emitted by the container runtime and the data store runtime will change.
Currently, these layers emit an "op" event immediately after an op is
processed. With this change, an upper layer will only know when a bunch
has been processed by a lower layer. So, it will emit "op" events for the
individual ops in the bunch after the entire bunch is processed.
From my understanding, this should be fine because we do not provide any
guarantee that the "op" event will be emitted immediately after an op is
processed. These events will be emitted in order of op processing and (some
time) after the op is processed.
Take the delta manager / container runtime as an example. The delta manager
sends an op for processing to the container runtime and emits the "op"
event. However, the container runtime may choose not to process these ops
immediately but save them until an entire batch is received. This change
was made but was reverted due to some concerns unrelated to the topic
discussed here - microsoft#21785.
The change here is similar to the above behavior, where an upper layer
doesn't know and shouldn't care what lower layers do with ops.
2. `metadata` property on message - With this PR, the `metadata` property
is removed from a message before it is sent to data stores and DDSes. This
is because we now send one common message (the grouped op) and an array
of contents. Individual messages within a grouped op have batch begin
and end metadata, but those are added by the runtime only to mirror the
shape of old batch messages. The data store and DDS don't care about them,
so removing them should be fine.
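The metadata stripping can be sketched as follows; the message shapes here are illustrative assumptions, not the runtime's actual types.

```typescript
// Illustrative: individual messages inside a grouped op carry batch
// begin/end metadata that the runtime added just to mimic old batches.
interface InnerMessage {
	contents: unknown;
	metadata?: { batch?: boolean };
}

// Before contents are handed to a data store / DDS, the metadata is dropped.
function toMessagesContent(inner: readonly InnerMessage[]) {
	return inner.map(({ contents }) => ({ contents }));
}

const groupedOpContents: InnerMessage[] = [
	{ contents: "op1", metadata: { batch: true } }, // batch begin
	{ contents: "op2" },
	{ contents: "op3", metadata: { batch: false } }, // batch end
];

// Only `contents` survives; no batch metadata reaches the DDS.
const delivered = toMessagesContent(groupedOpContents);
```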


[AB#20123](https://dev.azure.com/fluidframework/235294da-091d-4c29-84fc-cdfc3d90890b/_workitems/edit/20123)
agarwal-navin authored Oct 29, 2024
1 parent 550286d commit 0b1ce2b
Showing 20 changed files with 486 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .changeset/hot-falcons-grab.md
@@ -6,7 +6,7 @@
"section": other
---

Change when the `op` event on `IFluidDataStoreRuntimeEvents` and `IContainerRuntimeBaseEvents` is emitted.
Change when the `op` event on `IFluidDataStoreRuntimeEvents` and `IContainerRuntimeBaseEvents` is emitted

Previous behavior - It was emitted immediately after an op was processed and before the next op was processed.

5 changes: 3 additions & 2 deletions .changeset/neat-lights-worry.md
@@ -1,13 +1,14 @@
---
"@fluidframework/datastore-definitions": minor
"@fluidframework/runtime-definitions": minor
"@fluidframework/test-runtime-utils": minor
---
---
"section": deprecation
---

The function `process` on `IFluidDataStoreChannel` and `MockFluidDataStoreRuntime` is now deprecated.
The function `process` on `IFluidDataStoreChannel`, `IDeltaHandler`, `MockFluidDataStoreRuntime` and `MockDeltaConnection` is now deprecated

A new function `processMessages` has been added in its place which will be called to process multiple messages instead of a single one on the channel. This is part of a feature called "Op bunching" where contiguous ops of a given type and to a given data store / DDS are bunched and sent together for processing.

Implementations of `IFluidDataStoreChannel` must now also implement `processMessages`. For a reference implementation, see `FluidDataStoreRuntime::processMessages`.
Implementations of `IFluidDataStoreChannel` and `IDeltaHandler` must now also implement `processMessages`. For reference implementations, see `FluidDataStoreRuntime::processMessages` and `SharedObjectCore::attachDeltaHandler`.
@@ -6,6 +6,7 @@
import { assert } from '@fluidframework/core-utils/internal';
import { type IChannelAttributes, type IDeltaHandler } from '@fluidframework/datastore-definitions/internal';
import { MessageType, type ISequencedDocumentMessage } from '@fluidframework/driver-definitions/internal';
import type { IRuntimeMessageCollection } from '@fluidframework/runtime-definitions/internal';

import { type IOpContents, type IShimDeltaHandler } from './types.js';
import { attributesMatch, isBarrierOp, isStampedOp } from './utils.js';
@@ -100,6 +101,13 @@ export class MigrationShimDeltaHandler implements IShimDeltaHandler {
return this.treeDeltaHandler.process(message, local, localOpMetadata);
}

public processMessages(messagesCollection: IRuntimeMessageCollection): void {
const { envelope, messagesContent, local } = messagesCollection;
for (const { contents, localOpMetadata, clientSequenceNumber } of messagesContent) {
this.process({ ...envelope, contents, clientSequenceNumber }, local, localOpMetadata);
}
}

public setConnectionState(connected: boolean): void {
return this.treeDeltaHandler.setConnectionState(connected);
}
@@ -6,6 +6,7 @@
import { assert } from '@fluidframework/core-utils/internal';
import { type IChannelAttributes, type IDeltaHandler } from '@fluidframework/datastore-definitions/internal';
import { MessageType, type ISequencedDocumentMessage } from '@fluidframework/driver-definitions/internal';
import type { IRuntimeMessageCollection } from '@fluidframework/runtime-definitions/internal';

import { type IOpContents, type IShimDeltaHandler } from './types.js';
import { attributesMatch, isStampedOp } from './utils.js';
@@ -62,6 +63,13 @@ export class SharedTreeShimDeltaHandler implements IShimDeltaHandler {
return this.handler.process(message, local, localOpMetadata);
}

public processMessages(messagesCollection: IRuntimeMessageCollection): void {
const { envelope, messagesContent, local } = messagesCollection;
for (const { contents, localOpMetadata, clientSequenceNumber } of messagesContent) {
this.process({ ...envelope, contents, clientSequenceNumber }, local, localOpMetadata);
}
}

// No idea whether any of the below 4 methods work as expected
public setConnectionState(connected: boolean): void {
return this.handler.setConnectionState(connected);
26 changes: 25 additions & 1 deletion packages/dds/shared-object-base/src/sharedObject.ts
@@ -19,6 +19,7 @@ import {
IChannelAttributes,
type IChannelFactory,
IFluidDataStoreRuntime,
type IDeltaHandler,
} from "@fluidframework/datastore-definitions/internal";
import {
type IDocumentMessage,
@@ -31,6 +32,7 @@ import {
IGarbageCollectionData,
blobCountPropertyName,
totalBlobSizePropertyName,
type IRuntimeMessageCollection,
} from "@fluidframework/runtime-definitions/internal";
import {
toDeltaManagerInternal,
@@ -496,6 +498,9 @@ export abstract class SharedObjectCore<
localOpMetadata,
);
},
processMessages: (messagesCollection: IRuntimeMessageCollection) => {
this.processMessages(messagesCollection);
},
setConnectionState: (connected: boolean) => {
this.setConnectionState(connected);
},
@@ -508,7 +513,7 @@
rollback: (content: any, localOpMetadata: unknown) => {
this.rollback(content, localOpMetadata);
},
});
} satisfies IDeltaHandler);
}

/**
@@ -572,6 +577,25 @@
this.emitInternal("op", message, local, this);
}

/**
* Process messages for this shared object. The messages here are contiguous messages for this object in a batch.
* @param messageCollection - The collection of messages to process.
*/
private processMessages(messagesCollection: IRuntimeMessageCollection) {
const { envelope, messagesContent, local } = messagesCollection;
for (const { contents, localOpMetadata, clientSequenceNumber } of messagesContent) {
this.process(
{
...envelope,
clientSequenceNumber,
contents: parseHandles(contents, this.serializer),
},
local,
localOpMetadata,
);
}
}

/**
* Called when a message has to be resubmitted. This typically happens for unacked messages after a
* reconnection.
@@ -58,7 +58,9 @@ export interface IDeltaConnection {
// @alpha
export interface IDeltaHandler {
applyStashedOp(message: any): void;
// @deprecated
process: (message: ISequencedDocumentMessage, local: boolean, localOpMetadata: unknown) => void;
processMessages?: (messageCollection: IRuntimeMessageCollection) => void;
reSubmit(message: any, localOpMetadata: unknown): void;
rollback?(message: any, localOpMetadata: unknown): void;
setConnectionState(connected: boolean): void;
8 changes: 8 additions & 0 deletions packages/runtime/datastore-definitions/src/channel.ts
@@ -8,6 +8,7 @@ import type { ISequencedDocumentMessage } from "@fluidframework/driver-definitio
import type {
IExperimentalIncrementalSummaryContext,
IGarbageCollectionData,
IRuntimeMessageCollection,
ISummaryTreeWithStats,
ITelemetryContext,
} from "@fluidframework/runtime-definitions/internal";
@@ -130,13 +131,20 @@ export interface IDeltaHandler {
* @param local - Whether the message originated from the local client
* @param localOpMetadata - For local client messages, this is the metadata that was submitted with the message.
* For messages from a remote client, this will be undefined.
* @deprecated - Use processMessages instead to process messages.
*/
process: (
message: ISequencedDocumentMessage,
local: boolean,
localOpMetadata: unknown,
) => void;

/**
* Process messages for this channel. The messages here are contiguous messages for this channel in a batch.
* @param messageCollection - The collection of messages to process.
*/
processMessages?: (messageCollection: IRuntimeMessageCollection) => void;

/**
* State change events to indicate changes to the delta connection
* @param connected - true if connected, false otherwise
12 changes: 6 additions & 6 deletions packages/runtime/datastore/src/channelContext.ts
@@ -12,7 +12,6 @@ import {
import {
IDocumentStorageService,
ISnapshotTree,
ISequencedDocumentMessage,
} from "@fluidframework/driver-definitions/internal";
import { readAndParse } from "@fluidframework/driver-utils/internal";
import {
@@ -22,6 +21,7 @@ import {
IGarbageCollectionData,
IFluidDataStoreContext,
ISummarizeResult,
type IRuntimeMessageCollection,
} from "@fluidframework/runtime-definitions/internal";
import { addBlobToSummary } from "@fluidframework/runtime-utils/internal";
import {
@@ -41,11 +41,11 @@ export interface IChannelContext {

setConnectionState(connected: boolean, clientId?: string);

processOp(
message: ISequencedDocumentMessage,
local: boolean,
localOpMetadata?: unknown,
): void;
/**
* Process messages for this channel context. The messages here are contiguous messages for this context in a batch.
* @param messageCollection - The collection of messages to process.
*/
processMessages(messageCollection: IRuntimeMessageCollection): void;

summarize(
fullTree?: boolean,
61 changes: 46 additions & 15 deletions packages/runtime/datastore/src/channelDeltaConnection.ts
@@ -8,7 +8,11 @@ import {
IDeltaConnection,
IDeltaHandler,
} from "@fluidframework/datastore-definitions/internal";
import { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal";
import type { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal";
import type {
IRuntimeMessageCollection,
IRuntimeMessagesContent,
} from "@fluidframework/runtime-definitions/internal";
import { DataProcessingError } from "@fluidframework/telemetry-utils/internal";

const stashedOpMetadataMark = Symbol();
@@ -46,6 +50,26 @@ function processWithStashedOpMetadataHandling(
}
}

function getContentsWithStashedOpHandling(
messagesContent: readonly IRuntimeMessagesContent[],
) {
const newMessageContents: IRuntimeMessagesContent[] = [];
for (const messageContent of messagesContent) {
if (isStashedOpMetadata(messageContent.localOpMetadata)) {
messageContent.localOpMetadata.forEach(({ contents, metadata }) => {
newMessageContents.push({
contents,
localOpMetadata: metadata,
clientSequenceNumber: messageContent.clientSequenceNumber,
});
});
} else {
newMessageContents.push(messageContent);
}
}
return newMessageContents;
}

export class ChannelDeltaConnection implements IDeltaConnection {
private _handler: IDeltaHandler | undefined;
private stashedOpMd: StashedOpMetadata | undefined;
@@ -75,24 +99,31 @@ export class ChannelDeltaConnection implements IDeltaConnection {
this.handler.setConnectionState(connected);
}

public process(
message: ISequencedDocumentMessage,
local: boolean,
localOpMetadata: unknown,
) {
public processMessages(messageCollection: IRuntimeMessageCollection): void {
const { envelope, messagesContent, local } = messageCollection;
// catches as data processing error whether or not they come from async pending queues
try {
// catches as data processing error whether or not they come from async pending queues
processWithStashedOpMetadataHandling(
message.contents,
localOpMetadata,
(contents, metadata) =>
this.handler.process({ ...message, contents }, local, metadata),
);
const newMessagesContent = getContentsWithStashedOpHandling(messagesContent);
if (this.handler.processMessages !== undefined) {
this.handler.processMessages({
...messageCollection,
messagesContent: newMessagesContent,
});
} else {
for (const { contents, localOpMetadata, clientSequenceNumber } of newMessagesContent) {
const compatMessage: ISequencedDocumentMessage = {
...envelope,
contents,
clientSequenceNumber,
};
this.handler.process(compatMessage, local, localOpMetadata);
}
}
} catch (error) {
throw DataProcessingError.wrapIfUnrecognized(
error,
"channelDeltaConnectionFailedToProcessMessage",
message,
"channelDeltaConnectionFailedToProcessMessages",
envelope,
);
}
}
18 changes: 6 additions & 12 deletions packages/runtime/datastore/src/dataStoreRuntime.ts
@@ -691,18 +691,12 @@ export class FluidDataStoreRuntime
const channelContext = this.contexts.get(currentAddress);
assert(!!channelContext, "Channel context not found");

// For now, send the message to be processed one by one. This will be updated to send the bunch later.
for (const {
contents,
clientSequenceNumber,
localOpMetadata,
} of currentMessagesContent) {
channelContext.processOp(
{ ...messageCollection.envelope, contents, clientSequenceNumber },
local,
localOpMetadata,
);
}
channelContext.processMessages({
envelope: messageCollection.envelope,
messagesContent: currentMessagesContent,
local,
});

currentMessagesContent = [];
};

37 changes: 21 additions & 16 deletions packages/runtime/datastore/src/localChannelContext.ts
@@ -12,13 +12,14 @@ import {
import {
IDocumentStorageService,
ISnapshotTree,
ISequencedDocumentMessage,
} from "@fluidframework/driver-definitions/internal";
import {
ITelemetryContext,
IFluidDataStoreContext,
IGarbageCollectionData,
ISummarizeResult,
type IPendingMessagesState,
type IRuntimeMessageCollection,
} from "@fluidframework/runtime-definitions/internal";
import {
ITelemetryLoggerExt,
@@ -41,7 +42,11 @@ import { ISharedObjectRegistry } from "./dataStoreRuntime.js";
*/
export abstract class LocalChannelContextBase implements IChannelContext {
private globallyVisible = false;
protected readonly pending: ISequencedDocumentMessage[] = [];
/** Tracks the messages for this channel that are sent while it's not loaded */
protected pendingMessagesState: IPendingMessagesState = {
messageCollections: [],
pendingCount: 0,
};
constructor(
protected readonly id: string,
protected readonly runtime: IFluidDataStoreRuntime,
@@ -74,11 +79,11 @@ export abstract class LocalChannelContextBase implements IChannelContext {
}
}

public processOp(
message: ISequencedDocumentMessage,
local: boolean,
localOpMetadata: unknown,
): void {
/**
* Process messages for this channel context. The messages here are contiguous messages for this context in a batch.
* @param messageCollection - The collection of messages to process.
*/
processMessages(messageCollection: IRuntimeMessageCollection): void {
assert(
this.globallyVisible,
0x2d3 /* "Local channel must be globally visible when processing op" */,
@@ -88,13 +93,17 @@
// delay loading. So after the container is attached and some other client joins which start generating
// ops for this channel. So not loaded local channel can still receive ops and we store them to process later.
if (this.isLoaded) {
this.services.value.deltaConnection.process(message, local, localOpMetadata);
this.services.value.deltaConnection.processMessages(messageCollection);
} else {
assert(
local === false,
!messageCollection.local,
0x189 /* "Should always be remote because a local dds shouldn't generate ops before loading" */,
);
this.pending.push(message);
const propsCopy = {
...messageCollection,
messagesContent: Array.from(messageCollection.messagesContent),
};
this.pendingMessagesState.messageCollections.push(propsCopy);
}
}

@@ -254,12 +263,8 @@ export class RehydratedLocalChannelContext extends LocalChannelContextBase {
this.id,
);
// Send all pending messages to the channel
for (const message of this.pending) {
this.services.value.deltaConnection.process(
message,
false,
undefined /* localOpMetadata */,
);
for (const messageCollection of this.pendingMessagesState.messageCollections) {
this.services.value.deltaConnection.processMessages(messageCollection);
}
return channel;
} catch (err) {