Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6762): destroy async resources when client closes #4388

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@
{
"selector": "BinaryExpression[operator=/[=!]==?/] Literal[value='undefined']",
"message": "Do not strictly check typeof undefined (NOTE: currently this rule only detects the usage of 'undefined' string literal so this could be a misfire)"
},
{
"selector": "CallExpression[callee.property.name='removeAllListeners'][arguments.length=0]",
"message": "removeAllListeners can remove error listeners leading to uncaught errors"
},
{
"selector": "CallExpression[callee.name='setTimeout']",
"message": "setTimeout must be abortable"
},
{
"selector": "CallExpression[callee.name='clearTimeout']",
"message": "clearTimeout must remove abort listener"
}
],
"@typescript-eslint/no-unused-vars": "error",
Expand Down
1 change: 1 addition & 0 deletions .evergreen/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ export MONGODB_URI=${MONGODB_URI}
export LOAD_BALANCER=${LOAD_BALANCER}
export TEST_CSFLE=${TEST_CSFLE}
export COMPRESSOR=${COMPRESSOR}
export NODE_OPTIONS="${NODE_OPTIONS} --trace-uncaught"
npm run "${TEST_NPM_SCRIPT}"
2 changes: 1 addition & 1 deletion .mocharc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"source-map-support/register",
"ts-node/register",
"test/tools/runner/chai_addons.ts",
"test/tools/runner/hooks/unhandled_checker.ts"
"test/tools/runner/ee_checker.ts"
],
"extension": [
"js",
Expand Down
15 changes: 8 additions & 7 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ export class ChangeStream<
this.isClosed = false;
this.mode = false;

this.on('error', () => null);

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
Expand All @@ -680,7 +682,8 @@ export class ChangeStream<
if (this.options.timeoutMS != null) {
this.timeoutContext = new CSOTTimeoutContext({
timeoutMS: this.options.timeoutMS,
serverSelectionTimeoutMS
serverSelectionTimeoutMS,
closeSignal: this.cursor.client.closeSignal
});
}
}
Expand Down Expand Up @@ -951,12 +954,10 @@ export class ChangeStream<

/** @internal */
private _endStream(): void {
const cursorStream = this.cursorStream;
if (cursorStream) {
['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
cursorStream.destroy();
}

this.cursorStream?.removeAllListeners('data');
this.cursorStream?.removeAllListeners('close');
this.cursorStream?.removeAllListeners('end');
this.cursorStream?.destroy();
this.cursorStream = undefined;
}

Expand Down
36 changes: 22 additions & 14 deletions src/client-side-encryption/auto_encrypter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,17 @@ export class AutoEncrypter {
context.ns = ns;
context.document = cmd;

const stateMachine = new StateMachine({
promoteValues: false,
promoteLongs: false,
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
});
const stateMachine = new StateMachine(
{
promoteValues: false,
promoteLongs: false,
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
},
undefined,
this._client.closeSignal
);

return deserialize(await stateMachine.execute(this, context, options), {
promoteValues: false,
Expand All @@ -420,12 +424,16 @@ export class AutoEncrypter {

context.id = this._contextCounter++;

const stateMachine = new StateMachine({
...options,
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
});
const stateMachine = new StateMachine(
{
...options,
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
},
undefined,
this._client.closeSignal
);

return await stateMachine.execute(this, context, options);
}
Expand All @@ -438,7 +446,7 @@ export class AutoEncrypter {
* the original ones.
*/
async askForKMSCredentials(): Promise<KMSProviders> {
return await refreshKMSCredentials(this._kmsProviders);
return await refreshKMSCredentials(this._kmsProviders, this._client.closeSignal);
}

/**
Expand Down
58 changes: 37 additions & 21 deletions src/client-side-encryption/client_encryption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,15 @@ export class ClientEncryption {
keyMaterial
});

const stateMachine = new StateMachine({
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
});
const stateMachine = new StateMachine(
{
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
},
undefined,
this._client.closeSignal
);

const timeoutContext =
options?.timeoutContext ??
Expand Down Expand Up @@ -283,11 +287,15 @@ export class ClientEncryption {
}
const filterBson = serialize(filter);
const context = this._mongoCrypt.makeRewrapManyDataKeyContext(filterBson, keyEncryptionKeyBson);
const stateMachine = new StateMachine({
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
});
const stateMachine = new StateMachine(
{
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
},
undefined,
this._client.closeSignal
);

const timeoutContext = TimeoutContext.create(
resolveTimeoutOptions(this._client, { timeoutMS: this._timeoutMS })
Expand Down Expand Up @@ -687,11 +695,15 @@ export class ClientEncryption {
const valueBuffer = serialize({ v: value });
const context = this._mongoCrypt.makeExplicitDecryptionContext(valueBuffer);

const stateMachine = new StateMachine({
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
});
const stateMachine = new StateMachine(
{
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
},
undefined,
this._client.closeSignal
);

const timeoutContext =
this._timeoutMS != null
Expand All @@ -712,7 +724,7 @@ export class ClientEncryption {
* the original ones.
*/
async askForKMSCredentials(): Promise<KMSProviders> {
return await refreshKMSCredentials(this._kmsProviders);
return await refreshKMSCredentials(this._kmsProviders, this._client.closeSignal);
}

static get libmongocryptVersion() {
Expand Down Expand Up @@ -771,11 +783,15 @@ export class ClientEncryption {
}

const valueBuffer = serialize({ v: value });
const stateMachine = new StateMachine({
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
});
const stateMachine = new StateMachine(
{
proxyOptions: this._proxyOptions,
tlsOptions: this._tlsOptions,
socketOptions: autoSelectSocketOptions(this._client.s.options)
},
undefined,
this._client.closeSignal
);
const context = this._mongoCrypt.makeExplicitEncryptionContext(valueBuffer, contextOptions);

const timeoutContext =
Expand Down
20 changes: 12 additions & 8 deletions src/client-side-encryption/providers/azure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ interface AzureTokenCacheEntry extends AccessToken {
export class AzureCredentialCache {
cachedToken: AzureTokenCacheEntry | null = null;

async getToken(): Promise<AccessToken> {
async getToken(closeSignal: AbortSignal): Promise<AccessToken> {
if (this.cachedToken == null || this.needsRefresh(this.cachedToken)) {
this.cachedToken = await this._getToken();
this.cachedToken = await this._getToken(closeSignal);
}

return { accessToken: this.cachedToken.accessToken };
Expand All @@ -53,8 +53,8 @@ export class AzureCredentialCache {
/**
* exposed for testing
*/
_getToken(): Promise<AzureTokenCacheEntry> {
return fetchAzureKMSToken();
_getToken(closeSignal: AbortSignal): Promise<AzureTokenCacheEntry> {
return fetchAzureKMSToken(undefined, closeSignal);
}
}

Expand Down Expand Up @@ -156,11 +156,12 @@ export function prepareRequest(options: AzureKMSRequestOptions): {
* [prose test 18](https://github.com/mongodb/specifications/tree/master/source/client-side-encryption/tests#azure-imds-credentials)
*/
export async function fetchAzureKMSToken(
options: AzureKMSRequestOptions = {}
options: AzureKMSRequestOptions = {},
closeSignal: AbortSignal
): Promise<AzureTokenCacheEntry> {
const { headers, url } = prepareRequest(options);
try {
const response = await get(url, { headers });
const response = await get(url, { headers }, closeSignal);
return await parseResponse(response);
} catch (error) {
if (error instanceof MongoNetworkTimeoutError) {
Expand All @@ -175,7 +176,10 @@ export async function fetchAzureKMSToken(
*
* @throws Will reject with a `MongoCryptError` if the http request fails or the http response is malformed.
*/
export async function loadAzureCredentials(kmsProviders: KMSProviders): Promise<KMSProviders> {
const azure = await tokenCache.getToken();
export async function loadAzureCredentials(
kmsProviders: KMSProviders,
closeSignal: AbortSignal
): Promise<KMSProviders> {
const azure = await tokenCache.getToken(closeSignal);
return { ...kmsProviders, azure };
}
7 changes: 5 additions & 2 deletions src/client-side-encryption/providers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ export function isEmptyCredentials(
*
* @internal
*/
export async function refreshKMSCredentials(kmsProviders: KMSProviders): Promise<KMSProviders> {
export async function refreshKMSCredentials(
kmsProviders: KMSProviders,
closeSignal: AbortSignal
): Promise<KMSProviders> {
let finalKMSProviders = kmsProviders;

if (isEmptyCredentials('aws', kmsProviders)) {
Expand All @@ -188,7 +191,7 @@ export async function refreshKMSCredentials(kmsProviders: KMSProviders): Promise
}

if (isEmptyCredentials('azure', kmsProviders)) {
finalKMSProviders = await loadAzureCredentials(finalKMSProviders);
finalKMSProviders = await loadAzureCredentials(finalKMSProviders, closeSignal);
}
return finalKMSProviders;
}
1 change: 0 additions & 1 deletion src/client-side-encryption/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ export class StateMachine {
function destroySockets() {
for (const sock of [socket, netSocket]) {
if (sock) {
sock.removeAllListeners();
sock.destroy();
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/cmap/auth/auth_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ export abstract class AuthProvider {
*/
async prepare(
handshakeDoc: HandshakeDocument,
_authContext: AuthContext
_authContext: AuthContext,
_closeSignal: AbortSignal
): Promise<HandshakeDocument> {
return handshakeDoc;
}
Expand All @@ -57,19 +58,19 @@ export abstract class AuthProvider {
*
* @param context - A shared context for authentication flow
*/
abstract auth(context: AuthContext): Promise<void>;
abstract auth(context: AuthContext, closeSignal: AbortSignal): Promise<void>;

/**
* Reauthenticate.
* @param context - The shared auth context.
*/
async reauth(context: AuthContext): Promise<void> {
async reauth(context: AuthContext, closeSignal: AbortSignal): Promise<void> {
if (context.reauthenticating) {
throw new MongoRuntimeError('Reauthentication already in progress.');
}
try {
context.reauthenticating = true;
await this.auth(context);
await this.auth(context, closeSignal);
} finally {
context.reauthenticating = false;
}
Expand Down
21 changes: 15 additions & 6 deletions src/cmap/auth/mongodb_oidc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,20 @@ export interface Workflow {
/**
* Each workflow should specify the correct custom behaviour for reauthentication.
*/
reauthenticate(connection: Connection, credentials: MongoCredentials): Promise<void>;
reauthenticate(
connection: Connection,
credentials: MongoCredentials,
closeSignal: AbortSignal
): Promise<void>;

/**
* Get the document to add for speculative authentication.
*/
speculativeAuth(connection: Connection, credentials: MongoCredentials): Promise<Document>;
speculativeAuth(
connection: Connection,
credentials: MongoCredentials,
closeSignal: AbortSignal
): Promise<Document>;
}

/** @internal */
Expand Down Expand Up @@ -141,14 +149,14 @@ export class MongoDBOIDC extends AuthProvider {
/**
* Authenticate using OIDC
*/
override async auth(authContext: AuthContext): Promise<void> {
override async auth(authContext: AuthContext, closeSignal: AbortSignal): Promise<void> {
const { connection, reauthenticating, response } = authContext;
if (response?.speculativeAuthenticate?.done && !reauthenticating) {
return;
}
const credentials = getCredentials(authContext);
if (reauthenticating) {
await this.workflow.reauthenticate(connection, credentials);
await this.workflow.reauthenticate(connection, credentials, closeSignal);
} else {
await this.workflow.execute(connection, credentials, response);
}
Expand All @@ -159,11 +167,12 @@ export class MongoDBOIDC extends AuthProvider {
*/
override async prepare(
handshakeDoc: HandshakeDocument,
authContext: AuthContext
authContext: AuthContext,
closeSignal: AbortSignal
): Promise<HandshakeDocument> {
const { connection } = authContext;
const credentials = getCredentials(authContext);
const result = await this.workflow.speculativeAuth(connection, credentials);
const result = await this.workflow.speculativeAuth(connection, credentials, closeSignal);
return { ...handshakeDoc, ...result };
}
}
Expand Down
Loading