diff --git a/.changeset/hot-falcons-grab.md b/.changeset/hot-falcons-grab.md index 79fbdbfd16d6..f02279bffe21 100644 --- a/.changeset/hot-falcons-grab.md +++ b/.changeset/hot-falcons-grab.md @@ -6,7 +6,7 @@ "section": other --- -Change when the `op` event on `IFluidDataStoreRuntimeEvents` and `IContainerRuntimeBaseEvents` is emitted. +Change when the `op` event on `IFluidDataStoreRuntimeEvents` and `IContainerRuntimeBaseEvents` is emitted Previous behavior - It was emitted immediately after an op was processed and before the next op was processed. diff --git a/.changeset/neat-lights-worry.md b/.changeset/neat-lights-worry.md index 9457a0010fb2..c31ef4e30996 100644 --- a/.changeset/neat-lights-worry.md +++ b/.changeset/neat-lights-worry.md @@ -1,4 +1,5 @@ --- +"@fluidframework/datastore-definitions": minor "@fluidframework/runtime-definitions": minor "@fluidframework/test-runtime-utils": minor --- @@ -6,8 +7,8 @@ "section": deprecation --- -The function `process` on `IFluidDataStoreChannel` and `MockFluidDataStoreRuntime` is now deprecated. +The function `process` on `IFluidDataStoreChannel`, `IDeltaHandler`, `MockFluidDataStoreRuntime` and `MockDeltaConnection` is now deprecated A new function `processMessages` has been added in its place which will be called to process multiple messages instead of a single one on the channel. This is part of a feature called "Op bunching" where contiguous ops of a given type and to a given data store / DDS are bunched and sent together for processing. -Implementations of `IFluidDataStoreChannel` must now also implement `processMessages`. For a reference implementation, see `FluidDataStoreRuntime::processMessages`. +Implementations of `IFluidDataStoreChannel` and `IDeltaHandler` must now also implement `processMessages`. For reference implementations, see `FluidDataStoreRuntime::processMessages` and `SharedObjectCore::attachDeltaHandler`. diff --git a/experimental/dds/tree/src/migration-shim/migrationDeltaHandler.ts b/experimental/dds/tree/src/migration-shim/migrationDeltaHandler.ts index 4f83dadabe63..a619b57fb58f 100644 --- a/experimental/dds/tree/src/migration-shim/migrationDeltaHandler.ts +++ b/experimental/dds/tree/src/migration-shim/migrationDeltaHandler.ts @@ -6,6 +6,7 @@ import { assert } from '@fluidframework/core-utils/internal'; import { type IChannelAttributes, type IDeltaHandler } from '@fluidframework/datastore-definitions/internal'; import { MessageType, type ISequencedDocumentMessage } from '@fluidframework/driver-definitions/internal'; +import type { IRuntimeMessageCollection } from '@fluidframework/runtime-definitions/internal'; import { type IOpContents, type IShimDeltaHandler } from './types.js'; import { attributesMatch, isBarrierOp, isStampedOp } from './utils.js'; @@ -100,6 +101,13 @@ export class MigrationShimDeltaHandler implements IShimDeltaHandler { return this.treeDeltaHandler.process(message, local, localOpMetadata); } + public processMessages(messagesCollection: IRuntimeMessageCollection): void { + const { envelope, messagesContent, local } = messagesCollection; + for (const { contents, localOpMetadata, clientSequenceNumber } of messagesContent) { + this.process({ ...envelope, contents, clientSequenceNumber }, local, localOpMetadata); + } + } + public setConnectionState(connected: boolean): void { return this.treeDeltaHandler.setConnectionState(connected); } diff --git a/experimental/dds/tree/src/migration-shim/sharedTreeDeltaHandler.ts b/experimental/dds/tree/src/migration-shim/sharedTreeDeltaHandler.ts index 269d72ad6d25..a823f99f6cde 100644 --- a/experimental/dds/tree/src/migration-shim/sharedTreeDeltaHandler.ts +++ b/experimental/dds/tree/src/migration-shim/sharedTreeDeltaHandler.ts @@ -6,6 +6,7 @@ import { assert } from '@fluidframework/core-utils/internal'; import { type IChannelAttributes, type IDeltaHandler } from '@fluidframework/datastore-definitions/internal'; import { MessageType, type ISequencedDocumentMessage } from '@fluidframework/driver-definitions/internal'; +import type { IRuntimeMessageCollection } from '@fluidframework/runtime-definitions/internal'; import { type IOpContents, type IShimDeltaHandler } from './types.js'; import { attributesMatch, isStampedOp } from './utils.js'; @@ -62,6 +63,13 @@ export class SharedTreeShimDeltaHandler implements IShimDeltaHandler { return this.handler.process(message, local, localOpMetadata); } + public processMessages(messagesCollection: IRuntimeMessageCollection): void { + const { envelope, messagesContent, local } = messagesCollection; + for (const { contents, localOpMetadata, clientSequenceNumber } of messagesContent) { + this.process({ ...envelope, contents, clientSequenceNumber }, local, localOpMetadata); + } + } + // No idea whether any of the below 4 methods work as expected public setConnectionState(connected: boolean): void { return this.handler.setConnectionState(connected); diff --git a/packages/dds/shared-object-base/src/sharedObject.ts b/packages/dds/shared-object-base/src/sharedObject.ts index 0d7d12c93344..86ceb2e62be6 100644 --- a/packages/dds/shared-object-base/src/sharedObject.ts +++ b/packages/dds/shared-object-base/src/sharedObject.ts @@ -19,6 +19,7 @@ import { IChannelAttributes, type IChannelFactory, IFluidDataStoreRuntime, + type IDeltaHandler, } from "@fluidframework/datastore-definitions/internal"; import { type IDocumentMessage, @@ -31,6 +32,7 @@ import { IGarbageCollectionData, blobCountPropertyName, totalBlobSizePropertyName, + type IRuntimeMessageCollection, } from "@fluidframework/runtime-definitions/internal"; import { toDeltaManagerInternal, @@ -496,6 +498,9 @@ export abstract class SharedObjectCore< localOpMetadata, ); }, + processMessages: (messagesCollection: IRuntimeMessageCollection) => { + this.processMessages(messagesCollection); + }, setConnectionState: (connected: boolean) => { this.setConnectionState(connected); }, @@ -508,7 +513,7 @@ export abstract class SharedObjectCore< rollback: (content: any, localOpMetadata: unknown) => { this.rollback(content, localOpMetadata); }, - }); + } satisfies IDeltaHandler); } /** @@ -572,6 +577,25 @@ export abstract class SharedObjectCore< this.emitInternal("op", message, local, this); } + /** + * Process messages for this shared object. The messages here are contiguous messages for this object in a batch. + * @param messageCollection - The collection of messages to process. + */ + private processMessages(messagesCollection: IRuntimeMessageCollection) { + const { envelope, messagesContent, local } = messagesCollection; + for (const { contents, localOpMetadata, clientSequenceNumber } of messagesContent) { + this.process( + { + ...envelope, + clientSequenceNumber, + contents: parseHandles(contents, this.serializer), + }, + local, + localOpMetadata, + ); + } + } + /** * Called when a message has to be resubmitted. This typically happens for unacked messages after a * reconnection. diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md index 486aeb0f00d1..c7f4d5ef2efd 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md @@ -58,7 +58,9 @@ export interface IDeltaConnection { // @alpha export interface IDeltaHandler { applyStashedOp(message: any): void; + // @deprecated process: (message: ISequencedDocumentMessage, local: boolean, localOpMetadata: unknown) => void; + processMessages?: (messageCollection: IRuntimeMessageCollection) => void; reSubmit(message: any, localOpMetadata: unknown): void; rollback?(message: any, localOpMetadata: unknown): void; setConnectionState(connected: boolean): void; diff --git a/packages/runtime/datastore-definitions/src/channel.ts b/packages/runtime/datastore-definitions/src/channel.ts index 85b01e3558f5..861790d4329b 100644 --- a/packages/runtime/datastore-definitions/src/channel.ts +++ b/packages/runtime/datastore-definitions/src/channel.ts @@ -8,6 +8,7 @@ import type { ISequencedDocumentMessage } from "@fluidframework/driver-definitio import type { IExperimentalIncrementalSummaryContext, IGarbageCollectionData, + IRuntimeMessageCollection, ISummaryTreeWithStats, ITelemetryContext, } from "@fluidframework/runtime-definitions/internal"; @@ -130,6 +131,7 @@ export interface IDeltaHandler { * @param local - Whether the message originated from the local client * @param localOpMetadata - For local client messages, this is the metadata that was submitted with the message. * For messages from a remote client, this will be undefined. + * @deprecated - Use processMessages instead to process messages. */ process: ( message: ISequencedDocumentMessage, @@ -137,6 +139,12 @@ export interface IDeltaHandler { localOpMetadata: unknown, ) => void; + /** + * Process messages for this channel. The messages here are contiguous messages for this channel in a batch. + * @param messageCollection - The collection of messages to process. + */ + processMessages?: (messageCollection: IRuntimeMessageCollection) => void; + /** * State change events to indicate changes to the delta connection * @param connected - true if connected, false otherwise diff --git a/packages/runtime/datastore/src/channelContext.ts b/packages/runtime/datastore/src/channelContext.ts index 4cb05963844f..d5af7b646d8d 100644 --- a/packages/runtime/datastore/src/channelContext.ts +++ b/packages/runtime/datastore/src/channelContext.ts @@ -12,7 +12,6 @@ import { import { IDocumentStorageService, ISnapshotTree, - ISequencedDocumentMessage, } from "@fluidframework/driver-definitions/internal"; import { readAndParse } from "@fluidframework/driver-utils/internal"; import { @@ -22,6 +21,7 @@ import { IGarbageCollectionData, IFluidDataStoreContext, ISummarizeResult, + type IRuntimeMessageCollection, } from "@fluidframework/runtime-definitions/internal"; import { addBlobToSummary } from "@fluidframework/runtime-utils/internal"; import { @@ -41,11 +41,11 @@ export interface IChannelContext { setConnectionState(connected: boolean, clientId?: string); - processOp( - message: ISequencedDocumentMessage, - local: boolean, - localOpMetadata?: unknown, - ): void; + /** + * Process messages for this channel context. The messages here are contiguous messages for this context in a batch. + * @param messageCollection - The collection of messages to process. + */ + processMessages(messageCollection: IRuntimeMessageCollection): void; summarize( fullTree?: boolean, diff --git a/packages/runtime/datastore/src/channelDeltaConnection.ts b/packages/runtime/datastore/src/channelDeltaConnection.ts index f6e54f4ea518..c731b9ba720d 100644 --- a/packages/runtime/datastore/src/channelDeltaConnection.ts +++ b/packages/runtime/datastore/src/channelDeltaConnection.ts @@ -8,7 +8,11 @@ import { IDeltaConnection, IDeltaHandler, } from "@fluidframework/datastore-definitions/internal"; -import { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal"; +import type { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal"; +import type { + IRuntimeMessageCollection, + IRuntimeMessagesContent, +} from "@fluidframework/runtime-definitions/internal"; import { DataProcessingError } from "@fluidframework/telemetry-utils/internal"; const stashedOpMetadataMark = Symbol(); @@ -46,6 +50,26 @@ function processWithStashedOpMetadataHandling( } } +function getContentsWithStashedOpHandling( + messagesContent: readonly IRuntimeMessagesContent[], +) { + const newMessageContents: IRuntimeMessagesContent[] = []; + for (const messageContent of messagesContent) { + if (isStashedOpMetadata(messageContent.localOpMetadata)) { + messageContent.localOpMetadata.forEach(({ contents, metadata }) => { + newMessageContents.push({ + contents, + localOpMetadata: metadata, + clientSequenceNumber: messageContent.clientSequenceNumber, + }); + }); + } else { + newMessageContents.push(messageContent); + } + } + return newMessageContents; +} + export class ChannelDeltaConnection implements IDeltaConnection { private _handler: IDeltaHandler | undefined; private stashedOpMd: StashedOpMetadata | undefined; @@ -75,24 +99,31 @@ export class ChannelDeltaConnection implements IDeltaConnection { this.handler.setConnectionState(connected); } - public process( - message: ISequencedDocumentMessage, - local: boolean, - localOpMetadata: unknown, - ) { + public processMessages(messageCollection: IRuntimeMessageCollection): void { + const { envelope, messagesContent, local } = messageCollection; + // catches as data processing error whether or not they come from async pending queues try { - // catches as data processing error whether or not they come from async pending queues - processWithStashedOpMetadataHandling( - message.contents, - localOpMetadata, - (contents, metadata) => - this.handler.process({ ...message, contents }, local, metadata), - ); + const newMessagesContent = getContentsWithStashedOpHandling(messagesContent); + if (this.handler.processMessages !== undefined) { + this.handler.processMessages({ + ...messageCollection, + messagesContent: newMessagesContent, + }); + } else { + for (const { contents, localOpMetadata, clientSequenceNumber } of newMessagesContent) { + const compatMessage: ISequencedDocumentMessage = { + ...envelope, + contents, + clientSequenceNumber, + }; + this.handler.process(compatMessage, local, localOpMetadata); + } + } } catch (error) { throw DataProcessingError.wrapIfUnrecognized( error, - "channelDeltaConnectionFailedToProcessMessage", - message, + "channelDeltaConnectionFailedToProcessMessages", + envelope, ); } } diff --git a/packages/runtime/datastore/src/dataStoreRuntime.ts b/packages/runtime/datastore/src/dataStoreRuntime.ts index f42c2c580c3f..b8fcddabf635 100644 --- a/packages/runtime/datastore/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore/src/dataStoreRuntime.ts @@ -691,18 +691,12 @@ export class FluidDataStoreRuntime const channelContext = this.contexts.get(currentAddress); assert(!!channelContext, "Channel context not found"); - // For now, send the message to be processed one by one. This will be updated to send the bunch later. - for (const { - contents, - clientSequenceNumber, - localOpMetadata, - } of currentMessagesContent) { - channelContext.processOp( - { ...messageCollection.envelope, contents, clientSequenceNumber }, - local, - localOpMetadata, - ); - } + channelContext.processMessages({ + envelope: messageCollection.envelope, + messagesContent: currentMessagesContent, + local, + }); + currentMessagesContent = []; }; diff --git a/packages/runtime/datastore/src/localChannelContext.ts b/packages/runtime/datastore/src/localChannelContext.ts index c1419124ed3f..377592806df7 100644 --- a/packages/runtime/datastore/src/localChannelContext.ts +++ b/packages/runtime/datastore/src/localChannelContext.ts @@ -12,13 +12,14 @@ import { import { IDocumentStorageService, ISnapshotTree, - ISequencedDocumentMessage, } from "@fluidframework/driver-definitions/internal"; import { ITelemetryContext, IFluidDataStoreContext, IGarbageCollectionData, ISummarizeResult, + type IPendingMessagesState, + type IRuntimeMessageCollection, } from "@fluidframework/runtime-definitions/internal"; import { ITelemetryLoggerExt, @@ -41,7 +42,11 @@ import { ISharedObjectRegistry } from "./dataStoreRuntime.js"; */ export abstract class LocalChannelContextBase implements IChannelContext { private globallyVisible = false; - protected readonly pending: ISequencedDocumentMessage[] = []; + /** Tracks the messages for this channel that are sent while it's not loaded */ + protected pendingMessagesState: IPendingMessagesState = { + messageCollections: [], + pendingCount: 0, + }; constructor( protected readonly id: string, protected readonly runtime: IFluidDataStoreRuntime, @@ -74,11 +79,11 @@ export abstract class LocalChannelContextBase implements IChannelContext { } } - public processOp( - message: ISequencedDocumentMessage, - local: boolean, - localOpMetadata: unknown, - ): void { + /** + * Process messages for this channel context. The messages here are contiguous messages for this context in a batch. + * @param messageCollection - The collection of messages to process. + */ + processMessages(messageCollection: IRuntimeMessageCollection): void { assert( this.globallyVisible, 0x2d3 /* "Local channel must be globally visible when processing op" */, @@ -88,13 +93,17 @@ export abstract class LocalChannelContextBase implements IChannelContext { // delay loading. So after the container is attached and some other client joins which start generating // ops for this channel. So not loaded local channel can still receive ops and we store them to process later. if (this.isLoaded) { - this.services.value.deltaConnection.process(message, local, localOpMetadata); + this.services.value.deltaConnection.processMessages(messageCollection); } else { assert( - local === false, + !messageCollection.local, 0x189 /* "Should always be remote because a local dds shouldn't generate ops before loading" */, ); - this.pending.push(message); + const propsCopy = { + ...messageCollection, + messagesContent: Array.from(messageCollection.messagesContent), + }; + this.pendingMessagesState.messageCollections.push(propsCopy); } } @@ -254,12 +263,8 @@ export class RehydratedLocalChannelContext extends LocalChannelContextBase { this.id, ); // Send all pending messages to the channel - for (const message of this.pending) { - this.services.value.deltaConnection.process( - message, - false, - undefined /* localOpMetadata */, - ); + for (const messageCollection of this.pendingMessagesState.messageCollections) { + this.services.value.deltaConnection.processMessages(messageCollection); } return channel; } catch (err) { diff --git a/packages/runtime/datastore/src/remoteChannelContext.ts b/packages/runtime/datastore/src/remoteChannelContext.ts index 5fc85c4e0ae1..7f97c9aa2399 100644 --- a/packages/runtime/datastore/src/remoteChannelContext.ts +++ b/packages/runtime/datastore/src/remoteChannelContext.ts @@ -12,7 +12,6 @@ import { import { IDocumentStorageService, ISnapshotTree, - ISequencedDocumentMessage, } from "@fluidframework/driver-definitions/internal"; import { IExperimentalIncrementalSummaryContext, @@ -23,6 +22,8 @@ import { ISummarizeInternalResult, ISummarizeResult, ISummarizerNodeWithGC, + type IPendingMessagesState, + type IRuntimeMessageCollection, } from "@fluidframework/runtime-definitions/internal"; import { ITelemetryLoggerExt, @@ -42,7 +43,11 @@ import { ISharedObjectRegistry } from "./dataStoreRuntime.js"; export class RemoteChannelContext implements IChannelContext { private isLoaded = false; - private pending: ISequencedDocumentMessage[] | undefined = []; + /** Tracks the messages for this channel that are sent while it's not loaded */ + private pendingMessagesState: IPendingMessagesState | undefined = { + messageCollections: [], + pendingCount: 0, + }; private readonly channelP: Promise; private channel: IChannel | undefined; private readonly services: ChannelServiceEndpoints; @@ -100,16 +105,18 @@ export class RemoteChannelContext implements IChannelContext { this.id, ); - // Send all pending messages to the channel - assert(this.pending !== undefined, 0x23f /* "pending undefined" */); - for (const message of this.pending) { - this.services.deltaConnection.process(message, false, undefined /* localOpMetadata */); + assert(this.pendingMessagesState !== undefined, "pending messages state is undefined"); + for (const messageCollection of this.pendingMessagesState.messageCollections) { + this.services.deltaConnection.processMessages(messageCollection); } - this.thresholdOpsCounter.send("ProcessPendingOps", this.pending.length); + this.thresholdOpsCounter.send( + "ProcessPendingOps", + this.pendingMessagesState.pendingCount, + ); // Commit changes. this.channel = channel; - this.pending = undefined; + this.pendingMessagesState = undefined; this.isLoaded = true; // Because have some await between we created the service and here, the connection state might have changed @@ -161,20 +168,28 @@ export class RemoteChannelContext implements IChannelContext { return this.services.deltaConnection.applyStashedOp(content); } - public processOp( - message: ISequencedDocumentMessage, - local: boolean, - localOpMetadata: unknown, - ): void { - this.summarizerNode.invalidate(message.sequenceNumber); + /** + * Process messages for this channel context. The messages here are contiguous messages for this context in a batch. + * @param messageCollection - The collection of messages to process. + */ + public processMessages(messageCollection: IRuntimeMessageCollection): void { + const { envelope, messagesContent, local } = messageCollection; + this.summarizerNode.invalidate(envelope.sequenceNumber); if (this.isLoaded) { - this.services.deltaConnection.process(message, local, localOpMetadata); + this.services.deltaConnection.processMessages(messageCollection); } else { assert(!local, 0x195 /* "Remote channel must not be local when processing op" */); - assert(this.pending !== undefined, 0x23e /* "pending is undefined" */); - this.pending.push(message); - this.thresholdOpsCounter.sendIfMultiple("StorePendingOps", this.pending.length); + assert(this.pendingMessagesState !== undefined, "pending messages queue is undefined"); + this.pendingMessagesState.messageCollections.push({ + ...messageCollection, + messagesContent: Array.from(messagesContent), + }); + this.pendingMessagesState.pendingCount += messagesContent.length; + this.thresholdOpsCounter.sendIfMultiple( + "StorePendingOps", + this.pendingMessagesState.pendingCount, + ); } } diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md index 0b905484ed70..7f570d37f6ed 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md @@ -273,18 +273,18 @@ export interface IProvideFluidDataStoreRegistry { readonly IFluidDataStoreRegistry: IFluidDataStoreRegistry; } -// @alpha +// @alpha @sealed export interface IRuntimeMessageCollection { - envelope: ISequencedMessageEnvelope; - local: boolean; - messagesContent: IRuntimeMessagesContent[]; + readonly envelope: ISequencedMessageEnvelope; + readonly local: boolean; + readonly messagesContent: readonly IRuntimeMessagesContent[]; } -// @alpha +// @alpha @sealed export interface IRuntimeMessagesContent { - clientSequenceNumber: number; - contents: unknown; - localOpMetadata: unknown; + readonly clientSequenceNumber: number; + readonly contents: unknown; + readonly localOpMetadata: unknown; } // @alpha diff --git a/packages/runtime/runtime-definitions/src/protocol.ts b/packages/runtime/runtime-definitions/src/protocol.ts index 2f77bb42d725..1b83eedfb2d6 100644 --- a/packages/runtime/runtime-definitions/src/protocol.ts +++ b/packages/runtime/runtime-definitions/src/protocol.ts @@ -87,26 +87,28 @@ export type ISequencedMessageEnvelope = Omit< * These are the contents of a runtime message as it is processed throughout the stack. * @alpha * @legacy + * @sealed */ export interface IRuntimeMessagesContent { /** The contents of the message, i.e., the payload */ - contents: unknown; + readonly contents: unknown; /** The local metadata associated with the original message that was submitted */ - localOpMetadata: unknown; + readonly localOpMetadata: unknown; /** The client sequence number of the message */ - clientSequenceNumber: number; + readonly clientSequenceNumber: number; } /** * A collection of messages that are processed by the runtime. * @alpha * @legacy + * @sealed */ export interface IRuntimeMessageCollection { /** The envelope for all the messages in the collection */ - envelope: ISequencedMessageEnvelope; + readonly envelope: ISequencedMessageEnvelope; /** Whether these messages were originally generated by the client processing it */ - local: boolean; + readonly local: boolean; /** The contents of the messages in the collection */ - messagesContent: IRuntimeMessagesContent[]; + readonly messagesContent: readonly IRuntimeMessagesContent[]; } diff --git a/packages/runtime/test-runtime-utils/api-report/test-runtime-utils.legacy.alpha.api.md b/packages/runtime/test-runtime-utils/api-report/test-runtime-utils.legacy.alpha.api.md index a073484a4758..f80f0eecc66d 100644 --- a/packages/runtime/test-runtime-utils/api-report/test-runtime-utils.legacy.alpha.api.md +++ b/packages/runtime/test-runtime-utils/api-report/test-runtime-utils.legacy.alpha.api.md @@ -171,9 +171,11 @@ export class MockDeltaConnection implements IDeltaConnection { dirty(): void; // (undocumented) handler: IDeltaHandler | undefined; - // (undocumented) + // @deprecated (undocumented) process(message: ISequencedDocumentMessage, local: boolean, localOpMetadata: unknown): void; // (undocumented) + processMessages(messageCollection: IRuntimeMessageCollection): void; + // (undocumented) reSubmit(content: any, localOpMetadata: unknown): void; // (undocumented) setConnectionState(connected: boolean): void; diff --git a/packages/runtime/test-runtime-utils/package.json b/packages/runtime/test-runtime-utils/package.json index 8427c6a7409b..e1ec585d3b1e 100644 --- a/packages/runtime/test-runtime-utils/package.json +++ b/packages/runtime/test-runtime-utils/package.json @@ -158,6 +158,9 @@ }, "typeValidation": { "broken": { + "Class_MockDeltaConnection": { + "forwardCompat": false + }, "Class_MockFluidDataStoreRuntime": { "forwardCompat": false } diff --git a/packages/runtime/test-runtime-utils/src/mocks.ts b/packages/runtime/test-runtime-utils/src/mocks.ts index 6ee487eb2fdc..5f1f03590439 100644 --- a/packages/runtime/test-runtime-utils/src/mocks.ts +++ b/packages/runtime/test-runtime-utils/src/mocks.ts @@ -109,6 +109,9 @@ export class MockDeltaConnection implements IDeltaConnection { this.handler?.setConnectionState(connected); } + /** + * @deprecated - This has been replaced by processMessages + */ public process( message: ISequencedDocumentMessage, local: boolean, @@ -117,6 +120,10 @@ export class MockDeltaConnection implements IDeltaConnection { this.handler?.process(message, local, localOpMetadata); } + public processMessages(messageCollection: IRuntimeMessageCollection) { + this.handler?.processMessages?.(messageCollection); + } + public reSubmit(content: any, localOpMetadata: unknown) { this.handler?.reSubmit(content, localOpMetadata); } diff --git a/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts b/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts index 0bda1af63e27..2da5e35506b3 100644 --- a/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts +++ b/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts @@ -112,6 +112,7 @@ declare type current_as_old_for_Class_MockContainerRuntimeForReconnection = requ * typeValidation.broken: * "Class_MockDeltaConnection": {"forwardCompat": false} */ +// @ts-expect-error compatibility expected to be broken declare type old_as_current_for_Class_MockDeltaConnection = requireAssignableTo, TypeOnly> /* diff --git a/packages/test/test-end-to-end-tests/src/test/opBunching.spec.ts b/packages/test/test-end-to-end-tests/src/test/opBunching.spec.ts new file mode 100644 index 000000000000..4a3674705cae --- /dev/null +++ b/packages/test/test-end-to-end-tests/src/test/opBunching.spec.ts @@ -0,0 +1,287 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; + +import { stringToBuffer } from "@fluid-internal/client-utils"; +import { describeCompat } from "@fluid-private/test-version-utils"; +import type { IFluidHandle } from "@fluidframework/core-interfaces"; +import type { ISharedDirectory, ISharedMap } from "@fluidframework/map/internal"; +import type { IRuntimeMessageCollection } from "@fluidframework/runtime-definitions/internal"; +import { SharedObject } from "@fluidframework/shared-object-base/internal"; +import { ITestObjectProvider } from "@fluidframework/test-utils/internal"; +import { createSandbox } from "sinon"; + +describeCompat( + "Ops for DDSes are bunched together", + "NoCompat", + (getTestObjectProvider, apis) => { + const { DataObject, DataObjectFactory } = apis.dataRuntime; + const { ContainerRuntimeFactoryWithDefaultDataStore } = apis.containerRuntime; + const { SharedMap } = apis.dds; + + class TestDataObject extends DataObject { + public get _context() { + return this.context; + } + public get _runtime() { + return this.runtime; + } + public get _root() { + return this.root; + } + } + + const dataObjectFactory = new DataObjectFactory( + "testDataObject", + TestDataObject, + [SharedMap.getFactory()], + undefined, + ); + const runtimeFactory = new ContainerRuntimeFactoryWithDefaultDataStore({ + defaultFactory: dataObjectFactory, + registryEntries: [["testDataObject", Promise.resolve(dataObjectFactory)]], + }); + + type SharedObjectWithProcess = Omit & { + processMessages(messageCollection: IRuntimeMessageCollection): void; + }; + + let sandbox: sinon.SinonSandbox; + let provider: ITestObjectProvider; + let rootDataObject: TestDataObject; + let dds1: ISharedDirectory; + let dds2: ISharedMap; + let ds2dds1: ISharedDirectory; + let dds1Container2Stub: sinon.SinonStub; + let dds2Container2Stub: sinon.SinonStub; + let ds2dds1Container2Stub: sinon.SinonStub; + + beforeEach("getTestObjectProvider", async () => { + sandbox = createSandbox(); + provider = getTestObjectProvider(); + + const container = await provider.createContainer(runtimeFactory); + rootDataObject = (await container.getEntryPoint()) as TestDataObject; + dds1 = rootDataObject._root; + dds2 = SharedMap.create(rootDataObject._runtime); + dds1.set("map", dds2.handle); + + const ds2 = await dataObjectFactory.createInstance( + rootDataObject._context.containerRuntime, + ); + dds1.set("dataStore2", ds2.handle); + + const container2 = await provider.loadContainer(runtimeFactory); + const rootObject2 = (await container2.getEntryPoint()) as TestDataObject; + const dds1Container2 = rootObject2._root as unknown as SharedObjectWithProcess; + + await provider.ensureSynchronized(); + const dds2Handle = rootObject2._root.get>("map"); + assert(dds2Handle !== undefined, "shared map handle not found"); + const dds2Container2 = await dds2Handle.get(); + + const ds2Container2Handle = + rootObject2._root.get>("dataStore2"); + assert(ds2Container2Handle !== undefined, "data store 2 handle not found"); + const ds2Container2 = await ds2Container2Handle.get(); + const ds2dds1Container2 = ds2Container2._root as unknown as SharedObjectWithProcess; + ds2dds1 = ds2Container2._root; + + dds1Container2Stub = sandbox.stub(dds1Container2, "processMessages"); + dds2Container2Stub = sandbox.stub(dds2Container2, "processMessages"); + ds2dds1Container2Stub = sandbox.stub(ds2dds1Container2, "processMessages"); + }); + + afterEach(() => { + sandbox.restore(); + }); + + it("ops for a single DDS", async () => { + // Send a bunch of ops for dds2. + const bunchCount = 5; + for (let i = 0; i < bunchCount; i++) { + dds2.set(i.toString(), i); + } + + // Send another bunch of ops for dds2 without interleaving. + for (let i = bunchCount; i < 2 * bunchCount; i++) { + dds2.set(i.toString(), i); + } + + await provider.ensureSynchronized(); + + // Validate that processMessages is called once with all ops. + assert(dds2Container2Stub.calledOnce, "processMessages should be called once"); + const messageCollection = dds2Container2Stub.args[0][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection.messagesContent.length, + 2 * bunchCount, + "All the ops for dds2 should be processed together", + ); + }); + + it("ops across two DDSes interleaved", async () => { + // Send first bunch of ops for dds2. + const bunch1dds2Count = 5; + for (let i = 0; i < bunch1dds2Count; i++) { + dds2.set(i.toString(), i); + } + + // Send second bunch of ops for dds1. + const bunch2dds1Count = 10; + for (let i = 0; i < bunch2dds1Count; i++) { + dds1.set(i.toString(), i); + } + + // Send third bunch of ops for dds2. + const bunch3dds2Count = 5; + for (let i = 0; i < bunch3dds2Count; i++) { + dds2.set(i.toString(), i); + } + + await provider.ensureSynchronized(); + + assert(dds1Container2Stub.calledOnce, "processMessages should be called once on dds1"); + assert(dds2Container2Stub.calledTwice, "processMessages should be called twice on dds2"); + + const messageCollection1 = dds2Container2Stub.args[0][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection1.messagesContent.length, + bunch1dds2Count, + "First bunch of ops for dds2 should be processed together", + ); + + const messageCollection2 = dds1Container2Stub.args[0][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection2.messagesContent.length, + bunch2dds1Count, + "First bunch of ops for dds1 should be processed together", + ); + + const messageCollection3 = dds2Container2Stub.args[1][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection3.messagesContent.length, + bunch3dds2Count, + "Second bunch of ops for dds2 should be processed together", + ); + }); + + it("ops across two data store interleaved", async () => { + // Send first bunch of ops for dds1 in data store 1. + const bunch1dds1Count = 5; + for (let i = 0; i < bunch1dds1Count; i++) { + dds1.set(i.toString(), i); + } + + // Send second bunch of ops for dds1 in data store 2. + const bunch2ds2dds1Count = 10; + for (let i = 0; i < bunch2ds2dds1Count; i++) { + ds2dds1.set(i.toString(), i); + } + + // Send third bunch of ops for dds2 in data store 1. + const bunch3dds2Count = 15; + for (let i = 0; i < bunch3dds2Count; i++) { + dds2.set(i.toString(), i); + } + + await provider.ensureSynchronized(); + + assert(dds1Container2Stub.calledOnce, "processMessages should be called once on dds1"); + assert(dds2Container2Stub.calledOnce, "processMessages should be called once on dds2"); + assert( + ds2dds1Container2Stub.calledOnce, + "processMessages should be called once on ds2's dds1", + ); + + const messageCollection1 = dds1Container2Stub.args[0][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection1.messagesContent.length, + bunch1dds1Count, + "First bunch of ops for dds2 should be processed together", + ); + + const messageCollection2 = ds2dds1Container2Stub.args[0][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection2.messagesContent.length, + bunch2ds2dds1Count, + "First bunch of ops for dds1 should be processed together", + ); + + const messageCollection3 = dds2Container2Stub.args[0][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection3.messagesContent.length, + bunch3dds2Count, + "Second bunch of ops for dds2 should be processed together", + ); + }); + + it("ops for DDS and other types interleaved", async () => { + // Send first bunch of ops for dds1. + const bunch1dds1Count = 5; + for (let i = 0; i < bunch1dds1Count; i++) { + dds1.set(i.toString(), i); + } + + // Send an attach op. This will send an attach op for the new data store + an + // op for dds1 after that for setting the handle. + const ds3 = await dataObjectFactory.createInstance( + rootDataObject._context.containerRuntime, + ); + dds1.set("dataStore3", ds3.handle); + + // Send second bunch of ops for dds1. Send 1 less than bunch2dds1Count because one op + // is already sent above. + const bunch2dds1Count = 10; + for (let i = 0; i < bunch2dds1Count - 1; i++) { + dds1.set(i.toString(), i); + } + + // Send a blob attach op. This will send an attach op for the new data store + an + // op for dds1 after that for setting the handle. + const blobContents = "Blob contents"; + const blobHandle = await rootDataObject._context.uploadBlob( + stringToBuffer(blobContents, "utf-8"), + ); + dds1.set("blob", blobHandle); + + // Send third bunch of ops for dds1. Send 1 less than bunch3dds1Count because one op + // is already sent above. + const bunch3dds1Count = 15; + for (let i = 0; i < bunch3dds1Count - 1; i++) { + dds1.set(i.toString(), i); + } + + await provider.ensureSynchronized(); + + assert( + dds1Container2Stub.calledThrice, + "processMessages should be called thrice on dds1", + ); + + const messageCollection1 = dds1Container2Stub.args[0][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection1.messagesContent.length, + bunch1dds1Count, + "First bunch of ops for dds1 should be processed together", + ); + + const messageCollection2 = dds1Container2Stub.args[1][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection2.messagesContent.length, + bunch2dds1Count, + "Second bunch of ops for dds1 should be processed together", + ); + + const messageCollection3 = dds1Container2Stub.args[2][0] as IRuntimeMessageCollection; + assert.strictEqual( + messageCollection3.messagesContent.length, + bunch3dds1Count, + "Second bunch of ops for dds1 should be processed together", + ); + }); + }, +); diff --git a/packages/tools/replay-tool/src/unknownChannel.ts b/packages/tools/replay-tool/src/unknownChannel.ts index 9ea9704b54c7..41b0313a3db2 100644 --- a/packages/tools/replay-tool/src/unknownChannel.ts +++ b/packages/tools/replay-tool/src/unknownChannel.ts @@ -22,6 +22,7 @@ import { IGarbageCollectionData, ISummaryTreeWithStats, IFluidDataStoreContext, + type IRuntimeMessageCollection, } from "@fluidframework/runtime-definitions/internal"; class UnknownChannel implements IChannel { @@ -36,6 +37,7 @@ class UnknownChannel implements IChannel { local: boolean, localOpMetadata: unknown, ) => {}, + processMessages: (messageCollection: IRuntimeMessageCollection) => {}, setConnectionState: (connected: boolean) => {}, reSubmit: (content: any, localOpMetadata: unknown) => {}, applyStashedOp: (content: any) => {},