Skip to content

Commit

Permalink
fix(js-connectors): enable batched/implicit transaction event logs an…
Browse files Browse the repository at this point in the history
…d metrics (#4197)

* fix(js-connectors): [PlanetScale] enable event logs in transactions

* chore(js-connectors): [Neon / Pg] add skeleton for event logs in transactions

* chore(js-connectors): add "@prisma/client" smoke tests, fix LibraryEngine types

* feat(js-connectors): add "@prisma/client" mini test suite

* fix(js-connectors): [Neon / pg] transaction event logs

* chore(js-connectors): simplify transaction options

* chore(js-connectors): simplify startTransaction signature

* chore(js-connectors): turn smoke-test-js into an ESModule

* feat(js-connectors): address PR comments
  • Loading branch information
jkomyno authored Sep 5, 2023
1 parent 1c434be commit 74f592b
Show file tree
Hide file tree
Showing 24 changed files with 399 additions and 138 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions query-engine/js-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ quaint.workspace = true
psl.workspace = true
tracing = "0.1"
tracing-core = "0.1"
metrics = "0.18"

# Note: these deps are temporarily specified here to avoid importing them from tiberius (the SQL server driver).
# They will be imported from quaint-core instead in a future PR.
Expand Down
2 changes: 1 addition & 1 deletion query-engine/js-connectors/js/.nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v18.16.1
v20.5.1
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ export const bindConnector = (connector: Connector): ErrorCapturingConnector =>
const bindTransaction = (errorRegistry: ErrorRegistryInternal, transaction: Transaction): Transaction => {
return ({
flavour: transaction.flavour,
options: transaction.options,
queryRaw: wrapAsync(errorRegistry, transaction.queryRaw.bind(transaction)),
executeRaw: wrapAsync(errorRegistry, transaction.executeRaw.bind(transaction)),
commit: wrapAsync(errorRegistry, transaction.commit.bind(transaction)),
rollback: wrapAsync(errorRegistry, transaction.rollback.bind(transaction))
rollback: wrapAsync(errorRegistry, transaction.rollback.bind(transaction)),
});
}

Expand Down
15 changes: 11 additions & 4 deletions query-engine/js-connectors/js/js-connector-utils/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,27 @@ export interface Queryable {

export interface Connector extends Queryable {
/**
* Starts new transation with the specified isolation level
* @param isolationLevel
* Starts new transation.
*/
startTransaction(isolationLevel?: string): Promise<Result<Transaction>>
startTransaction(): Promise<Result<Transaction>>

/**
* Closes the connection to the database, if any.
*/
close: () => Promise<Result<void>>
}

export type TransactionOptions = {
usePhantomQuery: boolean
}

export interface Transaction extends Queryable {
/**
* Commit the transaction
* Transaction options.
*/
readonly options: TransactionOptions
/**
* Commit the transaction.
*/
commit(): Promise<Result<void>>
/**
Expand Down
47 changes: 25 additions & 22 deletions query-engine/js-connectors/js/neon-js-connector/src/neon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { FullQueryResults, PoolClient, neon, neonConfig } from '@neondatabase/se
import { NeonConfig, NeonQueryFunction, Pool, QueryResult } from '@neondatabase/serverless'
import ws from 'ws'
import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils'
import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils'
import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector, TransactionOptions } from '@jkomyno/prisma-js-connector-utils'
import { fieldToColumnType } from './conversion'

neonConfig.webSocketConstructor = ws
Expand Down Expand Up @@ -43,7 +43,9 @@ abstract class NeonQueryable implements Queryable {
debug(`${tag} %O`, query)

const { rowCount: rowsAffected } = await this.performIO(query)
return { ok: true, value: rowsAffected }

// Note: `rowsAffected` can sometimes be null (e.g., when executing `"BEGIN"`)
return { ok: true, value: rowsAffected ?? 0 }
}

abstract performIO(query: Query): Promise<PerformIOResult>
Expand Down Expand Up @@ -71,24 +73,23 @@ class NeonWsQueryable<ClientT extends Pool|PoolClient> extends NeonQueryable {
}

class NeonTransaction extends NeonWsQueryable<PoolClient> implements Transaction {
constructor(client: PoolClient, readonly options: TransactionOptions) {
super(client)
}

async commit(): Promise<Result<void>> {
try {
await this.client.query('COMMIT');
return { ok: true, value: undefined }
} finally {
this.client.release()
}
debug(`[js::commit]`)

this.client.release()
return Promise.resolve({ ok: true, value: undefined })
}

async rollback(): Promise<Result<void>> {
try {
await this.client.query('ROLLBACK');
return { ok: true, value: undefined }
} finally {
this.client.release()
}
}
debug(`[js::rollback]`)

this.client.release()
return Promise.resolve({ ok: true, value: undefined })
}
}

class NeonWsConnector extends NeonWsQueryable<Pool> implements Connector {
Expand All @@ -98,14 +99,16 @@ class NeonWsConnector extends NeonWsQueryable<Pool> implements Connector {
super(new Pool({ connectionString, ...rest }))
}

async startTransaction(isolationLevel?: string | undefined): Promise<Result<Transaction>> {
const connection = await this.client.connect()
await connection.query('BEGIN')
if (isolationLevel) {
await connection.query(`SET TRANSACTION ISOLATION LEVEL ${isolationLevel}`)
async startTransaction(): Promise<Result<Transaction>> {
const options: TransactionOptions = {
usePhantomQuery: false,
}

return { ok: true, value: new NeonTransaction(connection) }

const tag = '[js::startTransaction]'
debug(`${tag} options: %O`, options)

const connection = await this.client.connect()
return { ok: true, value: new NeonTransaction(connection, options) }
}

async close() {
Expand Down
49 changes: 20 additions & 29 deletions query-engine/js-connectors/js/pg-js-connector/src/pg.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as pg from 'pg'
import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils'
import type { ErrorCapturingConnector, Connector, ConnectorConfig, Query, Queryable, Result, ResultSet, Transaction } from '@jkomyno/prisma-js-connector-utils'
import type { ErrorCapturingConnector, Connector, ConnectorConfig, Query, Queryable, Result, ResultSet, Transaction, TransactionOptions } from '@jkomyno/prisma-js-connector-utils'
import { fieldToColumnType } from './conversion'

const debug = Debug('prisma:js-connector:pg')
Expand Down Expand Up @@ -45,8 +45,10 @@ class PgQueryable<ClientT extends StdClient | TransactionClient>
const tag = '[js::execute_raw]'
debug(`${tag} %O`, query)

const { rowCount } = await this.performIO(query)
return { ok: true, value: rowCount }
const { rowCount: rowsAffected } = await this.performIO(query)

// Note: `rowsAffected` can sometimes be null (e.g., when executing `"BEGIN"`)
return { ok: true, value: rowsAffected ?? 0 }
}

/**
Expand All @@ -70,32 +72,22 @@ class PgQueryable<ClientT extends StdClient | TransactionClient>

class PgTransaction extends PgQueryable<TransactionClient>
implements Transaction {
constructor(client: pg.PoolClient) {
constructor(client: pg.PoolClient, readonly options: TransactionOptions) {
super(client)
}

async commit(): Promise<Result<void>> {
const tag = '[js::commit]'
debug(`${tag} committing transaction`)
debug(`[js::commit]`)

try {
await this.client.query('COMMIT')
return { ok: true, value: undefined }
} finally {
this.client.release()
}
this.client.release()
return Promise.resolve({ ok: true, value: undefined })
}

async rollback(): Promise<Result<void>> {
const tag = '[js::rollback]'
debug(`${tag} rolling back the transaction`)
debug(`[js::rollback]`)

try {
await this.client.query('ROLLBACK')
return { ok: true, value: undefined }
} finally {
this.client.release()
}
this.client.release()
return Promise.resolve({ ok: true, value: undefined })
}
}

Expand All @@ -110,17 +102,16 @@ class PrismaPg extends PgQueryable<StdClient> implements Connector {
super(client)
}

async startTransaction(isolationLevel?: string): Promise<Result<Transaction>> {
const connection = await this.client.connect()
await connection.query('BEGIN')

if (isolationLevel) {
await connection.query(
`SET TRANSACTION ISOLATION LEVEL ${isolationLevel}`,
)
async startTransaction(): Promise<Result<Transaction>> {
const options: TransactionOptions = {
usePhantomQuery: false,
}

return { ok: true, value: new PgTransaction(connection) }
const tag = '[js::startTransaction]'
debug(`${tag} options: %O`, options)

const connection = await this.client.connect()
return { ok: true, value: new PgTransaction(connection, options) }
}

async close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as planetScale from '@planetscale/database'
import type { Config as PlanetScaleConfig } from '@planetscale/database'
import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils'
import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils'
import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector, TransactionOptions } from '@jkomyno/prisma-js-connector-utils'
import { type PlanetScaleColumnType, fieldToColumnType } from './conversion'
import { createDeferred, Deferred } from './deferred'

Expand Down Expand Up @@ -79,22 +79,27 @@ class PlanetScaleQueryable<ClientT extends planetScale.Connection | planetScale.
}

class PlanetScaleTransaction extends PlanetScaleQueryable<planetScale.Transaction> implements Transaction {
constructor(tx: planetScale.Transaction, private txDeferred: Deferred<void>, private txResultPromise: Promise<void>) {
constructor(
tx: planetScale.Transaction,
readonly options: TransactionOptions,
private txDeferred: Deferred<void>,
private txResultPromise: Promise<void>,
) {
super(tx)
}

async commit(): Promise<Result<void>> {
const tag = '[js::commit]'
debug(`${tag} committing transaction`)
debug(`[js::commit]`)

this.txDeferred.resolve()
return { ok: true, value: await this.txResultPromise };
return Promise.resolve({ ok: true, value: await this.txResultPromise })
}

async rollback(): Promise<Result<void>> {
const tag = '[js::rollback]'
debug(`${tag} rolling back the transaction`)
debug(`[js::rollback]`)

this.txDeferred.reject(new RollbackError())
return { ok: true, value: await this.txResultPromise };
return Promise.resolve({ ok: true, value: await this.txResultPromise })
}

}
Expand All @@ -104,30 +109,32 @@ class PrismaPlanetScale extends PlanetScaleQueryable<planetScale.Connection> imp
const client = planetScale.connect(config)

super(client)

}

async startTransaction(isolationLevel?: string) {
return new Promise<Result<Transaction>>((resolve) => {
async startTransaction() {
const options: TransactionOptions = {
usePhantomQuery: true,
}

const tag = '[js::startTransaction]'
debug(`${tag} options: %O`, options)

return new Promise<Result<Transaction>>((resolve, reject) => {
const txResultPromise = this.client.transaction(async tx => {
if (isolationLevel) {
await tx.execute(`SET TRANSACTION ISOLATION LEVEL ${isolationLevel}`)
}
const [txDeferred, deferredPromise] = createDeferred<void>()
const txWrapper = new PlanetScaleTransaction(tx, txDeferred, txResultPromise)

resolve({ ok: true, value: txWrapper });
const txWrapper = new PlanetScaleTransaction(tx, options, txDeferred, txResultPromise)

resolve({ ok: true, value: txWrapper })
return deferredPromise
}).catch(error => {
// Rollback error is ignored (so that tx.rollback() won't crash)
// any other error is legit and is re-thrown
if (!(error instanceof RollbackError)) {
return Promise.reject(error)
return reject(error)
}

return undefined
});
})
})
}

Expand Down
Loading

0 comments on commit 74f592b

Please sign in to comment.