Skip to content

Commit

Permalink
test(NODE-6692): check that event emitters have error listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jan 28, 2025
1 parent 654069f commit 3a7f111
Show file tree
Hide file tree
Showing 26 changed files with 125 additions and 78 deletions.
2 changes: 1 addition & 1 deletion .mocharc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"source-map-support/register",
"ts-node/register",
"test/tools/runner/chai_addons.ts",
"test/tools/runner/hooks/unhandled_checker.ts"
"test/tools/runner/ee_checker.ts"
],
"extension": [
"js",
Expand Down
2 changes: 1 addition & 1 deletion src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import type { ReadPreference } from './read_preference';
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
import type { ServerSessionId } from './sessions';
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
import { filterOptions, getTopology, type MongoDBNamespace, noop, squashError } from './utils';

const CHANGE_STREAM_OPTIONS = [
'resumeAfter',
Expand Down
2 changes: 2 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
HostAddress,
maxWireVersion,
type MongoDBNamespace,
noop,
now,
once,
squashError,
Expand Down Expand Up @@ -229,6 +230,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

constructor(stream: Stream, options: ConnectionOptions) {
super();
this.on('error', noop);

this.socket = stream;
this.id = options.id;
Expand Down
2 changes: 2 additions & 0 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
kDispose,
List,
makeCounter,
noop,
now,
promiseWithResolvers
} from '../utils';
Expand Down Expand Up @@ -200,6 +201,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

constructor(server: Server, options: ConnectionPoolOptions) {
super();
this.on('error', noop);

this.options = Object.freeze({
connectionType: Connection,
Expand Down
2 changes: 2 additions & 0 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
type Disposable,
kDispose,
type MongoDBNamespace,
noop,
squashError
} from '../utils';

Expand Down Expand Up @@ -267,6 +268,7 @@ export abstract class AbstractCursor<
options: AbstractCursorOptions & Abortable = {}
) {
super();
this.on('error', noop);

if (!client.s.isMongoClient) {
throw new MongoRuntimeError('Cursor must be constructed with MongoClient');
Expand Down
3 changes: 2 additions & 1 deletion src/gridfs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { type Filter, TypedEventEmitter } from '../mongo_types';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import { CSOTTimeoutContext } from '../timeout';
import { resolveOptions } from '../utils';
import { noop, resolveOptions } from '../utils';
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
import type { FindOptions } from './../operations/find';
import {
Expand Down Expand Up @@ -87,6 +87,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {

constructor(db: Db, options?: GridFSBucketOptions) {
super();
this.on('error', noop);
this.setMaxListeners(0);
const privateOptions = resolveOptions(db, {
...DEFAULT_GRIDFS_BUCKET_OPTIONS,
Expand Down
2 changes: 2 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {
hostMatchesWildcards,
isHostMatch,
type MongoDBNamespace,
noop,
ns,
resolveOptions,
squashError
Expand Down Expand Up @@ -384,6 +385,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements

constructor(url: string, options?: MongoClientOptions) {
super();
this.on('error', noop);

this.options = parseOptions(url, this, options);

Expand Down
8 changes: 7 additions & 1 deletion src/mongo_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
type MongoLogger
} from './mongo_logger';
import type { Sort } from './sort';
import { noop } from './utils';

/** @internal */
export type TODO_NODE_3286 = any;
Expand Down Expand Up @@ -472,7 +473,12 @@ export class TypedEventEmitter<Events extends EventsDescription> extends EventEm
}

/** @public */
export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {}
export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {
constructor(...args: any[]) {
super(...args);
this.on('error', noop);
}
}

/** @public */
export type Abortable = {
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
type Callback,
type EventEmitterWithState,
makeStateMachine,
noop,
now,
ns
} from '../utils';
Expand Down Expand Up @@ -102,6 +103,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {

constructor(server: Server, options: MonitorOptions) {
super();
this.on('error', noop);

this.server = server;
this.connection = null;
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {
makeStateMachine,
maxWireVersion,
type MongoDBNamespace,
noop,
supportsRetryableWrites
} from '../utils';
import { throwIfWriteConcernError } from '../write_concern';
Expand Down Expand Up @@ -142,6 +143,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
*/
constructor(topology: Topology, description: ServerDescription, options: ServerOptions) {
super();
this.on('error', noop);

this.serverApi = options.serverApi;

Expand Down
3 changes: 2 additions & 1 deletion src/sdam/srv_polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';

import { MongoRuntimeError } from '../error';
import { TypedEventEmitter } from '../mongo_types';
import { checkParentDomainMatch, HostAddress, squashError } from '../utils';
import { checkParentDomainMatch, HostAddress, noop, squashError } from '../utils';

/**
* @internal
Expand Down Expand Up @@ -49,6 +49,7 @@ export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {

constructor(options: SrvPollerOptions) {
super();
this.on('error', noop);

if (!options || !options.srvHost) {
throw new MongoRuntimeError('Options for SrvPoller must exist and include srvHost');
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
kDispose,
List,
makeStateMachine,
noop,
now,
ns,
promiseWithResolvers,
Expand Down Expand Up @@ -248,6 +249,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
options: TopologyOptions
) {
super();
this.on('error', noop);

this.client = client;
// Options should only be undefined in tests, MongoClient will always have defined options
Expand Down
2 changes: 2 additions & 0 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
isPromiseLike,
List,
maxWireVersion,
noop,
now,
squashError,
uuidV4
Expand Down Expand Up @@ -161,6 +162,7 @@ export class ClientSession
clientOptions: MongoOptions
) {
super();
this.on('error', noop);

if (client == null) {
// TODO(NODE-3483)
Expand Down
3 changes: 2 additions & 1 deletion test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ describe('Change Streams', function () {
// ChangeStream detects emitter usage via 'newListener' event
// so this covers all emitter methods
});
changeStream.on('error', () => null); // one must listen for errors if they use EE mode.

await once(changeStream.cursor, 'init');
expect(changeStream).to.have.property('mode', 'emitter');
Expand Down Expand Up @@ -971,7 +972,7 @@ describe('Change Streams', function () {
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([]);
changeStream.on('change', sinon.stub());
changeStream.on('change', sinon.stub()).on('error', () => null);

try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand Down
2 changes: 2 additions & 0 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ describe('Change Stream prose tests', function () {
expect(err).to.not.exist;
coll = client.db('integration_tests').collection('setupAfterTest');
const changeStream = coll.watch();
changeStream.on('error', done);
waitForStarted(changeStream, () => {
coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } }, err => {
expect(err).to.not.exist;
Expand Down Expand Up @@ -932,6 +933,7 @@ describe('Change Stream prose tests', function () {
let events = [];
client.on('commandStarted', e => recordEvent(events, e));
const changeStream = coll.watch([], { startAfter });
changeStream.on('error', done);
this.defer(() => changeStream.close());

changeStream.on('change', change => {
Expand Down
12 changes: 7 additions & 5 deletions test/integration/crud/crud_api.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { on } from 'events';
import { on, once } from 'events';
import * as semver from 'semver';
import * as sinon from 'sinon';

Expand Down Expand Up @@ -307,13 +307,15 @@ describe('CRUD API', function () {

describe('#stream()', () => {
it('creates a node stream that emits data events', async () => {
const count = 0;
let count = 0;
const cursor = makeCursor();
const stream = cursor.stream();
on(stream, 'data');
cursor.once('close', function () {
expect(count).to.equal(2);
const willClose = once(cursor, 'close');
stream.on('data', () => {
count++;
});
await willClose;
expect(count).to.equal(2);
});
});

Expand Down
6 changes: 3 additions & 3 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1993,15 +1993,15 @@ describe('Cursor', function () {
expect(res).property('insertedId').to.exist;
}, 300);

const start = new Date();
const start = performance.now();
const doc1 = await cursor.next();
expect(doc1).to.have.property('b', 2);
const end = new Date();
const end = performance.now();

await later; // make sure this finished, without a failure

// We should see here that cursor.next blocked for at least 300ms
expect(end.getTime() - start.getTime()).to.be.at.least(300);
expect(end - start).to.be.at.least(290);
}
}
);
Expand Down
54 changes: 37 additions & 17 deletions test/integration/node-specific/examples/change_streams.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ maybeDescribe('examples(change-stream):', function () {
// Start Changestream Example 1
const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});
changeStream
.on('change', next => {
// process next document
})
.once('error', () => {
// handle error
});
// End Changestream Example 1

const changeStreamIterator = collection.watch();
Expand Down Expand Up @@ -113,9 +117,13 @@ maybeDescribe('examples(change-stream):', function () {
// Start Changestream Example 2
const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
// process next document
});
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});
// End Changestream Example 2

// Start Changestream Example 2 Alternative
Expand Down Expand Up @@ -151,15 +159,23 @@ maybeDescribe('examples(change-stream):', function () {
const changeStream = collection.watch();

let newChangeStream;
changeStream.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();

newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream.on('change', next => {
processChange(next);
changeStream
.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();

newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream
.on('change', next => {
processChange(next);
})
.once('error', error => {
// handle error
});
})
.once('error', error => {
// handle error
});
});
// End Changestream Example 3

// Start Changestream Example 3 Alternative
Expand Down Expand Up @@ -200,9 +216,13 @@ maybeDescribe('examples(change-stream):', function () {

const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});
// End Changestream Example 4

// Start Changestream Example 4 Alternative
Expand Down
3 changes: 2 additions & 1 deletion test/integration/sessions/sessions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,14 @@ describe('Sessions Spec', function () {
});
});

context('when using a LegacyMongoClient', () => {
context.skip('when using a LegacyMongoClient', () => {
let legacyClient;
beforeEach(async function () {
const options = this.configuration.serverApi
? { serverApi: this.configuration.serverApi }
: {};
legacyClient = new LegacyMongoClient(this.configuration.url(), options);
legacyClient.on('error', () => null); // Uses released version of the driver so it won't be fixed until the error listeners are published
});

afterEach(async function () {
Expand Down
2 changes: 1 addition & 1 deletion test/mocha_mongodb.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
"require": [
"source-map-support/register",
"ts-node/register",
"test/tools/runner/ee_checker.ts",
"test/tools/runner/chai_addons.ts",
"test/tools/runner/hooks/configuration.ts",
"test/tools/runner/hooks/unhandled_checker.ts",
"test/tools/runner/hooks/leak_checker.ts",
"test/tools/runner/hooks/legacy_crud_shims.ts"
],
Expand Down
1 change: 1 addition & 0 deletions test/tools/cmap_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ export class ThreadContext {
poolOptions: Partial<ConnectionPoolOptions> = {},
contextOptions: { injectPoolStats: boolean }
) {
this.poolEventsEventEmitter.on('error', () => null);
this.#poolOptions = poolOptions;
this.#hostAddress = hostAddress;
this.#server = server;
Expand Down
Loading

0 comments on commit 3a7f111

Please sign in to comment.