diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts index 587c0fe185..3fb4d2c75a 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import { AsyncResource } from 'async_hooks'; import { context, @@ -42,6 +43,7 @@ import { MongoInternalTopology, WireProtocolInternal, V4Connection, + V4ConnectionPool, } from './internal-types'; import { V4Connect, V4Session } from './internal-types'; import { VERSION } from './version'; @@ -76,6 +78,8 @@ export class MongoDBInstrumentation extends InstrumentationBase { const { v4PatchConnect, v4UnpatchConnect } = this._getV4ConnectPatches(); const { v4PatchConnection, v4UnpatchConnection } = this._getV4ConnectionPatches(); + const { v4PatchConnectionPool, v4UnpatchConnectionPool } = + this._getV4ConnectionPoolPatches(); const { v4PatchSessions, v4UnpatchSessions } = this._getV4SessionsPatches(); return [ @@ -105,6 +109,12 @@ export class MongoDBInstrumentation extends InstrumentationBase { v4PatchConnection, v4UnpatchConnection ), + new InstrumentationNodeModuleFile( + 'mongodb/lib/cmap/connection_pool.js', + ['4.*', '5.*'], + v4PatchConnectionPool, + v4UnpatchConnectionPool + ), new InstrumentationNodeModuleFile( 'mongodb/lib/cmap/connect.js', ['4.*', '5.*'], @@ -268,6 +278,35 @@ export class MongoDBInstrumentation extends InstrumentationBase { }; } + private _getV4ConnectionPoolPatches() { + return { + v4PatchConnectionPool: (moduleExports: any, moduleVersion?: string) => { + diag.debug(`Applying patch for mongodb@${moduleVersion}`); + const poolPrototype = moduleExports.ConnectionPool.prototype; + + if (isWrapped(poolPrototype.checkOut)) { + this._unwrap(poolPrototype, 'checkOut'); + } + + this._wrap( + poolPrototype, + 'checkOut', + this._getV4ConnectionPoolCheckOut() + ); + return moduleExports; + }, + v4UnpatchConnectionPool: ( + moduleExports?: any, + moduleVersion?: string + ) => { + diag.debug(`Removing internal patch for mongodb@${moduleVersion}`); + if (moduleExports === undefined) return; + + this._unwrap(moduleExports.ConnectionPool.prototype, 'checkOut'); + }, + }; + } + private _getV4ConnectPatches() { return { v4PatchConnect: (moduleExports: any, moduleVersion?: string) => { @@ -288,6 +327,17 @@ export class MongoDBInstrumentation extends InstrumentationBase { }; } + // This patch will become unnecessary once + // https://jira.mongodb.org/browse/NODE-5639 is done. + private _getV4ConnectionPoolCheckOut() { + return (original: V4ConnectionPool['checkOut']) => { + return function patchedCheckout(this: unknown, callback: any) { + const patchedCallback = AsyncResource.bind(callback); + return original.call(this, patchedCallback); + }; + }; + } + private _getV4ConnectCommand() { const instrumentation = this; diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts index 03131aa12a..5cb4119de5 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts @@ -184,6 +184,13 @@ export type V4Connection = { ): void; }; +// https://github.com/mongodb/node-mongodb-native/blob/v4.2.2/src/cmap/connection_pool.ts +export type V4ConnectionPool = { + // Instrumentation just cares about carrying the async context so + // types of callback params are not needed + checkOut: (callback: (error: any, connection: any) => void) => void; +}; + // https://github.com/mongodb/node-mongodb-native/blob/v4.2.2/src/cmap/connect.ts export type V4Connect = { connect: (options: any, callback: any) => void; diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts index 18ba6bc9ef..724e33cffd 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts @@ -237,6 +237,44 @@ describe('MongoDBInstrumentation-Tracing-v4', () => { }); }); }); + + it('should create child spans for concurrent cursor operations', done => { + const queries = [{ a: 1 }, { a: 2 }, { a: 3 }]; + const tasks = queries.map((query, idx) => { + return new Promise((resolve, reject) => { + process.nextTick(() => { + const span = trace + .getTracer('default') + .startSpan(`findRootSpan ${idx}`); + context.with(trace.setSpan(context.active(), span), () => { + collection + .find(query) + .toArray() + .then(() => { + resolve(span.end()); + }) + .catch(reject); + }); + }); + }); + }); + + Promise.all(tasks) + .then(() => { + const spans = getTestSpans(); + const roots = spans.filter(s => s.name.startsWith('findRootSpan')); + + roots.forEach(root => { + const rootId = root.spanContext().spanId; + const children = spans.filter(s => s.parentSpanId === rootId); + assert.strictEqual(children.length, 1); + }); + done(); + }) + .catch(err => { + done(err); + }); + }); }); /** Should intercept command */ diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts index bd8271cb8d..e9f80ad6c5 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts @@ -243,6 +243,44 @@ describe('MongoDBInstrumentation-Tracing-v5', () => { }); }); }); + + it('should create child spans for concurrent cursor operations', done => { + const queries = [{ a: 1 }, { a: 2 }, { a: 3 }]; + const tasks = queries.map((query, idx) => { + return new Promise((resolve, reject) => { + process.nextTick(() => { + const span = trace + .getTracer('default') + .startSpan(`findRootSpan ${idx}`); + context.with(trace.setSpan(context.active(), span), () => { + collection + .find(query) + .toArray() + .then(() => { + resolve(span.end()); + }) + .catch(reject); + }); + }); + }); + }); + + Promise.all(tasks) + .then(() => { + const spans = getTestSpans(); + const roots = spans.filter(s => s.name.startsWith('findRootSpan')); + + roots.forEach(root => { + const rootId = root.spanContext().spanId; + const children = spans.filter(s => s.parentSpanId === rootId); + assert.strictEqual(children.length, 1); + }); + done(); + }) + .catch(err => { + done(err); + }); + }); }); /** Should intercept command */