Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jan 31, 2025
1 parent 072beb0 commit 7f017b0
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 31 deletions.
4 changes: 4 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@
{
"selector": "CallExpression[callee.name='clearTimeout']",
"message": "clearTimeout must remove abort listener"
},
{
"selector": "CallExpression[callee.property.name='removeAllListeners'][arguments.length=0]",
"message": "removeAllListeners can remove error listeners leading to uncaught errors"
}
],
"@typescript-eslint/no-unused-vars": "error",
Expand Down
2 changes: 2 additions & 0 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

try {
timeout?.throwIfExpired();
timeout?.ref();
return await (timeout ? Promise.race([promise, timeout]) : promise);
} catch (error) {
if (TimeoutError.is(error)) {
Expand All @@ -399,6 +400,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
throw error;
} finally {
timeout?.unref();
abortListener?.[kDispose]();
timeout?.clear();
}
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export {
MongoClientBulkWriteCursorError,
MongoClientBulkWriteError,
MongoClientBulkWriteExecutionError,
MongoClientClosedError,
MongoCompatibilityError,
MongoCursorExhaustedError,
MongoCursorInUseError,
Expand Down
13 changes: 10 additions & 3 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { MONGO_CLIENT_EVENTS } from './constants';
import { type AbstractCursor } from './cursor/abstract_cursor';
import { Db, type DbOptions } from './db';
import type { Encrypter } from './encrypter';
import { MongoInvalidArgumentError } from './error';
import { MongoClientClosedError, MongoInvalidArgumentError } from './error';
import { MongoClientAuthProviders } from './mongo_client_auth_providers';
import {
type LogComponentSeveritiesClientOptions,
Expand Down Expand Up @@ -692,7 +692,6 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
/* @internal */
private async _close(force = false): Promise<void> {
try {
this.closeController.abort();
// There's no way to set hasBeenClosed back to false
Object.defineProperty(this.s, 'hasBeenClosed', {
value: true,
Expand All @@ -701,6 +700,12 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
writable: false
});

if (this.options.maxPoolSize === 1) {
// If maxPoolSize is 1 we won't be able to run anything
// unless we interrupt whatever is using the one connection.
this.closeController.abort(new MongoClientClosedError());
}

const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
this.s.activeCursors.clear();

Expand Down Expand Up @@ -749,7 +754,9 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
await encrypter.close(this, force);
}
} finally {
// ignore
if (!this.closeController.signal.aborted) {
this.closeController.abort(new MongoClientClosedError());
}
}
}

Expand Down
16 changes: 7 additions & 9 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
const makeMonitoringConnection = async () => {
const socket = await makeSocket(monitor.connectOptions, monitor.closeSignal);
const connection = makeConnection(monitor.connectOptions, socket);
connection.unref();
// The start time is after socket creation but before the handshake
start = now();
try {
Expand Down Expand Up @@ -447,15 +448,11 @@ function monitorServer(monitor: Monitor) {

// if the check indicates streaming is supported, immediately reschedule monitoring
if (useStreamingProtocol(monitor, hello?.topologyVersion)) {
clearOnAbortTimeout(
() => {
if (!isInCloseState(monitor)) {
monitor.monitorId?.wake();
}
},
0,
monitor.closeSignal
);
queueMicrotask(() => {
if (!isInCloseState(monitor)) {
monitor.monitorId?.wake();
}
});
}

done();
Expand Down Expand Up @@ -554,6 +551,7 @@ export class RTTPinger {
if (connection == null) {
connect(this.monitor.connectOptions, this.closeSignal).then(
connection => {
connection.unref();
this.measureAndReschedule(start, connection);
},
() => {
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {

try {
timeout?.throwIfExpired();
timeout?.ref();
const server = await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
if (options.timeoutContext?.csotEnabled() && server.description.minRoundTripTime !== 0) {
options.timeoutContext.minRoundTripTime = server.description.minRoundTripTime;
Expand Down Expand Up @@ -661,6 +662,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
// Other server selection error
throw error;
} finally {
timeout?.unref();
abortListener?.[kDispose]();
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
}
Expand Down
6 changes: 4 additions & 2 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export function clearOnAbortTimeout(
}, ms);

if ('unref' in id && typeof id.unref === 'function') {
id.unref();
// id.unref();
}

const abortListener = addAbortListener(closeSignal, function clearId() {
Expand Down Expand Up @@ -116,7 +116,9 @@ export class Timeout extends Promise<never> {
this.start = Math.trunc(performance.now());

if (rejection == null && this.duration > 0) {
if (options.closeSignal == null) throw new Error('incorrect timer use detected!');
if (options.closeSignal == null) {
throw new Error('You must provide a close signal to timeoutContext');
}

this.id = clearOnAbortTimeout(
() => {
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,7 @@ export function addAbortSignalToStream(

const abortListener = addAbortListener(signal, function () {
stream.off('close', abortListener[kDispose]).off('error', abortListener[kDispose]);
stream.destroy(this.reason);
if (!stream.destroyed) stream.destroy(this.reason);
});
// not nearly as complex as node's eos() but... do we need all that?? sobbing emoji.
stream.once('close', abortListener[kDispose]).once('error', abortListener[kDispose]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ describe('CSOT spec unit tests', function () {

describe('Client side encryption', function () {
describe('KMS requests', function () {
const stateMachine = new StateMachine({} as any);
const closeSignal = new AbortController().signal;
const stateMachine = new StateMachine({} as any, undefined, closeSignal);
const request = {
addResponse: _response => {},
status: {
Expand All @@ -137,7 +138,7 @@ describe('CSOT spec unit tests', function () {
const timeoutContext = new CSOTTimeoutContext({
timeoutMS: 500,
serverSelectionTimeoutMS: 30000,
closeSignal: new AbortController().signal
closeSignal
});
const err = await stateMachine.kmsRequest(request, { timeoutContext }).catch(e => e);
expect(err).to.be.instanceOf(MongoOperationTimeoutError);
Expand Down
13 changes: 10 additions & 3 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const sinon = require('sinon');
const { Writable } = require('stream');
const { once, on } = require('events');
const { setTimeout } = require('timers');
const { ReadPreference } = require('../../mongodb');
const { ReadPreference, MongoClientClosedError } = require('../../mongodb');
const { ServerType } = require('../../mongodb');
const { formatSort } = require('../../mongodb');

Expand Down Expand Up @@ -1861,18 +1861,25 @@ describe('Cursor', function () {
// insert only 2 docs in capped coll of 3
await collection.insertMany([{ a: 1 }, { a: 1 }]);

const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 });
const maxAwaitTimeMS = 5000;

const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS });

await cursor.next();
await cursor.next();
// will block for maxAwaitTimeMS (except we are closing the client)
const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error);

const start = performance.now();
await client.close();
const end = performance.now();

expect(cursor).to.have.property('closed', true);

expect(end - start, "close returns before cursor's await time").to.be.lessThan(maxAwaitTimeMS);

const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.instanceOf(Error); // TODO: Whatever the MongoClient aborts with.
expect(error).to.be.instanceOf(MongoClientClosedError);
});

it('shouldAwaitData', {
Expand Down
6 changes: 5 additions & 1 deletion test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,11 @@ describe('class AbstractCursor', function () {
client.on('commandStarted', filterForCommands('killCursors', commands));

collection = client.db('abstract_cursor_integration').collection('test');
internalContext = TimeoutContext.create({ timeoutMS: 1000, serverSelectionTimeoutMS: 2000 });
internalContext = TimeoutContext.create({
timeoutMS: 1000,
serverSelectionTimeoutMS: 2000,
closeSignal: new AbortController().signal
});

context = new CursorTimeoutContext(internalContext, Symbol());

Expand Down
19 changes: 9 additions & 10 deletions test/integration/uri-options/uri.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,20 @@ describe('URI', function () {
);
});

it('should correctly translate uri options', {
metadata: { requires: { topology: 'replicaset' } },
test: function (done) {
it(
'should correctly translate uri options',
{ requires: { topology: 'replicaset' } },
async function () {
const config = this.configuration;
const uri = `mongodb://${config.host}:${config.port}/${config.db}?replicaSet=${config.replicasetName}`;

const client = this.configuration.newClient(uri);
client.connect((err, client) => {
expect(err).to.not.exist;
expect(client).to.exist;
expect(client.options.replicaSet).to.exist.and.equal(config.replicasetName);
client.close(done);
});
await client.connect();
expect(client).to.exist;
expect(client.options.replicaSet).to.exist.and.equal(config.replicasetName);
await client.close();
}
});
);

it('should generate valid credentials with X509', {
metadata: { requires: { topology: 'single' } },
Expand Down

0 comments on commit 7f017b0

Please sign in to comment.