From 74f592b2fe4fbae91aee1b08d1b857634cf88674 Mon Sep 17 00:00:00 2001 From: Alberto Schiabel Date: Tue, 5 Sep 2023 11:17:09 +0200 Subject: [PATCH] fix(js-connectors): enable batched/implicit transaction event logs and metrics (#4197) * fix(js-connectors): [PlanetScale] enable event logs in transactions * chore(js-connectors): [Neon / Pg] add skeleton for event logs in transactions * chore(js-connectors): add "@prisma/client" smoke tests, fix LibraryEngine types * feat(js-connectors): add "@prisma/client" mini test suite * fix(js-connectors): [Neon / pg] transaction event logs * chore(js-connectors): simplify transaction options * chore(js-connectors): simplify startTransaction signature * chore(js-connectors): turn smoke-test-js into an ESModule * feat(js-connectors): address PR comments --- Cargo.lock | 1 + query-engine/js-connectors/Cargo.toml | 1 + query-engine/js-connectors/js/.nvmrc | 2 +- .../js/js-connector-utils/src/binder.ts | 3 +- .../js/js-connector-utils/src/types.ts | 15 ++- .../js/neon-js-connector/src/neon.ts | 47 ++++---- .../js/pg-js-connector/src/pg.ts | 49 ++++---- .../src/planetscale.ts | 45 ++++---- query-engine/js-connectors/js/pnpm-lock.yaml | 55 ++++++--- .../js-connectors/js/smoke-test-js/README.md | 14 ++- .../js/smoke-test-js/package.json | 20 ++-- .../js/smoke-test-js/src/client/client.ts | 106 ++++++++++++++++++ .../js/smoke-test-js/src/client/neon.test.ts | 13 +++ .../js/smoke-test-js/src/client/pg.test.ts | 13 +++ .../src/client/planetscale.test.ts | 13 +++ .../src/engines/types/Library.ts | 4 +- .../src/{test.ts => libquery/libquery.ts} | 11 +- .../smoke-test-js/src/{ => libquery}/neon.ts | 4 +- .../js/smoke-test-js/src/{ => libquery}/pg.ts | 4 +- .../src/{ => libquery}/planetscale.ts | 4 +- .../smoke-test-js/src/{ => libquery}/util.ts | 9 +- query-engine/js-connectors/src/proxy.rs | 36 ++++-- query-engine/js-connectors/src/queryable.rs | 32 +++++- query-engine/js-connectors/src/transaction.rs | 36 +++++- 24 files changed, 399 insertions(+), 138 deletions(-) create mode 100644 query-engine/js-connectors/js/smoke-test-js/src/client/client.ts create mode 100644 query-engine/js-connectors/js/smoke-test-js/src/client/neon.test.ts create mode 100644 query-engine/js-connectors/js/smoke-test-js/src/client/pg.test.ts create mode 100644 query-engine/js-connectors/js/smoke-test-js/src/client/planetscale.test.ts rename query-engine/js-connectors/js/smoke-test-js/src/{test.ts => libquery/libquery.ts} (97%) rename query-engine/js-connectors/js/smoke-test-js/src/{ => libquery}/neon.ts (73%) rename query-engine/js-connectors/js/smoke-test-js/src/{ => libquery}/pg.ts (71%) rename query-engine/js-connectors/js/smoke-test-js/src/{ => libquery}/planetscale.ts (75%) rename query-engine/js-connectors/js/smoke-test-js/src/{ => libquery}/util.ts (76%) diff --git a/Cargo.lock b/Cargo.lock index 027ea5ddc86..53b7e38623a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1878,6 +1878,7 @@ dependencies = [ "chrono", "expect-test", "futures", + "metrics 0.18.1", "napi", "napi-derive", "num-bigint", diff --git a/query-engine/js-connectors/Cargo.toml b/query-engine/js-connectors/Cargo.toml index 56726cf4e5b..75273e9fa2b 100644 --- a/query-engine/js-connectors/Cargo.toml +++ b/query-engine/js-connectors/Cargo.toml @@ -12,6 +12,7 @@ quaint.workspace = true psl.workspace = true tracing = "0.1" tracing-core = "0.1" +metrics = "0.18" # Note: these deps are temporarily specified here to avoid importing them from tiberius (the SQL server driver). # They will be imported from quaint-core instead in a future PR. diff --git a/query-engine/js-connectors/js/.nvmrc b/query-engine/js-connectors/js/.nvmrc index 5e0828ad15c..8c60e1e54f3 100644 --- a/query-engine/js-connectors/js/.nvmrc +++ b/query-engine/js-connectors/js/.nvmrc @@ -1 +1 @@ -v18.16.1 +v20.5.1 diff --git a/query-engine/js-connectors/js/js-connector-utils/src/binder.ts b/query-engine/js-connectors/js/js-connector-utils/src/binder.ts index 640fdf0462a..bf989ce9344 100644 --- a/query-engine/js-connectors/js/js-connector-utils/src/binder.ts +++ b/query-engine/js-connectors/js/js-connector-utils/src/binder.ts @@ -45,10 +45,11 @@ export const bindConnector = (connector: Connector): ErrorCapturingConnector => const bindTransaction = (errorRegistry: ErrorRegistryInternal, transaction: Transaction): Transaction => { return ({ flavour: transaction.flavour, + options: transaction.options, queryRaw: wrapAsync(errorRegistry, transaction.queryRaw.bind(transaction)), executeRaw: wrapAsync(errorRegistry, transaction.executeRaw.bind(transaction)), commit: wrapAsync(errorRegistry, transaction.commit.bind(transaction)), - rollback: wrapAsync(errorRegistry, transaction.rollback.bind(transaction)) + rollback: wrapAsync(errorRegistry, transaction.rollback.bind(transaction)), }); } diff --git a/query-engine/js-connectors/js/js-connector-utils/src/types.ts b/query-engine/js-connectors/js/js-connector-utils/src/types.ts index 0ecce7b4eda..2d9522cf21f 100644 --- a/query-engine/js-connectors/js/js-connector-utils/src/types.ts +++ b/query-engine/js-connectors/js/js-connector-utils/src/types.ts @@ -68,10 +68,9 @@ export interface Queryable { export interface Connector extends Queryable { /** - * Starts new transation with the specified isolation level - * @param isolationLevel + * Starts new transation. */ - startTransaction(isolationLevel?: string): Promise> + startTransaction(): Promise> /** * Closes the connection to the database, if any. @@ -79,9 +78,17 @@ export interface Connector extends Queryable { close: () => Promise> } +export type TransactionOptions = { + usePhantomQuery: boolean +} + export interface Transaction extends Queryable { /** - * Commit the transaction + * Transaction options. + */ + readonly options: TransactionOptions + /** + * Commit the transaction. */ commit(): Promise> /** diff --git a/query-engine/js-connectors/js/neon-js-connector/src/neon.ts b/query-engine/js-connectors/js/neon-js-connector/src/neon.ts index d5b11301c4c..a831e8fc9e8 100644 --- a/query-engine/js-connectors/js/neon-js-connector/src/neon.ts +++ b/query-engine/js-connectors/js/neon-js-connector/src/neon.ts @@ -2,7 +2,7 @@ import { FullQueryResults, PoolClient, neon, neonConfig } from '@neondatabase/se import { NeonConfig, NeonQueryFunction, Pool, QueryResult } from '@neondatabase/serverless' import ws from 'ws' import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils' -import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils' +import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector, TransactionOptions } from '@jkomyno/prisma-js-connector-utils' import { fieldToColumnType } from './conversion' neonConfig.webSocketConstructor = ws @@ -43,7 +43,9 @@ abstract class NeonQueryable implements Queryable { debug(`${tag} %O`, query) const { rowCount: rowsAffected } = await this.performIO(query) - return { ok: true, value: rowsAffected } + + // Note: `rowsAffected` can sometimes be null (e.g., when executing `"BEGIN"`) + return { ok: true, value: rowsAffected ?? 0 } } abstract performIO(query: Query): Promise @@ -71,24 +73,23 @@ class NeonWsQueryable extends NeonQueryable { } class NeonTransaction extends NeonWsQueryable implements Transaction { + constructor(client: PoolClient, readonly options: TransactionOptions) { + super(client) + } + async commit(): Promise> { - try { - await this.client.query('COMMIT'); - return { ok: true, value: undefined } - } finally { - this.client.release() - } + debug(`[js::commit]`) + + this.client.release() + return Promise.resolve({ ok: true, value: undefined }) } async rollback(): Promise> { - try { - await this.client.query('ROLLBACK'); - return { ok: true, value: undefined } - } finally { - this.client.release() - } - } + debug(`[js::rollback]`) + this.client.release() + return Promise.resolve({ ok: true, value: undefined }) + } } class NeonWsConnector extends NeonWsQueryable implements Connector { @@ -98,14 +99,16 @@ class NeonWsConnector extends NeonWsQueryable implements Connector { super(new Pool({ connectionString, ...rest })) } - async startTransaction(isolationLevel?: string | undefined): Promise> { - const connection = await this.client.connect() - await connection.query('BEGIN') - if (isolationLevel) { - await connection.query(`SET TRANSACTION ISOLATION LEVEL ${isolationLevel}`) + async startTransaction(): Promise> { + const options: TransactionOptions = { + usePhantomQuery: false, } - - return { ok: true, value: new NeonTransaction(connection) } + + const tag = '[js::startTransaction]' + debug(`${tag} options: %O`, options) + + const connection = await this.client.connect() + return { ok: true, value: new NeonTransaction(connection, options) } } async close() { diff --git a/query-engine/js-connectors/js/pg-js-connector/src/pg.ts b/query-engine/js-connectors/js/pg-js-connector/src/pg.ts index 2e98892f28e..e47aff8168e 100644 --- a/query-engine/js-connectors/js/pg-js-connector/src/pg.ts +++ b/query-engine/js-connectors/js/pg-js-connector/src/pg.ts @@ -1,6 +1,6 @@ import * as pg from 'pg' import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils' -import type { ErrorCapturingConnector, Connector, ConnectorConfig, Query, Queryable, Result, ResultSet, Transaction } from '@jkomyno/prisma-js-connector-utils' +import type { ErrorCapturingConnector, Connector, ConnectorConfig, Query, Queryable, Result, ResultSet, Transaction, TransactionOptions } from '@jkomyno/prisma-js-connector-utils' import { fieldToColumnType } from './conversion' const debug = Debug('prisma:js-connector:pg') @@ -45,8 +45,10 @@ class PgQueryable const tag = '[js::execute_raw]' debug(`${tag} %O`, query) - const { rowCount } = await this.performIO(query) - return { ok: true, value: rowCount } + const { rowCount: rowsAffected } = await this.performIO(query) + + // Note: `rowsAffected` can sometimes be null (e.g., when executing `"BEGIN"`) + return { ok: true, value: rowsAffected ?? 0 } } /** @@ -70,32 +72,22 @@ class PgQueryable class PgTransaction extends PgQueryable implements Transaction { - constructor(client: pg.PoolClient) { + constructor(client: pg.PoolClient, readonly options: TransactionOptions) { super(client) } async commit(): Promise> { - const tag = '[js::commit]' - debug(`${tag} committing transaction`) + debug(`[js::commit]`) - try { - await this.client.query('COMMIT') - return { ok: true, value: undefined } - } finally { - this.client.release() - } + this.client.release() + return Promise.resolve({ ok: true, value: undefined }) } async rollback(): Promise> { - const tag = '[js::rollback]' - debug(`${tag} rolling back the transaction`) + debug(`[js::rollback]`) - try { - await this.client.query('ROLLBACK') - return { ok: true, value: undefined } - } finally { - this.client.release() - } + this.client.release() + return Promise.resolve({ ok: true, value: undefined }) } } @@ -110,17 +102,16 @@ class PrismaPg extends PgQueryable implements Connector { super(client) } - async startTransaction(isolationLevel?: string): Promise> { - const connection = await this.client.connect() - await connection.query('BEGIN') - - if (isolationLevel) { - await connection.query( - `SET TRANSACTION ISOLATION LEVEL ${isolationLevel}`, - ) + async startTransaction(): Promise> { + const options: TransactionOptions = { + usePhantomQuery: false, } - return { ok: true, value: new PgTransaction(connection) } + const tag = '[js::startTransaction]' + debug(`${tag} options: %O`, options) + + const connection = await this.client.connect() + return { ok: true, value: new PgTransaction(connection, options) } } async close() { diff --git a/query-engine/js-connectors/js/planetscale-js-connector/src/planetscale.ts b/query-engine/js-connectors/js/planetscale-js-connector/src/planetscale.ts index b2b66c85b22..939b8b7c176 100644 --- a/query-engine/js-connectors/js/planetscale-js-connector/src/planetscale.ts +++ b/query-engine/js-connectors/js/planetscale-js-connector/src/planetscale.ts @@ -1,7 +1,7 @@ import * as planetScale from '@planetscale/database' import type { Config as PlanetScaleConfig } from '@planetscale/database' import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils' -import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils' +import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector, TransactionOptions } from '@jkomyno/prisma-js-connector-utils' import { type PlanetScaleColumnType, fieldToColumnType } from './conversion' import { createDeferred, Deferred } from './deferred' @@ -79,22 +79,27 @@ class PlanetScaleQueryable implements Transaction { - constructor(tx: planetScale.Transaction, private txDeferred: Deferred, private txResultPromise: Promise) { + constructor( + tx: planetScale.Transaction, + readonly options: TransactionOptions, + private txDeferred: Deferred, + private txResultPromise: Promise, + ) { super(tx) } async commit(): Promise> { - const tag = '[js::commit]' - debug(`${tag} committing transaction`) + debug(`[js::commit]`) + this.txDeferred.resolve() - return { ok: true, value: await this.txResultPromise }; + return Promise.resolve({ ok: true, value: await this.txResultPromise }) } async rollback(): Promise> { - const tag = '[js::rollback]' - debug(`${tag} rolling back the transaction`) + debug(`[js::rollback]`) + this.txDeferred.reject(new RollbackError()) - return { ok: true, value: await this.txResultPromise }; + return Promise.resolve({ ok: true, value: await this.txResultPromise }) } } @@ -104,30 +109,32 @@ class PrismaPlanetScale extends PlanetScaleQueryable imp const client = planetScale.connect(config) super(client) - } - async startTransaction(isolationLevel?: string) { - return new Promise>((resolve) => { + async startTransaction() { + const options: TransactionOptions = { + usePhantomQuery: true, + } + + const tag = '[js::startTransaction]' + debug(`${tag} options: %O`, options) + + return new Promise>((resolve, reject) => { const txResultPromise = this.client.transaction(async tx => { - if (isolationLevel) { - await tx.execute(`SET TRANSACTION ISOLATION LEVEL ${isolationLevel}`) - } const [txDeferred, deferredPromise] = createDeferred() - const txWrapper = new PlanetScaleTransaction(tx, txDeferred, txResultPromise) - - resolve({ ok: true, value: txWrapper }); + const txWrapper = new PlanetScaleTransaction(tx, options, txDeferred, txResultPromise) + resolve({ ok: true, value: txWrapper }) return deferredPromise }).catch(error => { // Rollback error is ignored (so that tx.rollback() won't crash) // any other error is legit and is re-thrown if (!(error instanceof RollbackError)) { - return Promise.reject(error) + return reject(error) } return undefined - }); + }) }) } diff --git a/query-engine/js-connectors/js/pnpm-lock.yaml b/query-engine/js-connectors/js/pnpm-lock.yaml index 1d45b3d6d8d..33044059eb8 100644 --- a/query-engine/js-connectors/js/pnpm-lock.yaml +++ b/query-engine/js-connectors/js/pnpm-lock.yaml @@ -77,15 +77,21 @@ importers: specifier: workspace:* version: link:../planetscale-js-connector '@prisma/client': - specifier: 5.2.0 - version: 5.2.0(prisma@5.2.0) + specifier: 5.3.0-integration-feat-js-connectors-in-client.13 + version: 5.3.0-integration-feat-js-connectors-in-client.13(prisma@5.3.0-integration-feat-js-connectors-in-client.13) + superjson: + specifier: ^1.13.1 + version: 1.13.1 devDependencies: + '@types/node': + specifier: ^20.5.1 + version: 20.5.1 cross-env: specifier: ^7.0.3 version: 7.0.3 prisma: - specifier: 5.2.0 - version: 5.2.0 + specifier: 5.3.0-integration-feat-js-connectors-in-client.13 + version: 5.3.0-integration-feat-js-connectors-in-client.13 tsx: specifier: ^3.12.7 version: 3.12.7 @@ -571,8 +577,8 @@ packages: engines: {node: '>=16'} dev: false - /@prisma/client@5.2.0(prisma@5.2.0): - resolution: {integrity: sha512-AiTjJwR4J5Rh6Z/9ZKrBBLel3/5DzUNntMohOy7yObVnVoTNVFi2kvpLZlFuKO50d7yDspOtW6XBpiAd0BVXbQ==} + /@prisma/client@5.3.0-integration-feat-js-connectors-in-client.13(prisma@5.3.0-integration-feat-js-connectors-in-client.13): + resolution: {integrity: sha512-jGfBWy89/dWm9crA4BxK6ET4oh49DZ1lpOrpRSAv3oGPm7hpISoUcW6GZ8dDhluIANQqSTA8NfIJyN6jSMm1DQ==} engines: {node: '>=16.13'} requiresBuild: true peerDependencies: @@ -581,16 +587,16 @@ packages: prisma: optional: true dependencies: - '@prisma/engines-version': 5.2.0-25.2804dc98259d2ea960602aca6b8e7fdc03c1758f - prisma: 5.2.0 + '@prisma/engines-version': 5.3.0-22.6473dadba8a7fff9689b35ad2ca9f9e5aff4d0d0 + prisma: 5.3.0-integration-feat-js-connectors-in-client.13 dev: false - /@prisma/engines-version@5.2.0-25.2804dc98259d2ea960602aca6b8e7fdc03c1758f: - resolution: {integrity: sha512-jsnKT5JIDIE01lAeCj2ghY9IwxkedhKNvxQeoyLs6dr4ZXynetD0vTy7u6wMJt8vVPv8I5DPy/I4CFaoXAgbtg==} + /@prisma/engines-version@5.3.0-22.6473dadba8a7fff9689b35ad2ca9f9e5aff4d0d0: + resolution: {integrity: sha512-Osd1JsYW04EyLalEJemXArlSrmVo/Lod0xndl5yvB0D/aw36M0+wjbJFZSlOn5BnN8FyM5/yIIJGsXlx+LxEMg==} dev: false - /@prisma/engines@5.2.0: - resolution: {integrity: sha512-dT7FOLUCdZmq+AunLqB1Iz+ZH/IIS1Fz2THmKZQ6aFONrQD/BQ5ecJ7g2wGS2OgyUFf4OaLam6/bxmgdOBDqig==} + /@prisma/engines@5.3.0-integration-feat-js-connectors-in-client.13: + resolution: {integrity: sha512-L9S4tBjlQvn92XNxK2W4ZMe3dN2kfKP/iWQ+h5TqYBHcOXFHBHCH2H8bLoN05h7+ba9tE7kBxPHLSCDkhxHayQ==} requiresBuild: true /@types/debug@4.1.8: @@ -710,6 +716,13 @@ packages: resolution: {integrity: sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==} dev: true + /copy-anything@3.0.5: + resolution: {integrity: sha512-yCEafptTtb4bk7GLEQoM8KVJpxAfdBJYaXyzQEgQQQgYrZiDp8SJmGKlYza6CYjEDNstAdNdKA3UuoULlEbS6w==} + engines: {node: '>=12.13'} + dependencies: + is-what: 4.1.15 + dev: false + /cross-env@7.0.3: resolution: {integrity: sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==} engines: {node: '>=10.14', npm: '>=6', yarn: '>=1'} @@ -947,6 +960,11 @@ packages: engines: {node: '>=8'} dev: true + /is-what@4.1.15: + resolution: {integrity: sha512-uKua1wfy3Yt+YqsD6mTUEa2zSi3G1oPlqTflgaPJ7z63vUGN5pxFpnQfeSLMFnJDEsdvOtkp1rUWkYjB4YfhgA==} + engines: {node: '>=12.13'} + dev: false + /isexe@2.0.0: resolution: {integrity: sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==} dev: true @@ -1220,13 +1238,13 @@ packages: resolution: {integrity: sha512-VdlZoocy5lCP0c/t66xAfclglEapXPCIVhqqJRncYpvbCgImF0w67aPKfbqUMr72tO2k5q0TdTZwCLjPTI6C9g==} dev: true - /prisma@5.2.0: - resolution: {integrity: sha512-FfFlpjVCkZwrqxDnP4smlNYSH1so+CbfjgdpioFzGGqlQAEm6VHAYSzV7jJgC3ebtY9dNOhDMS2+4/1DDSM7bQ==} + /prisma@5.3.0-integration-feat-js-connectors-in-client.13: + resolution: {integrity: sha512-X8s/HoMYv4WnK1w1Ce5ybplj5jWUmIUFBspoyGwpoJW3YUH7Z19f1/3UriSiQIZgnLI5PdpxWxTD42Z8pONUzQ==} engines: {node: '>=16.13'} hasBin: true requiresBuild: true dependencies: - '@prisma/engines': 5.2.0 + '@prisma/engines': 5.3.0-integration-feat-js-connectors-in-client.13 /punycode@2.3.0: resolution: {integrity: sha512-rRV+zQD8tVFys26lAGR9WUuS4iUAngJScM+ZRSKtvl5tKeZ2t5bvdNFdNHBW9FWR4guGHlgmsZ1G7BSm2wTbuA==} @@ -1336,6 +1354,13 @@ packages: ts-interface-checker: 0.1.13 dev: true + /superjson@1.13.1: + resolution: {integrity: sha512-AVH2eknm9DEd3qvxM4Sq+LTCkSXE2ssfh1t11MHMXyYXFQyQ1HLgVvV+guLTsaQnJU3gnaVo34TohHPulY/wLg==} + engines: {node: '>=10'} + dependencies: + copy-anything: 3.0.5 + dev: false + /thenify-all@1.6.0: resolution: {integrity: sha512-RNxQH/qI8/t3thXJDwcstUO4zeqo64+Uy/+sNVRBx4Xn2OX+OZ9oP+iJnNFqplFra2ZUVeKCSa2oVWi3T4uVmA==} engines: {node: '>=0.8'} diff --git a/query-engine/js-connectors/js/smoke-test-js/README.md b/query-engine/js-connectors/js/smoke-test-js/README.md index 2727ea1bfa2..62ec1d0439e 100644 --- a/query-engine/js-connectors/js/smoke-test-js/README.md +++ b/query-engine/js-connectors/js/smoke-test-js/README.md @@ -2,13 +2,13 @@ This is a playground for testing the `libquery` client with the experimental Node.js drivers. It contains a subset of `@prisma/client`, plus some handy executable smoke tests: -- [`./src/planetscale.ts`](./src/planetscale.ts) -- [`./src/neon.ts`](./src/neon.ts) +- [`./src/libquery`](./src/libquery): it contains smoke tests using a local `libquery`, the Query Engine library. +- [`./src/client`](./src/client): it contains smoke tests using `@prisma/client`. ## How to setup -We assume Node.js `v18.16.1`+ is installed. If not, run `nvm use` in the current directory. -This is very important to double-check if you have multiple versions installed, as PlanetScale requires either Node.js `v18.16.1`+ or a custom `fetch` function. +We assume Node.js `v20.5.1`+ is installed. If not, run `nvm use` in the current directory. +It's very important to double-check if you have multiple versions installed, as both PlanetScale and Neon requires either Node.js `v18`+ or a custom `fetch` function. - Create a `.envrc` starting from `.envrc.example`, and fill in the missing values following the given template - Install Node.js dependencies via @@ -24,7 +24,8 @@ This is very important to double-check if you have multiple versions installed, In the current directory: - Run `pnpm prisma:planetscale` to push the Prisma schema and insert the test data. -- Run `pnpm planetscale` to run smoke tests against the PlanetScale database. +- Run `pnpm planetscale` to run smoke tests using `libquery` against the PlanetScale database. +- Run `pnpm planetscale:client` to run smoke tests using `@prisma/client` against the PlanetScale database. Note: you used to be able to run these Prisma commands without changing the provider name, but [#4074](https://github.com/prisma/prisma-engines/pull/4074) changed that (see https://github.com/prisma/prisma-engines/pull/4074#issuecomment-1649942475). @@ -35,4 +36,5 @@ Note: you used to be able to run these Prisma commands without changing the prov In the current directory: - Run `pnpm prisma:neon` to push the Prisma schema and insert the test data. -- Run `pnpm neon` to run smoke tests against the Neon database. +- Run `pnpm neon` to run smoke tests using `libquery` against the Neon database. +- Run `pnpm neon:client` to run smoke tests using `@prisma/client` against the Neon database. diff --git a/query-engine/js-connectors/js/smoke-test-js/package.json b/query-engine/js-connectors/js/smoke-test-js/package.json index 6ef62bb2ede..cd46082f5bf 100644 --- a/query-engine/js-connectors/js/smoke-test-js/package.json +++ b/query-engine/js-connectors/js/smoke-test-js/package.json @@ -1,23 +1,23 @@ { "name": "@jkomyno/smoke-test-js", "private": true, + "type": "module", "version": "0.0.0", "description": "", "scripts": { "prisma:db:push:postgres": "prisma db push --schema ./prisma/postgres/schema.prisma --force-reset", "prisma:db:execute:postgres": "prisma db execute --schema ./prisma/postgres/schema.prisma --file ./prisma/postgres/commands/type_test/insert.sql", - "prisma:db:push:mysql": "prisma db push --schema ./prisma/mysql/schema.prisma --force-reset", "prisma:db:execute:mysql": "prisma db execute --schema ./prisma/mysql/schema.prisma --file ./prisma/mysql/commands/type_test/insert.sql", - "prisma:neon": "cross-env-shell DATABASE_URL=\"${JS_NEON_DATABASE_URL}\" \"pnpm prisma:db:push:postgres && pnpm prisma:db:execute:postgres\"", - "neon": "cross-env-shell DATABASE_URL=\"${JS_NEON_DATABASE_URL}\" \"tsx ./src/neon.ts\"", - + "neon": "cross-env-shell DATABASE_URL=\"${JS_NEON_DATABASE_URL}\" \"tsx ./src/libquery/neon.ts\"", + "neon:client": "DATABASE_URL=\"${JS_NEON_DATABASE_URL}\" node --test --loader=tsx ./src/client/neon.test.ts", "prisma:pg": "cross-env-shell DATABASE_URL=\"${JS_PG_DATABASE_URL}\" \"pnpm prisma:db:push:postgres && pnpm prisma:db:execute:postgres\"", - "pg": "cross-env-shell DATABASE_URL=\"${JS_PG_DATABASE_URL}\" \"tsx ./src/pg.ts\"", - + "pg": "cross-env-shell DATABASE_URL=\"${JS_PG_DATABASE_URL}\" \"tsx ./src/libquery/pg.ts\"", + "pg:client": "DATABASE_URL=\"${JS_PG_DATABASE_URL}\" node --test --loader=tsx ./src/client/pg.test.ts", "prisma:planetscale": "cross-env-shell DATABASE_URL=\"${JS_PLANETSCALE_DATABASE_URL}\" \"pnpm prisma:db:push:mysql && pnpm prisma:db:execute:mysql\"", - "planetscale": "cross-env-shell DATABASE_URL=\"${JS_PLANETSCALE_DATABASE_URL}\" \"tsx ./src/planetscale.ts\"" + "planetscale": "cross-env-shell DATABASE_URL=\"${JS_PLANETSCALE_DATABASE_URL}\" \"tsx ./src/libquery/planetscale.ts\"", + "planetscale:client": "DATABASE_URL=\"${JS_PLANETSCALE_DATABASE_URL}\" node --test --loader=tsx ./src/client/planetscale.test.ts" }, "keywords": [], "author": "Alberto Schiabel ", @@ -28,11 +28,13 @@ "@jkomyno/prisma-neon-js-connector": "workspace:*", "@jkomyno/prisma-pg-js-connector": "workspace:*", "@jkomyno/prisma-planetscale-js-connector": "workspace:*", - "@prisma/client": "5.2.0" + "@prisma/client": "5.3.0-integration-feat-js-connectors-in-client.13", + "superjson": "^1.13.1" }, "devDependencies": { + "@types/node": "^20.5.1", "cross-env": "^7.0.3", - "prisma": "5.2.0", + "prisma": "5.3.0-integration-feat-js-connectors-in-client.13", "tsx": "^3.12.7" } } diff --git a/query-engine/js-connectors/js/smoke-test-js/src/client/client.ts b/query-engine/js-connectors/js/smoke-test-js/src/client/client.ts new file mode 100644 index 00000000000..9f6b2188ed5 --- /dev/null +++ b/query-engine/js-connectors/js/smoke-test-js/src/client/client.ts @@ -0,0 +1,106 @@ +import { describe, it } from 'node:test' +import assert from 'node:assert' +import { PrismaClient } from '@prisma/client' +import { ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils' + +export async function smokeTestClient(connector: ErrorCapturingConnector) { + const provider = connector.flavour + + const log = [ + { + emit: 'event', + level: 'query', + } as const, + ] + + for (const jsConnector of [connector, undefined]) { + const isUsingJsConnector = jsConnector !== undefined + describe(isUsingJsConnector ? `using JS Connectors` : `using Rust drivers`, () => { + it('batch queries', async () => { + const prisma = new PrismaClient({ + jsConnector, + log, + }) + + const queries: string[] = [] + prisma.$on('query', ({ query }) => queries.push(query)) + + await prisma.$transaction([ + prisma.$queryRawUnsafe('SELECT 1'), + prisma.$queryRawUnsafe('SELECT 2'), + prisma.$queryRawUnsafe('SELECT 3'), + ]) + + const defaultExpectedQueries = [ + 'BEGIN', + 'SELECT 1', + 'SELECT 2', + 'SELECT 3', + 'COMMIT', + ] + + const jsConnectorExpectedQueries = [ + '-- Implicit "BEGIN" query via underlying driver', + 'SELECT 1', + 'SELECT 2', + 'SELECT 3', + '-- Implicit "COMMIT" query via underlying driver', + ] + + const postgresExpectedQueries = [ + 'BEGIN', + 'DEALLOCATE ALL', + 'SELECT 1', + 'SELECT 2', + 'SELECT 3', + 'COMMIT', + ] + + if (['mysql'].includes(provider)) { + if (isUsingJsConnector) { + assert.deepEqual(queries, jsConnectorExpectedQueries) + } else { + assert.deepEqual(queries, defaultExpectedQueries) + } + } else if (['postgres'].includes(provider)) { + if (isUsingJsConnector) { + assert.deepEqual(queries, defaultExpectedQueries) + } else { + assert.deepEqual(queries, postgresExpectedQueries) + } + } + }) + + it('applies isolation level when using batch $transaction', async () => { + const prisma = new PrismaClient({ + jsConnector, + log, + }) + + const queries: string[] = [] + prisma.$on('query', ({ query }) => queries.push(query)) + + await prisma.$transaction([ + prisma.child.findMany(), + prisma.child.count(), + ], { + isolationLevel: 'ReadCommitted', + }) + + if (['mysql'].includes(provider)) { + assert.deepEqual(queries.slice(0, 2), [ + 'SET TRANSACTION ISOLATION LEVEL READ COMMITTED', + 'BEGIN', + ]) + } else if (['postgres'].includes(provider)) { + assert.deepEqual(queries.slice(0, 2), [ + 'BEGIN', + 'SET TRANSACTION ISOLATION LEVEL READ COMMITTED', + ]) + } + + assert.deepEqual(queries.at(-1), 'COMMIT') + }) + }) + } +} diff --git a/query-engine/js-connectors/js/smoke-test-js/src/client/neon.test.ts b/query-engine/js-connectors/js/smoke-test-js/src/client/neon.test.ts new file mode 100644 index 00000000000..3cf18d21026 --- /dev/null +++ b/query-engine/js-connectors/js/smoke-test-js/src/client/neon.test.ts @@ -0,0 +1,13 @@ +import { createNeonConnector } from '@jkomyno/prisma-neon-js-connector' +import { describe } from 'node:test' +import { smokeTestClient } from './client' + +describe('neon with @prisma/client', async () => { + const connectionString = `${process.env.JS_NEON_DATABASE_URL as string}` + + const jsConnector = createNeonConnector({ + url: connectionString, + }) + + smokeTestClient(jsConnector) +}) diff --git a/query-engine/js-connectors/js/smoke-test-js/src/client/pg.test.ts b/query-engine/js-connectors/js/smoke-test-js/src/client/pg.test.ts new file mode 100644 index 00000000000..61ca30f9831 --- /dev/null +++ b/query-engine/js-connectors/js/smoke-test-js/src/client/pg.test.ts @@ -0,0 +1,13 @@ +import { createPgConnector } from '@jkomyno/prisma-pg-js-connector' +import { describe } from 'node:test' +import { smokeTestClient } from './client' + +describe('pg with @prisma/client', async () => { + const connectionString = `${process.env.JS_PG_DATABASE_URL as string}` + + const jsConnector = createPgConnector({ + url: connectionString, + }) + + smokeTestClient(jsConnector) +}) diff --git a/query-engine/js-connectors/js/smoke-test-js/src/client/planetscale.test.ts b/query-engine/js-connectors/js/smoke-test-js/src/client/planetscale.test.ts new file mode 100644 index 00000000000..ad8fd293906 --- /dev/null +++ b/query-engine/js-connectors/js/smoke-test-js/src/client/planetscale.test.ts @@ -0,0 +1,13 @@ +import { createPlanetScaleConnector } from '@jkomyno/prisma-planetscale-js-connector' +import { describe } from 'node:test' +import { smokeTestClient } from './client' + +describe('planetscale with @prisma/client', async () => { + const connectionString = `${process.env.JS_PLANETSCALE_DATABASE_URL as string}` + + const jsConnector = createPlanetScaleConnector({ + url: connectionString, + }) + + smokeTestClient(jsConnector) +}) diff --git a/query-engine/js-connectors/js/smoke-test-js/src/engines/types/Library.ts b/query-engine/js-connectors/js/smoke-test-js/src/engines/types/Library.ts index 063eb863f19..6b40040fc87 100644 --- a/query-engine/js-connectors/js/smoke-test-js/src/engines/types/Library.ts +++ b/query-engine/js-connectors/js/smoke-test-js/src/engines/types/Library.ts @@ -1,4 +1,4 @@ -import type { Connector } from '@jkomyno/prisma-js-connector-utils' +import type { ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils' import type { QueryEngineConfig } from './QueryEngine' export type QueryEngineInstance = { @@ -18,7 +18,7 @@ export type QueryEngineInstance = { } export interface QueryEngineConstructor { - new(config: QueryEngineConfig, logger: (log: string) => void, nodejsFnCtx?: Connector): QueryEngineInstance + new(config: QueryEngineConfig, logger: (log: string) => void, nodejsFnCtx?: ErrorCapturingConnector): QueryEngineInstance } export interface LibraryLoader { diff --git a/query-engine/js-connectors/js/smoke-test-js/src/test.ts b/query-engine/js-connectors/js/smoke-test-js/src/libquery/libquery.ts similarity index 97% rename from query-engine/js-connectors/js/smoke-test-js/src/test.ts rename to query-engine/js-connectors/js/smoke-test-js/src/libquery/libquery.ts index 9495f31a985..e3a3c2d6bd8 100644 --- a/query-engine/js-connectors/js/smoke-test-js/src/test.ts +++ b/query-engine/js-connectors/js/smoke-test-js/src/libquery/libquery.ts @@ -1,13 +1,10 @@ -import { setImmediate, setTimeout } from 'node:timers/promises' +import { setTimeout } from 'node:timers/promises' import type { ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils' -import type { QueryEngineInstance } from './engines/types/Library' +import type { QueryEngineInstance } from '../engines/types/Library' import { initQueryEngine } from './util' -import { JsonQuery } from './engines/types/JsonProtocol' +import { JsonQuery } from '../engines/types/JsonProtocol' -export async function smokeTest(db: ErrorCapturingConnector, prismaSchemaRelativePath: string) { - // wait for the database pool to be initialized - await setImmediate(0) - +export async function smokeTestLibquery(db: ErrorCapturingConnector, prismaSchemaRelativePath: string) { const engine = initQueryEngine(db, prismaSchemaRelativePath) console.log('[nodejs] connecting...') diff --git a/query-engine/js-connectors/js/smoke-test-js/src/neon.ts b/query-engine/js-connectors/js/smoke-test-js/src/libquery/neon.ts similarity index 73% rename from query-engine/js-connectors/js/smoke-test-js/src/neon.ts rename to query-engine/js-connectors/js/smoke-test-js/src/libquery/neon.ts index 01902cd1ecb..7fb153dde6e 100644 --- a/query-engine/js-connectors/js/smoke-test-js/src/neon.ts +++ b/query-engine/js-connectors/js/smoke-test-js/src/libquery/neon.ts @@ -1,5 +1,5 @@ import { createNeonConnector } from '@jkomyno/prisma-neon-js-connector' -import { smokeTest } from './test' +import { smokeTestLibquery } from './libquery' async function neon() { const connectionString = `${process.env.JS_NEON_DATABASE_URL as string}` @@ -9,7 +9,7 @@ async function neon() { httpMode: false, }) - await smokeTest(db, '../prisma/postgres/schema.prisma') + await smokeTestLibquery(db, '../../prisma/postgres/schema.prisma') } neon().catch((e) => { diff --git a/query-engine/js-connectors/js/smoke-test-js/src/pg.ts b/query-engine/js-connectors/js/smoke-test-js/src/libquery/pg.ts similarity index 71% rename from query-engine/js-connectors/js/smoke-test-js/src/pg.ts rename to query-engine/js-connectors/js/smoke-test-js/src/libquery/pg.ts index 8da2379ec0e..e1b8d924dcc 100644 --- a/query-engine/js-connectors/js/smoke-test-js/src/pg.ts +++ b/query-engine/js-connectors/js/smoke-test-js/src/libquery/pg.ts @@ -1,5 +1,5 @@ import { createPgConnector } from '@jkomyno/prisma-pg-js-connector' -import { smokeTest } from './test' +import { smokeTestLibquery } from './libquery' async function pg() { const connectionString = `${process.env.JS_PG_DATABASE_URL as string}` @@ -8,7 +8,7 @@ async function pg() { url: connectionString, }) - await smokeTest(db, '../prisma/postgres/schema.prisma') + await smokeTestLibquery(db, '../../prisma/postgres/schema.prisma') } pg().catch((e) => { diff --git a/query-engine/js-connectors/js/smoke-test-js/src/planetscale.ts b/query-engine/js-connectors/js/smoke-test-js/src/libquery/planetscale.ts similarity index 75% rename from query-engine/js-connectors/js/smoke-test-js/src/planetscale.ts rename to query-engine/js-connectors/js/smoke-test-js/src/libquery/planetscale.ts index 54ce4071e50..e75f1fcead9 100644 --- a/query-engine/js-connectors/js/smoke-test-js/src/planetscale.ts +++ b/query-engine/js-connectors/js/smoke-test-js/src/libquery/planetscale.ts @@ -1,5 +1,5 @@ import { createPlanetScaleConnector } from '@jkomyno/prisma-planetscale-js-connector' -import { smokeTest } from './test' +import { smokeTestLibquery } from './libquery' async function planetscale() { const connectionString = `${process.env.JS_PLANETSCALE_DATABASE_URL as string}` @@ -8,7 +8,7 @@ async function planetscale() { url: connectionString, }) - await smokeTest(db, '../prisma/mysql/schema.prisma') + await smokeTestLibquery(db, '../../prisma/mysql/schema.prisma') } planetscale().catch((e) => { diff --git a/query-engine/js-connectors/js/smoke-test-js/src/util.ts b/query-engine/js-connectors/js/smoke-test-js/src/libquery/util.ts similarity index 76% rename from query-engine/js-connectors/js/smoke-test-js/src/util.ts rename to query-engine/js-connectors/js/smoke-test-js/src/libquery/util.ts index c852bcd8590..c3a5b66b3fe 100644 --- a/query-engine/js-connectors/js/smoke-test-js/src/util.ts +++ b/query-engine/js-connectors/js/smoke-test-js/src/libquery/util.ts @@ -1,15 +1,15 @@ import path from 'node:path' import os from 'node:os' import fs from 'node:fs' -import type { Connector } from '@jkomyno/prisma-js-connector-utils' -import { Library, QueryEngineInstance } from './engines/types/Library' +import type { ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils' +import { Library, QueryEngineInstance } from '../engines/types/Library' -export function initQueryEngine(driver: Connector, prismaSchemaRelativePath: string): QueryEngineInstance { +export function initQueryEngine(driver: ErrorCapturingConnector, prismaSchemaRelativePath: string): QueryEngineInstance { // I assume nobody will run this on Windows ¯\_(ツ)_/¯ const libExt = os.platform() === 'darwin' ? 'dylib' : 'so' const dirname = path.dirname(new URL(import.meta.url).pathname) - const libQueryEnginePath = path.join(dirname, `../../../../../target/debug/libquery_engine.${libExt}`) + const libQueryEnginePath = path.join(dirname, `../../../../../../target/debug/libquery_engine.${libExt}`) const schemaPath = path.join(dirname, prismaSchemaRelativePath) console.log('[nodejs] read Prisma schema from', schemaPath) @@ -33,6 +33,7 @@ export function initQueryEngine(driver: Connector, prismaSchemaRelativePath: str const logCallback = (...args) => { console.log(args) } + const engine = new QueryEngine(queryEngineOptions, logCallback, driver) return engine diff --git a/query-engine/js-connectors/src/proxy.rs b/query-engine/js-connectors/src/proxy.rs index 7b7c67ca697..1e2e4817235 100644 --- a/query-engine/js-connectors/src/proxy.rs +++ b/query-engine/js-connectors/src/proxy.rs @@ -7,7 +7,7 @@ use crate::transaction::JsTransaction; use napi::bindgen_prelude::{FromNapiValue, ToNapiValue}; use napi::{JsObject, JsString}; use napi_derive::napi; -use quaint::connector::{IsolationLevel, ResultSet as QuaintResultSet}; +use quaint::connector::ResultSet as QuaintResultSet; use quaint::Value as QuaintValue; // TODO(jkomyno): import these 3rd-party crates from the `quaint-core` crate. @@ -33,11 +33,14 @@ pub(crate) struct CommonProxy { /// This is a JS proxy for accessing the methods specific to top level /// JS driver objects pub(crate) struct DriverProxy { - start_transaction: AsyncJsFunction, JsTransaction>, + start_transaction: AsyncJsFunction<(), JsTransaction>, } /// This a JS proxy for accessing the methods, specific /// to JS transaction objects pub(crate) struct TransactionProxy { + /// transaction options + options: TransactionOptions, + /// commit transaction commit: AsyncJsFunction<(), ()>, @@ -332,24 +335,35 @@ impl DriverProxy { }) } - pub async fn start_transaction( - &self, - isolation_level: Option, - ) -> quaint::Result> { - let tx = self - .start_transaction - .call(isolation_level.map(|l| l.to_string())) - .await?; + pub async fn start_transaction(&self) -> quaint::Result> { + let tx = self.start_transaction.call(()).await?; Ok(Box::new(tx)) } } +#[derive(Debug)] +#[napi(object)] +pub struct TransactionOptions { + /// Whether or not to run a phantom query (i.e., a query that only influences Prisma event logs, but not the database itself) + /// before opening a transaction, committing, or rollbacking. + pub use_phantom_query: bool, +} + impl TransactionProxy { pub fn new(js_transaction: &JsObject) -> napi::Result { let commit = js_transaction.get_named_property("commit")?; let rollback = js_transaction.get_named_property("rollback")?; + let options: TransactionOptions = js_transaction.get_named_property("options")?; + + Ok(Self { + commit, + rollback, + options, + }) + } - Ok(Self { commit, rollback }) + pub fn options(&self) -> &TransactionOptions { + &self.options } pub async fn commit(&self) -> quaint::Result<()> { diff --git a/query-engine/js-connectors/src/queryable.rs b/query-engine/js-connectors/src/queryable.rs index ba68498e827..2a1d6755f68 100644 --- a/query-engine/js-connectors/src/queryable.rs +++ b/query-engine/js-connectors/src/queryable.rs @@ -125,6 +125,10 @@ impl QuaintQueryable for JsBaseQueryable { } impl JsBaseQueryable { + pub fn phantom_query_message(stmt: &str) -> String { + format!(r#"-- Implicit "{}" query via underlying driver"#, stmt) + } + async fn build_query(sql: &str, values: &[quaint::Value<'_>]) -> quaint::Result { let sql: String = sql.to_string(); let args = conversion::conv_params(values)?; @@ -239,7 +243,33 @@ impl TransactionCapable for JsQueryable { &'a self, isolation: Option, ) -> quaint::Result> { - let tx = self.driver_proxy.start_transaction(isolation).await?; + let tx = self.driver_proxy.start_transaction().await?; + + let isolation_first = tx.requires_isolation_first(); + + if isolation_first { + if let Some(isolation) = isolation { + tx.set_tx_isolation_level(isolation).await?; + } + } + + let begin_stmt = tx.begin_statement(); + + let tx_opts = tx.options(); + if tx_opts.use_phantom_query { + let begin_stmt = JsBaseQueryable::phantom_query_message(begin_stmt); + tx.raw_phantom_cmd(begin_stmt.as_str()).await?; + } else { + tx.raw_cmd(begin_stmt).await?; + } + + if !isolation_first { + if let Some(isolation) = isolation { + tx.set_tx_isolation_level(isolation).await?; + } + } + + self.server_reset_query(tx.as_ref()).await?; Ok(tx) } diff --git a/query-engine/js-connectors/src/transaction.rs b/query-engine/js-connectors/src/transaction.rs index df1745d12e7..0d26c7f863a 100644 --- a/query-engine/js-connectors/src/transaction.rs +++ b/query-engine/js-connectors/src/transaction.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use metrics::{decrement_gauge, increment_gauge}; use napi::{bindgen_prelude::FromNapiValue, JsObject}; use quaint::{ connector::{IsolationLevel, Transaction as QuaintTransaction}, @@ -7,7 +8,7 @@ use quaint::{ }; use crate::{ - proxy::{CommonProxy, TransactionProxy}, + proxy::{CommonProxy, TransactionOptions, TransactionProxy}, queryable::JsBaseQueryable, }; @@ -21,17 +22,50 @@ pub(crate) struct JsTransaction { impl JsTransaction { pub(crate) fn new(inner: JsBaseQueryable, tx_proxy: TransactionProxy) -> Self { + increment_gauge!("prisma_client_queries_active", 1.0); + Self { inner, tx_proxy } } + + pub fn options(&self) -> &TransactionOptions { + self.tx_proxy.options() + } + + pub async fn raw_phantom_cmd(&self, cmd: &str) -> quaint::Result<()> { + let params = &[]; + quaint::connector::metrics::query("js.raw_phantom_cmd", cmd, params, move || async move { Ok(()) }).await + } } #[async_trait] impl QuaintTransaction for JsTransaction { async fn commit(&self) -> quaint::Result<()> { + decrement_gauge!("prisma_client_queries_active", 1.0); + + let commit_stmt = "COMMIT"; + + if self.options().use_phantom_query { + let commit_stmt = JsBaseQueryable::phantom_query_message(commit_stmt); + self.raw_phantom_cmd(commit_stmt.as_str()).await?; + } else { + self.inner.raw_cmd(commit_stmt).await?; + } + self.tx_proxy.commit().await } async fn rollback(&self) -> quaint::Result<()> { + decrement_gauge!("prisma_client_queries_active", 1.0); + + let rollback_stmt = "ROLLBACK"; + + if self.options().use_phantom_query { + let rollback_stmt = JsBaseQueryable::phantom_query_message(rollback_stmt); + self.raw_phantom_cmd(rollback_stmt.as_str()).await?; + } else { + self.inner.raw_cmd(rollback_stmt).await?; + } + self.tx_proxy.rollback().await }