Skip to content

Commit

Permalink
feat(NODE-6633): MongoClient.close closes active cursors (#4372)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored Jan 27, 2025
1 parent 73def18 commit 654069f
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 25 deletions.
49 changes: 31 additions & 18 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ export interface CursorStreamOptions {
/** @public */
export type CursorFlag = (typeof CURSOR_FLAGS)[number];

function removeActiveCursor(this: AbstractCursor) {
this.client.s.activeCursors.delete(this);
}

/**
* @public
* @experimental
Expand Down Expand Up @@ -365,6 +369,7 @@ export abstract class AbstractCursor<
this.signal,
() => void this.close().then(undefined, squashError)
);
this.trackCursor();
}

/**
Expand Down Expand Up @@ -444,6 +449,14 @@ export abstract class AbstractCursor<
await this.close();
}

/** Adds cursor to client's tracking so it will be closed by MongoClient.close() */
private trackCursor() {
this.cursorClient.s.activeCursors.add(this);
if (!this.listeners('close').includes(removeActiveCursor)) {
this.once('close', removeActiveCursor);
}
}

/** Returns current buffered documents length */
bufferedCount(): number {
return this.documents?.length ?? 0;
Expand Down Expand Up @@ -858,16 +871,15 @@ export abstract class AbstractCursor<
this.isClosed = false;
this.isKilled = false;
this.initialized = false;
this.hasEmittedClose = false;
this.trackCursor();

const session = this.cursorSession;
if (session) {
// We only want to end this session if we created it, and it hasn't ended yet
if (session.explicit === false) {
if (!session.hasEnded) {
session.endSession().then(undefined, squashError);
}
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
// We only want to end this session if we created it, and it hasn't ended yet
if (this.cursorSession.explicit === false) {
if (!this.cursorSession.hasEnded) {
this.cursorSession.endSession().then(undefined, squashError);
}
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
}
}

Expand Down Expand Up @@ -1004,7 +1016,6 @@ export abstract class AbstractCursor<
private async cleanup(timeoutMS?: number, error?: Error) {
this.abortListener?.[kDispose]();
this.isClosed = true;
const session = this.cursorSession;
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
if (timeoutMS != null) {
this.timeoutContext?.clear();
Expand All @@ -1026,7 +1037,7 @@ export abstract class AbstractCursor<
!this.cursorId.isZero() &&
this.cursorNamespace &&
this.selectedServer &&
!session.hasEnded
!this.cursorSession.hasEnded
) {
this.isKilled = true;
const cursorId = this.cursorId;
Expand All @@ -1035,22 +1046,24 @@ export abstract class AbstractCursor<
await executeOperation(
this.cursorClient,
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session
session: this.cursorSession
}),
timeoutContextForKillCursors()
);
}
} catch (error) {
squashError(error);
} finally {
if (session?.owner === this) {
await session.endSession({ error });
}
if (!session?.inTransaction()) {
maybeClearPinnedConnection(session, { error });
try {
if (this.cursorSession?.owner === this) {
await this.cursorSession.endSession({ error });
}
if (!this.cursorSession?.inTransaction()) {
maybeClearPinnedConnection(this.cursorSession, { error });
}
} finally {
this.emitClose();
}

this.emitClose();
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type { ClientMetadata } from './cmap/handshake/client_metadata';
import type { CompressorName } from './cmap/wire_protocol/compression';
import { parseOptions, resolveSRVRecord } from './connection_string';
import { MONGO_CLIENT_EVENTS } from './constants';
import { type AbstractCursor } from './cursor/abstract_cursor';
import { Db, type DbOptions } from './db';
import type { Encrypter } from './encrypter';
import { MongoInvalidArgumentError } from './error';
Expand Down Expand Up @@ -323,6 +324,12 @@ export interface MongoClientPrivate {
* - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions
*/
readonly activeSessions: Set<ClientSession>;
/**
* We keep a reference to the cursors that are created from this client.
* - used to track and close all cursors in client.close().
* Cursors in this set are ones that still need to have their close method invoked (no other conditions are considered)
*/
readonly activeCursors: Set<AbstractCursor>;
readonly sessionPool: ServerSessionPool;
readonly options: MongoOptions;
readonly readConcern?: ReadConcern;
Expand Down Expand Up @@ -398,6 +405,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
hasBeenClosed: false,
sessionPool: new ServerSessionPool(this),
activeSessions: new Set(),
activeCursors: new Set(),
authProviders: new MongoClientAuthProviders(),

get options() {
Expand Down Expand Up @@ -650,6 +658,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
writable: false
});

const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
this.s.activeCursors.clear();

await Promise.all(activeCursorCloses);

const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession());
this.s.activeSessions.clear();

Expand Down
21 changes: 20 additions & 1 deletion test/integration/crud/find_cursor_methods.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ describe('Find Cursor', function () {
});
});

context('#rewind', function () {
describe('#rewind', function () {
it('should rewind a cursor', async function () {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({});
Expand Down Expand Up @@ -335,6 +335,25 @@ describe('Find Cursor', function () {
});
}
});

it('emits close after rewind', async () => {
let cursor;
try {
const coll = client.db().collection('abstract_cursor');
cursor = coll.find({}, { batchSize: 1 });
const closes = [];
cursor.on('close', () => closes.push('close'));
const doc0 = await cursor.next();
await cursor.close();
cursor.rewind();
const doc1 = await cursor.next();
await cursor.close();
expect(doc0).to.deep.equal(doc1); // make sure rewind happened
expect(closes).to.have.lengthOf(2);
} finally {
await cursor.close();
}
});
});

context('#allowDiskUse', function () {
Expand Down
4 changes: 2 additions & 2 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const sinon = require('sinon');
const { Writable } = require('stream');
const { once, on } = require('events');
const { setTimeout } = require('timers');
const { ReadPreference, MongoExpiredSessionError } = require('../../mongodb');
const { ReadPreference } = require('../../mongodb');
const { ServerType } = require('../../mongodb');
const { formatSort } = require('../../mongodb');

Expand Down Expand Up @@ -1872,7 +1872,7 @@ describe('Cursor', function () {
expect(cursor).to.have.property('closed', true);

const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.instanceOf(MongoExpiredSessionError);
expect(error).to.be.null; // TODO(NODE-6632): This should throw again after the client signal aborts the in-progress next call
});

it('shouldAwaitData', {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/node-specific/abort_signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ describe('AbortSignal support', () => {
const start = performance.now();
const result = await cursor.toArray().catch(error => error);
const end = performance.now();
expect(end - start).to.be.lessThan(15);
expect(end - start).to.be.lessThan(50);

expect(result).to.be.instanceOf(DOMException);
});
Expand Down
43 changes: 43 additions & 0 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,4 +556,47 @@ describe('class AbstractCursor', function () {
);
});
});

describe('cursor tracking', () => {
let client: MongoClient;
let collection: Collection;

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('activeCursors').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, i) => ({ i })));
});

afterEach(async function () {
await client.close();
});

it('adds itself to a set upon construction', () => {
collection.find({}, { batchSize: 1 });
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('adds itself to a set upon rewind', async () => {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(client.s.activeCursors).to.have.lengthOf(1);
await cursor.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
cursor.rewind();
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('does not add more than one close listener', async () => {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(cursor.listeners('close')).to.have.lengthOf(1);
await cursor.close();
expect(cursor.listeners('close')).to.have.lengthOf(0);
cursor.rewind();
cursor.rewind();
cursor.rewind();
expect(cursor.listeners('close')).to.have.lengthOf(1);
});
});
});
86 changes: 83 additions & 3 deletions test/integration/node-specific/mongo_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as net from 'net';
import * as sinon from 'sinon';

import {
type Collection,
type CommandFailedEvent,
type CommandStartedEvent,
type CommandSucceededEvent,
Expand Down Expand Up @@ -31,7 +32,6 @@ describe('class MongoClient', function () {
afterEach(async () => {
sinon.restore();
await client?.close();
// @ts-expect-error: Put this variable back to undefined to force tests to make their own client
client = undefined;
});

Expand Down Expand Up @@ -567,7 +567,44 @@ describe('class MongoClient', function () {
});
});

context('#close()', () => {
describe('active cursors', function () {
let client: MongoClient;
let collection: Collection<{ _id: number }>;
const kills = [];

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('activeCursors').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));

kills.length = 0;
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
});

afterEach(async function () {
await client.close();
});

it('are tracked upon creation and removed upon exhaustion', async () => {
const cursors = Array.from({ length: 30 }, (_, skip) =>
collection.find({}, { skip, batchSize: 1 })
);
expect(client.s.activeCursors).to.have.lengthOf(30);
await Promise.all(cursors.map(c => c.toArray()));
expect(client.s.activeCursors).to.have.lengthOf(0);
expect(kills).to.have.lengthOf(0);
});

it('are removed from tracking if exhausted in first batch', async () => {
const cursors = Array.from({ length: 30 }, () => collection.find());
expect(client.s.activeCursors).to.have.lengthOf(30);
await Promise.all(cursors.map(c => c.next())); // only one document pulled from each.
expect(client.s.activeCursors).to.have.lengthOf(0);
});
});

describe('#close()', () => {
let client: MongoClient;
let db: Db;

Expand Down Expand Up @@ -702,7 +739,7 @@ describe('class MongoClient', function () {
expect(endEvents[0]).to.have.property('reply', undefined); // noReponse: true
});

context('when server selection would return no servers', () => {
describe('when server selection would return no servers', () => {
const serverDescription = new ServerDescription('a:1');

it('short circuits and does not end sessions', async () => {
Expand All @@ -722,6 +759,49 @@ describe('class MongoClient', function () {
expect(client.s.sessionPool.sessions).to.have.lengthOf(1);
});
});

describe('active cursors', function () {
let collection: Collection<{ _id: number }>;
const kills = [];

beforeEach(async () => {
collection = client.db('test').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));

kills.length = 0;
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
});

it('are all closed', async () => {
const cursors = Array.from({ length: 30 }, (_, skip) =>
collection.find({}, { skip, batchSize: 1 })
);
await Promise.all(cursors.map(c => c.next()));
expect(client.s.activeCursors).to.have.lengthOf(30);
await client.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
expect(kills).to.have.lengthOf(30);
});

it('creating cursors after close adds to activeCursors', async () => {
expect(client.s.activeCursors).to.have.lengthOf(0);
await client.close();
collection.find({});
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('rewinding cursors after close adds to activeCursors', async () => {
expect(client.s.activeCursors).to.have.lengthOf(0);
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(client.s.activeCursors).to.have.lengthOf(1);
await client.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
cursor.rewind();
expect(client.s.activeCursors).to.have.lengthOf(1);
});
});
});

context('when connecting', function () {
Expand Down

0 comments on commit 654069f

Please sign in to comment.