diff --git a/package.json b/package.json index 221f7f8d0..48892484a 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "dependencies": { "@google-cloud/promisify": "^4.0.0", "arrify": "^2.0.1", + "async-mutex": "^0.4.0", "concat-stream": "^2.0.0", "extend": "^3.0.2", "google-gax": "^4.0.5", diff --git a/src/request.ts b/src/request.ts index d39f572ef..047cf0ab6 100644 --- a/src/request.ts +++ b/src/request.ts @@ -79,7 +79,7 @@ const CONSISTENCY_PROTO_CODE: ConsistencyProtoCode = { * @class */ class DatastoreRequest { - id: string | undefined; + id: string | undefined | Uint8Array | null; requests_: | Entity | { @@ -546,6 +546,19 @@ class DatastoreRequest { ); } + /** + * Datastore allows you to run aggregate queries by supplying aggregate fields + * which will determine the type of aggregation that is performed. + * + * The query is run, and the results are returned in the second argument of + * the callback provided. + * + * @param {AggregateQuery} query AggregateQuery object. + * @param {RunQueryOptions} options Optional configuration + * @param {function} [callback] The callback function. If omitted, a promise is + * returned. + * + **/ runAggregationQuery( query: AggregateQuery, options?: RunQueryOptions @@ -1157,7 +1170,7 @@ export interface SharedQueryOptions { partitionId?: google.datastore.v1.IPartitionId | null; readOptions?: { readConsistency?: number; - transaction?: string; + transaction?: string | Uint8Array | null; readTime?: ITimestamp; }; } @@ -1166,9 +1179,9 @@ export interface RequestOptions extends SharedQueryOptions { keys?: Entity; transactionOptions?: { readOnly?: {}; - readWrite?: {previousTransaction?: string}; + readWrite?: {previousTransaction?: string | Uint8Array | null}; } | null; - transaction?: string | null; + transaction?: string | null | Uint8Array; mode?: string; query?: QueryProto; filter?: string; diff --git a/src/transaction.ts b/src/transaction.ts index 2ce153e07..0b7883b6d 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -22,26 +22,40 @@ import {google} from '../protos/protos'; import {Datastore, TransactionOptions} from '.'; import {entity, Entity, Entities} from './entity'; -import {Query} from './query'; +import { + Query, + RunQueryCallback, + RunQueryInfo, + RunQueryOptions, + RunQueryResponse, +} from './query'; import { CommitCallback, CommitResponse, DatastoreRequest, RequestOptions, PrepareEntityObjectResponse, + CreateReadStreamOptions, + GetResponse, + GetCallback, + RequestCallback, } from './request'; import {AggregateQuery} from './aggregate'; +import {Mutex} from 'async-mutex'; -// RequestPromiseReturnType should line up with the types in RequestCallback -interface RequestPromiseReturnType { +/* + * This type matches the value returned by the promise in the + * #beginTransactionAsync function and subsequently passed into various other + * methods in this class. + */ +interface BeginAsyncResponse { err?: Error | null; - resp: any; // TODO: Replace with google.datastore.v1.IBeginTransactionResponse and address downstream issues + resp?: google.datastore.v1.IBeginTransactionResponse; } -interface RequestResolveFunction { - (callbackData: RequestPromiseReturnType): void; -} -interface RequestAsPromiseCallback { - (resolve: RequestResolveFunction): void; + +enum TransactionState { + NOT_STARTED, + IN_PROGRESS, // IN_PROGRESS currently tracks the expired state as well } /** @@ -70,6 +84,8 @@ class Transaction extends DatastoreRequest { request: Function; modifiedEntities_: ModifiedEntities; skipCommit?: boolean; + #mutex = new Mutex(); + #state = TransactionState.NOT_STARTED; constructor(datastore: Datastore, options?: TransactionOptions) { super(); /** @@ -161,7 +177,14 @@ class Transaction extends DatastoreRequest { : () => {}; const gaxOptions = typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; - this.#runCommit(gaxOptions, callback); + // This ensures that the transaction is started before calling runCommit + this.#withBeginTransaction( + gaxOptions, + () => { + this.#runCommit(gaxOptions, callback); + }, + callback + ); } /** @@ -300,6 +323,47 @@ class Transaction extends DatastoreRequest { }); } + /** + * This function calls get on the super class. If the transaction + * has not been started yet then the transaction is started before the + * get call is made. + * + * @param {Key|Key[]} keys Datastore key object(s). + * @param {object} [options] Optional configuration. + * @param {function} callback The callback function. + * + */ + get( + keys: entity.Key | entity.Key[], + options?: CreateReadStreamOptions + ): Promise<GetResponse>; + get(keys: entity.Key | entity.Key[], callback: GetCallback): void; + get( + keys: entity.Key | entity.Key[], + options: CreateReadStreamOptions, + callback: GetCallback + ): void; + get( + keys: entity.Key | entity.Key[], + optionsOrCallback?: CreateReadStreamOptions | GetCallback, + cb?: GetCallback + ): void | Promise<GetResponse> { + const options = + typeof optionsOrCallback === 'object' && optionsOrCallback + ? optionsOrCallback + : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + // This ensures that the transaction is started before calling get + this.#withBeginTransaction( + options.gaxOptions, + () => { + super.get(keys, options, callback); + }, + callback + ); + } + /** * Maps to {@link https://cloud.google.com/nodejs/docs/reference/datastore/latest/datastore/transaction#_google_cloud_datastore_Transaction_save_member_1_|Datastore#save}, forcing the method to be `insert`. * @@ -446,8 +510,16 @@ class Transaction extends DatastoreRequest { typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - this.#runAsync(options).then((response: RequestPromiseReturnType) => { - this.#processBeginResults(response, callback); + this.#mutex.runExclusive(async () => { + if (this.#state === TransactionState.NOT_STARTED) { + const runResults = await this.#beginTransactionAsync(options); + this.#processBeginResults(runResults, callback); + } else { + process.emitWarning( + 'run has already been called and should not be called again.' + ); + callback(null, this, {transaction: this.id}); + } }); } @@ -579,26 +651,39 @@ class Transaction extends DatastoreRequest { /** * This function parses results from a beginTransaction call * - * @param {RequestPromiseReturnType} response The response from a call to - * begin a transaction. - * @param {RunCallback} callback A callback that accepts an error and a + * @param {BeginAsyncResponse} [response] + * The response data from a call to begin a transaction. + * @param {RunCallback} [callback] A callback that accepts an error and a * response as arguments. * **/ #processBeginResults( - response: RequestPromiseReturnType, + runResults: BeginAsyncResponse, callback: RunCallback ): void { - const err = response.err; - const resp = response.resp; + const err = runResults.err; + const resp = runResults.resp; if (err) { callback(err, null, resp); } else { - this.id = resp!.transaction; + this.#parseRunSuccess(runResults); callback(null, this, resp); } } + /** + * This function saves results from a successful beginTransaction call. + * + * @param {BeginAsyncResponse} [response] The response from a call to + * begin a transaction that completed successfully. + * + **/ + #parseRunSuccess(runResults: BeginAsyncResponse) { + const resp = runResults.resp; + this.id = resp!.transaction; + this.#state = TransactionState.IN_PROGRESS; + } + /** * This async function makes a beginTransaction call and returns a promise with * the information returned from the call that was made. @@ -608,7 +693,9 @@ class Transaction extends DatastoreRequest { * * **/ - async #runAsync(options: RunOptions): Promise<RequestPromiseReturnType> { + async #beginTransactionAsync( + options: RunOptions + ): Promise<BeginAsyncResponse> { const reqOpts: RequestOptions = { transactionOptions: {}, }; @@ -626,9 +713,7 @@ class Transaction extends DatastoreRequest { if (options.transactionOptions) { reqOpts.transactionOptions = options.transactionOptions; } - const promiseFunction: RequestAsPromiseCallback = ( - resolve: RequestResolveFunction - ) => { + return new Promise((resolve: (value: BeginAsyncResponse) => void) => { this.request_( { client: 'DatastoreClient', @@ -644,8 +729,89 @@ class Transaction extends DatastoreRequest { }); } ); - }; - return new Promise(promiseFunction); + }); + } + + /** + * + * This function calls runAggregationQuery on the super class. If the transaction + * has not been started yet then the transaction is started before the + * runAggregationQuery call is made. + * + * @param {AggregateQuery} [query] AggregateQuery object. + * @param {RunQueryOptions} [options] Optional configuration + * @param {function} [callback] The callback function. If omitted, a promise is + * returned. + * + **/ + runAggregationQuery( + query: AggregateQuery, + options?: RunQueryOptions + ): Promise<RunQueryResponse>; + runAggregationQuery( + query: AggregateQuery, + options: RunQueryOptions, + callback: RequestCallback + ): void; + runAggregationQuery(query: AggregateQuery, callback: RequestCallback): void; + runAggregationQuery( + query: AggregateQuery, + optionsOrCallback?: RunQueryOptions | RequestCallback, + cb?: RequestCallback + ): void | Promise<RunQueryResponse> { + const options = + typeof optionsOrCallback === 'object' && optionsOrCallback + ? optionsOrCallback + : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + // This ensures that the transaction is started before calling runAggregationQuery + this.#withBeginTransaction( + options.gaxOptions, + () => { + super.runAggregationQuery(query, options, callback); + }, + callback + ); + } + + /** + * This function calls runQuery on the super class. If the transaction + * has not been started yet then the transaction is started before the + * runQuery call is made. + * + * @param {Query} query Query object. + * @param {object} [options] Optional configuration. + * @param {function} [callback] The callback function. If omitted, a readable + * stream instance is returned. + * + */ + runQuery(query: Query, options?: RunQueryOptions): Promise<RunQueryResponse>; + runQuery( + query: Query, + options: RunQueryOptions, + callback: RunQueryCallback + ): void; + runQuery(query: Query, callback: RunQueryCallback): void; + runQuery( + query: Query, + optionsOrCallback?: RunQueryOptions | RunQueryCallback, + cb?: RunQueryCallback + ): void | Promise<RunQueryResponse> { + const options = + typeof optionsOrCallback === 'object' && optionsOrCallback + ? optionsOrCallback + : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + // This ensures that the transaction is started before calling runQuery + this.#withBeginTransaction( + options.gaxOptions, + () => { + super.runQuery(query, options, callback); + }, + callback + ); } /** @@ -838,6 +1004,59 @@ class Transaction extends DatastoreRequest { this.save(entities); } + + /** + * Some rpc calls require that the transaction has been started (i.e, has a + * valid id) before they can be sent. #withBeginTransaction acts as a wrapper + * over those functions. + * + * If the transaction has not begun yet, `#withBeginTransaction` will first + * send an rpc to begin the transaction, and then execute the wrapped + * function. If it has begun, the wrapped function will be called directly + * instead. If an error is encountered during the beginTransaction call, the + * callback will be executed instead of the wrapped function. + * + * @param {CallOptions | undefined} [gaxOptions] Gax options provided by the + * user that are used for the beginTransaction grpc call. + * @param {function} [fn] A function which is run after ensuring a + * beginTransaction call is made. + * @param {function} [callback] A callback provided by the user that expects + * an error in the first argument and a custom data type for the rest of the + * arguments. + * @private + */ + #withBeginTransaction<T extends any[]>( + gaxOptions: CallOptions | undefined, + fn: () => void, + callback: (...args: [Error | null, ...T] | [Error | null]) => void + ): void { + (async () => { + if (this.#state === TransactionState.NOT_STARTED) { + try { + await this.#mutex.runExclusive(async () => { + if (this.#state === TransactionState.NOT_STARTED) { + // This sends an rpc call to get the transaction id + const runResults = await this.#beginTransactionAsync({ + gaxOptions, + }); + if (runResults.err) { + // The rpc getting the id was unsuccessful. + // Do not call the wrapped function. + throw runResults.err; + } + this.#parseRunSuccess(runResults); + // The rpc saving the transaction id was successful. + // Now the wrapped function fn will be called. + } + }); + } catch (err: any) { + // Handle an error produced by the beginTransactionAsync call + return callback(err); + } + } + return fn(); + })(); + } } export type ModifiedEntities = Array<{ diff --git a/system-test/datastore.ts b/system-test/datastore.ts index 2d1e6bdc1..52f036644 100644 --- a/system-test/datastore.ts +++ b/system-test/datastore.ts @@ -1749,6 +1749,187 @@ async.each( assert.deepStrictEqual(results, [{property_1: 4}]); }); }); + describe('transactions with and without run', () => { + describe('lookup, put, commit', () => { + const key = datastore.key(['Company', 'Google']); + const obj = { + url: 'www.google.com', + }; + afterEach(async () => { + await datastore.delete(key); + }); + async function doLookupPutCommit(transaction: Transaction) { + const [firstRead] = await transaction.get(key); + assert(!firstRead); + transaction.save({key, data: obj}); + await transaction.commit(); + const [entity] = await datastore.get(key); + delete entity[datastore.KEY]; + assert.deepStrictEqual(entity, obj); + } + it('should run in a transaction', async () => { + const transaction = datastore.transaction(); + await transaction.run(); + await doLookupPutCommit(transaction); + }); + it('should run in a transaction without run', async () => { + const transaction = datastore.transaction(); + await doLookupPutCommit(transaction); + }); + }); + describe('put, lookup, commit', () => { + const key = datastore.key(['Company', 'Google']); + const obj = { + url: 'www.google.com', + }; + afterEach(async () => { + await datastore.delete(key); + }); + async function doPutLookupCommit(transaction: Transaction) { + transaction.save({key, data: obj}); + const [firstRead] = await transaction.get(key); + assert(!firstRead); + await transaction.commit(); + const [entity] = await datastore.get(key); + delete entity[datastore.KEY]; + assert.deepStrictEqual(entity, obj); + } + it('should run in a transaction', async () => { + const transaction = datastore.transaction(); + await transaction.run(); + await doPutLookupCommit(transaction); + }); + it('should run in a transaction without run', async () => { + const transaction = datastore.transaction(); + await doPutLookupCommit(transaction); + }); + }); + describe('runQuery, put, commit', () => { + const key = datastore.key(['Company', 'Google']); + const obj = { + url: 'www.google.com', + }; + afterEach(async () => { + await datastore.delete(key); + }); + async function doRunQueryPutCommit(transaction: Transaction) { + const query = transaction.createQuery('Company'); + const [results] = await transaction.runQuery(query); + assert.deepStrictEqual(results, []); + transaction.save({key, data: obj}); + await transaction.commit(); + const [entity] = await datastore.get(key); + delete entity[datastore.KEY]; + assert.deepStrictEqual(entity, obj); + } + it('should run in a transaction', async () => { + const transaction = datastore.transaction(); + await transaction.run(); + await doRunQueryPutCommit(transaction); + }); + it('should run in a transaction without run', async () => { + const transaction = datastore.transaction(); + await doRunQueryPutCommit(transaction); + }); + }); + describe('put, runQuery, commit', () => { + const key = datastore.key(['Company', 'Google']); + const obj = { + url: 'www.google.com', + }; + afterEach(async () => { + await datastore.delete(key); + }); + async function doPutRunQueryCommit(transaction: Transaction) { + transaction.save({key, data: obj}); + const query = transaction.createQuery('Company'); + const [results] = await transaction.runQuery(query); + assert.deepStrictEqual(results, []); + await transaction.commit(); + const [entity] = await datastore.get(key); + delete entity[datastore.KEY]; + assert.deepStrictEqual(entity, obj); + } + it('should run in a transaction', async () => { + const transaction = datastore.transaction(); + await transaction.run(); + await doPutRunQueryCommit(transaction); + }); + it('should run in a transaction without run', async () => { + const transaction = datastore.transaction(); + await doPutRunQueryCommit(transaction); + }); + }); + + describe('runAggregationQuery, put, commit', () => { + const key = datastore.key(['Company', 'Google']); + const obj = { + url: 'www.google.com', + }; + afterEach(async () => { + await datastore.delete(key); + }); + async function doRunAggregationQueryPutCommit( + transaction: Transaction + ) { + const query = transaction.createQuery('Company'); + const aggregateQuery = transaction + .createAggregationQuery(query) + .count('total'); + const [results] = + await transaction.runAggregationQuery(aggregateQuery); + assert.deepStrictEqual(results, [{total: 0}]); + transaction.save({key, data: obj}); + await transaction.commit(); + const [entity] = await datastore.get(key); + delete entity[datastore.KEY]; + assert.deepStrictEqual(entity, obj); + } + it('should run in a transaction', async () => { + const transaction = datastore.transaction(); + await transaction.run(); + await doRunAggregationQueryPutCommit(transaction); + }); + it('should run in a transaction without run', async () => { + const transaction = datastore.transaction(); + await doRunAggregationQueryPutCommit(transaction); + }); + }); + describe('put, runAggregationQuery, commit', () => { + const key = datastore.key(['Company', 'Google']); + const obj = { + url: 'www.google.com', + }; + afterEach(async () => { + await datastore.delete(key); + }); + async function doPutRunAggregationQueryCommit( + transaction: Transaction + ) { + transaction.save({key, data: obj}); + const query = transaction.createQuery('Company'); + const aggregateQuery = transaction + .createAggregationQuery(query) + .count('total'); + const [results] = + await transaction.runAggregationQuery(aggregateQuery); + assert.deepStrictEqual(results, [{total: 0}]); + await transaction.commit(); + const [entity] = await datastore.get(key); + delete entity[datastore.KEY]; + assert.deepStrictEqual(entity, obj); + } + it('should run in a transaction', async () => { + const transaction = datastore.transaction(); + await transaction.run(); + await doPutRunAggregationQueryCommit(transaction); + }); + it('should run in a transaction without run', async () => { + const transaction = datastore.transaction(); + await doPutRunAggregationQueryCommit(transaction); + }); + }); + }); describe('transactions', () => { it('should run in a transaction', async () => { const key = datastore.key(['Company', 'Google']); @@ -1873,9 +2054,7 @@ async.each( [result] = await aggregateQuery.run(); } catch (e) { await transaction.rollback(); - assert.fail( - 'The aggregation query run should have been successful' - ); + throw e; } assert.deepStrictEqual(result, [{total: 2}]); await transaction.commit(); @@ -1892,9 +2071,7 @@ async.each( [result] = await aggregateQuery.run(); } catch (e) { await transaction.rollback(); - assert.fail( - 'The aggregation query run should have been successful' - ); + throw e; } assert.deepStrictEqual(result, [{'total rating': 200}]); await transaction.commit(); @@ -1911,9 +2088,7 @@ async.each( [result] = await aggregateQuery.run(); } catch (e) { await transaction.rollback(); - assert.fail( - 'The aggregation query run should have been successful' - ); + throw e; } assert.deepStrictEqual(result, [{'average rating': 100}]); await transaction.commit(); @@ -1929,9 +2104,7 @@ async.each( [result] = await aggregateQuery.run(); } catch (e) { await transaction.rollback(); - assert.fail( - 'The aggregation query run should have been successful' - ); + throw e; } return result; } diff --git a/test/transaction.ts b/test/transaction.ts index 78bc0254c..c1346f3df 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -21,19 +21,19 @@ import * as proxyquire from 'proxyquire'; import { Datastore, DatastoreOptions, - DatastoreClient, DatastoreRequest, Query, TransactionOptions, Transaction, AggregateField, } from '../src'; -import {Entity, entity} from '../src/entity'; +import {Entities, Entity, entity} from '../src/entity'; import * as tsTypes from '../src/transaction'; import * as sinon from 'sinon'; import {Callback, CallOptions, ClientStub} from 'google-gax'; import { CommitCallback, + CreateReadStreamOptions, GetCallback, RequestCallback, RequestConfig, @@ -43,7 +43,8 @@ import {google} from '../protos/protos'; import {RunCallback} from '../src/transaction'; import * as protos from '../protos/protos'; import {AggregateQuery} from '../src/aggregate'; -import {RunQueryCallback} from '../src/query'; +import {RunQueryCallback, RunQueryInfo, RunQueryOptions} from '../src/query'; +import * as mocha from 'mocha'; const async = require('async'); // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -160,11 +161,24 @@ async.each( }); describe('testing various transaction functions when transaction.run returns a response', () => { + type RequestType = + | protos.google.datastore.v1.ICommitRequest + | protos.google.datastore.v1.IBeginTransactionRequest + | protos.google.datastore.v1.ILookupRequest + | protos.google.datastore.v1.IRunQueryRequest + | protos.google.datastore.v1.IRunAggregationQueryRequest; // These tests were created to ensure that various transaction functions work correctly after run is called. // This allows us to catch any breaking changes to code usages that should remain the same. const testRunResp = { transaction: Buffer.from(Array.from(Array(100).keys())), }; + enum GapicFunctionName { + BEGIN_TRANSACTION = 'beginTransaction', + LOOKUP = 'lookup', + RUN_QUERY = 'runQuery', + RUN_AGGREGATION_QUERY = 'runAggregationQuery', + COMMIT = 'commit', + } // MockedTransactionWrapper is a helper class for mocking out various // Gapic functions and ensuring that responses and errors actually make it @@ -174,12 +188,21 @@ async.each( transaction: Transaction; dataClient?: ClientStub; mockedBeginTransaction: Function; - functionsMocked: {name: string; mockedFunction: Function}[]; + functionsMocked: { + name: GapicFunctionName; + mockedFunction: Function; + }[]; // The callBackSignaler lets the user of this object get a signal when the mocked function is called. // This is useful for tests that need to know when the mocked function is called. - callBackSignaler: (callbackReached: string) => void = () => {}; - - constructor() { + callBackSignaler: ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => void = () => {}; + + constructor( + err: Error | null = null, + resp: google.datastore.v1.IBeginTransactionResponse = testRunResp + ) { const namespace = 'run-without-mock'; const projectId = 'project-id'; const options = { @@ -196,7 +219,7 @@ async.each( const gapic = Object.freeze({ v1: require('../src/v1'), }); - // Datastore Gapic clients haven't been initialized yet so we initialize them here. + // Datastore Gapic clients haven't been initialized yet, so we initialize them here. datastore.clients_.set( dataClientName, new gapic.v1[dataClientName](options) @@ -221,8 +244,11 @@ async.each( ) => { // Calls a user provided function that will receive this string // Usually used to track when this code was reached relative to other code - this.callBackSignaler('beginTransaction called'); - callback(null, testRunResp); + this.callBackSignaler( + GapicFunctionName.BEGIN_TRANSACTION, + request + ); + callback(err, resp); }; } this.dataClient = dataClient; @@ -233,7 +259,7 @@ async.each( // This mocks out a gapic function to just call the callback received in the Gapic function. // The callback will send back the error and response arguments provided as parameters. mockGapicFunction<ResponseType>( - functionName: string, + functionName: GapicFunctionName, response: ResponseType, error: Error | null ) { @@ -253,17 +279,15 @@ async.each( } if (dataClient && dataClient[functionName]) { dataClient[functionName] = ( - request: any, // RequestType + request: RequestType, options: CallOptions, callback: Callback< ResponseType, - | any // RequestType - | null - | undefined, + RequestType | null | undefined, {} | null | undefined > ) => { - this.callBackSignaler(`${functionName} called`); + this.callBackSignaler(functionName, request); callback(error, response); }; } @@ -278,7 +302,7 @@ async.each( } // This resets Gapic functions mocked out by the tests to what they originally were. - // Resetting mocked out Gapic functions ensures other tests don't use these mocks. + // Resetting mocked out Gapic functions ensures other tests don't use these mocked out functions. resetGapicFunctions() { this.functionsMocked.forEach(functionMocked => { if (this.dataClient) { @@ -292,6 +316,45 @@ async.each( let transactionWrapper: MockedTransactionWrapper; let transaction: Transaction; + afterEach(() => { + transactionWrapper.resetBeginTransaction(); + transactionWrapper.resetGapicFunctions(); + }); + + describe('sending an error back from the beginTransaction gapic function', () => { + const testErrorMessage = 'test-beginTransaction-error'; + beforeEach(async () => { + transactionWrapper = new MockedTransactionWrapper( + new Error(testErrorMessage), + undefined + ); + }); + it('should send back the error when awaiting a promise', async () => { + try { + await transactionWrapper.transaction.commit(); + assert.fail('The run call should have failed.'); + } catch (error: any) { + assert.strictEqual(error['message'], testErrorMessage); + } + }); + it('should send back the error when using a callback', done => { + const commitCallback: CommitCallback = ( + error: Error | null | undefined, + response?: google.datastore.v1.ICommitResponse + ) => { + try { + assert(error); + assert.strictEqual(error.message, testErrorMessage); + assert.deepStrictEqual(response, undefined); + done(); + } catch (e) { + done(e); + } + }; + transactionWrapper.transaction.commit(commitCallback); + }); + }); + describe('commit', () => { // These tests were created to catch regressions for transaction.commit changes. const testCommitResp = { @@ -313,15 +376,10 @@ async.each( transactionWrapper = new MockedTransactionWrapper(); }); - afterEach(() => { - transactionWrapper.resetBeginTransaction(); - transactionWrapper.resetGapicFunctions(); - }); - describe('should pass error back to the user', async () => { beforeEach(() => { transactionWrapper.mockGapicFunction( - 'commit', + GapicFunctionName.COMMIT, testCommitResp, new Error(testErrorMessage) ); @@ -358,7 +416,7 @@ async.each( describe('should pass response back to the user', async () => { beforeEach(() => { transactionWrapper.mockGapicFunction( - 'commit', + GapicFunctionName.COMMIT, testCommitResp, null ); @@ -425,15 +483,10 @@ async.each( .addAggregation(AggregateField.average('appearances')); }); - afterEach(() => { - transactionWrapper.resetBeginTransaction(); - transactionWrapper.resetGapicFunctions(); - }); - describe('should pass error back to the user', async () => { beforeEach(() => { transactionWrapper.mockGapicFunction( - 'runAggregationQuery', + GapicFunctionName.RUN_AGGREGATION_QUERY, runAggregationQueryResp, new Error(testErrorMessage) ); @@ -451,7 +504,7 @@ async.each( it('should send back the error when using a callback', done => { const runAggregateQueryCallback: RequestCallback = ( error: Error | null | undefined, - response?: any + response?: unknown ) => { try { assert(error); @@ -473,7 +526,7 @@ async.each( describe('should pass response back to the user', async () => { beforeEach(() => { transactionWrapper.mockGapicFunction( - 'runAggregationQuery', + GapicFunctionName.RUN_AGGREGATION_QUERY, runAggregationQueryResp, null ); @@ -491,7 +544,7 @@ async.each( it('should send back the response when using a callback', done => { const runAggregateQueryCallback: CommitCallback = ( error: Error | null | undefined, - response?: any + response?: unknown ) => { try { assert.strictEqual(error, null); @@ -522,6 +575,10 @@ async.each( }, }; const runQueryUserResp: Entity[] = []; + const runQueryUserInfo: RunQueryInfo = { + moreResults: undefined, + endCursor: '[object Object]', + }; const testErrorMessage = 'test-run-Query-error'; let q: Query; @@ -531,15 +588,10 @@ async.each( q = transactionWrapper.datastore.createQuery('Character'); }); - afterEach(() => { - transactionWrapper.resetBeginTransaction(); - transactionWrapper.resetGapicFunctions(); - }); - describe('should pass error back to the user', async () => { beforeEach(() => { transactionWrapper.mockGapicFunction( - 'runQuery', + GapicFunctionName.RUN_QUERY, runQueryResp, new Error(testErrorMessage) ); @@ -557,12 +609,14 @@ async.each( it('should send back the error when using a callback', done => { const callback: RunQueryCallback = ( error: Error | null | undefined, - response?: any + entities?: Entity[], + info?: RunQueryInfo ) => { try { assert(error); assert.strictEqual(error.message, testErrorMessage); - assert.deepStrictEqual(response, undefined); + assert.deepStrictEqual(entities, undefined); + assert.deepStrictEqual(info, undefined); done(); } catch (e) { done(e); @@ -576,28 +630,27 @@ async.each( describe('should pass response back to the user', async () => { beforeEach(() => { transactionWrapper.mockGapicFunction( - 'runQuery', + GapicFunctionName.RUN_QUERY, runQueryResp, null ); }); it('should send back the response when awaiting a promise', async () => { await transaction.run(); - const allResults = await transaction.runQuery(q); - const [runAggregateQueryResults] = allResults; - assert.deepStrictEqual( - runAggregateQueryResults, - runQueryUserResp - ); + const [runQueryResults, info] = await transaction.runQuery(q); + assert.deepStrictEqual(runQueryResults, runQueryUserResp); + assert.deepStrictEqual(info, runQueryUserInfo); }); it('should send back the response when using a callback', done => { const callback: RunQueryCallback = ( error: Error | null | undefined, - response?: any + entities?: Entity[], + info?: RunQueryInfo ) => { try { assert.strictEqual(error, null); - assert.deepStrictEqual(response, runQueryUserResp); + assert.deepStrictEqual(entities, runQueryUserResp); + assert.deepStrictEqual(info, runQueryUserInfo); done(); } catch (e) { done(e); @@ -644,25 +697,18 @@ async.each( }; const getUserResp = 'post1'; const testErrorMessage = 'test-run-Query-error'; - let q: Query; let key: entity.Key; beforeEach(async () => { transactionWrapper = new MockedTransactionWrapper(); transaction = transactionWrapper.transaction; - q = transactionWrapper.datastore.createQuery('Character'); key = transactionWrapper.datastore.key(['Company', 'Google']); }); - afterEach(() => { - transactionWrapper.resetBeginTransaction(); - transactionWrapper.resetGapicFunctions(); - }); - describe('should pass error back to the user', async () => { beforeEach(() => { transactionWrapper.mockGapicFunction( - 'lookup', + GapicFunctionName.LOOKUP, getResp, new Error(testErrorMessage) ); @@ -679,13 +725,13 @@ async.each( }); it('should send back the error when using a callback', done => { const callback: GetCallback = ( - error: Error | null | undefined, - response?: any + err?: Error | null, + entity?: Entities ) => { try { - assert(error); - assert.strictEqual(error.message, testErrorMessage); - assert.deepStrictEqual(response, undefined); + assert(err); + assert.strictEqual(err.message, testErrorMessage); + assert.deepStrictEqual(entity, undefined); done(); } catch (e) { done(e); @@ -698,7 +744,11 @@ async.each( }); describe('should pass response back to the user', async () => { beforeEach(() => { - transactionWrapper.mockGapicFunction('lookup', getResp, null); + transactionWrapper.mockGapicFunction( + GapicFunctionName.LOOKUP, + getResp, + null + ); }); it('should send back the response when awaiting a promise', async () => { await transaction.run(); @@ -708,12 +758,12 @@ async.each( }); it('should send back the response when using a callback', done => { const callback: GetCallback = ( - error: Error | null | undefined, - response?: any + err?: Error | null, + entity?: Entities ) => { try { - const result = response[transactionWrapper.datastore.KEY]; - assert.strictEqual(error, null); + const result = entity[transactionWrapper.datastore.KEY]; + assert.strictEqual(err, null); assert.deepStrictEqual(result.name, getUserResp); done(); } catch (e) { @@ -726,6 +776,487 @@ async.each( }); }); }); + describe('concurrency', async () => { + // Items in this enum represent different points in time in the user code. + enum UserCodeEvent { + RUN_CALLBACK, + COMMIT_CALLBACK, + GET_CALLBACK, + RUN_QUERY_CALLBACK, + RUN_AGGREGATION_QUERY_CALLBACK, + CUSTOM_EVENT, + } + // A transaction event represents a point in time particular code is reached + // when running code that uses a transaction. + type TransactionEvent = GapicFunctionName | UserCodeEvent; + + // This object is a sample response from 'commit' in the Gapic layer. + const testCommitResp = { + mutationResults: [ + { + key: { + path: [ + { + kind: 'some-kind', + }, + ], + }, + }, + ], + }; + // This object is a sample response from 'lookup' in the Gapic layer. + const testLookupResp = { + found: [ + { + entity: { + key: { + path: [ + { + kind: 'Post', + name: 'post1', + idType: 'name', + }, + ], + partitionId: { + projectId: 'projectId', + databaseId: 'databaseId', + namespaceId: 'namespaceId', + }, + }, + excludeFromIndexes: false, + properties: {}, + }, + }, + ], + missing: [], + deferred: [], + transaction: testRunResp.transaction, + readTime: { + seconds: '1699470605', + nanos: 201398000, + }, + }; + // This object is a sample response from 'runQuery' in the Gapic layer. + const testRunQueryResp = { + batch: { + entityResults: [], + endCursor: { + type: 'Buffer', + data: Buffer.from(Array.from(Array(100).keys())), + }, + }, + }; + // This object is a sample response from 'runAggregationQuery' in the Gapic layer. + const testRunAggregationQueryResp = { + batch: { + aggregationResults: [ + { + aggregateProperties: { + 'average rating': { + meaning: 0, + excludeFromIndexes: false, + doubleValue: 100, + valueType: 'doubleValue', + }, + }, + }, + ], + moreResults: + google.datastore.v1.QueryResultBatch.MoreResultsType + .NO_MORE_RESULTS, + readTime: {seconds: '1699390681', nanos: 961667000}, + }, + query: null, + transaction: testRunResp.transaction, + }; + let transactionWrapper: MockedTransactionWrapper; + let transaction: Transaction; + + beforeEach(async () => { + transactionWrapper = new MockedTransactionWrapper(); + transaction = transactionWrapper.transaction; + }); + + afterEach(() => { + transactionWrapper.resetBeginTransaction(); + transactionWrapper.resetGapicFunctions(); + }); + + type GapicRequestData = { + call: GapicFunctionName; + request?: RequestType; + }; + + /** + * This object is used for testing the order that different events occur. + * The events can include user code reached, gapic code reached and callbacks called. + * + * @param {MockedTransactionWrapper} [transactionWrapper] A TransactionWrapper instance. + * @param {mocha.Done} [done] A function for signalling the test is complete. + * @param {TransactionEvent[]} [expectedOrder] The order events are expected to occur. + * @param {MockedTransactionWrapper} [transactionWrapper] A TransactionWrapper instance. + */ + class TransactionOrderTester { + /** + * expectedRequests equal the request data in the order they are expected to + * be passed into the Gapic layer. + * @private + */ + readonly #expectedRequests?: GapicRequestData[]; + /** + * requests are the actual order of the requests that are passed into the + * gapic layer + * @private + */ + readonly #requests: GapicRequestData[] = []; + /** + * expectedEventOrder is the order the test expects different events to occur + * such as a callback being called, Gapic functions being called or user + * code being run. + */ + readonly #expectedEventOrder: TransactionEvent[] = []; + /** + * eventOrder is the order events actually occur in the test and will be compared with + * expectedEventOrder. + * @private + */ + #eventOrder: TransactionEvent[] = []; + // A transaction wrapper object is used to contain the transaction and mocked Gapic functions. + #transactionWrapper: MockedTransactionWrapper; + // Stores the mocha done function so that it can be called from this object. + readonly #done: mocha.Done; + + /** + * Each time an event occurs this function is called to check to see if all + * events happened that were supposed to happen. If all events in the test + * happened then this function passes tests if the events happened in the + * right order. + */ + #checkForCompletion() { + if (this.#eventOrder.length >= this.#expectedEventOrder.length) { + try { + assert.deepStrictEqual( + this.#eventOrder, + this.#expectedEventOrder + ); + if (this.#expectedRequests) { + assert.deepStrictEqual( + this.#requests, + this.#expectedRequests + ); + } + this.#done(); + } catch (e) { + this.#done(e); + } + } + } + + constructor( + transactionWrapper: MockedTransactionWrapper, + done: mocha.Done, + expectedOrder: TransactionEvent[], + expectedRequests?: { + call: GapicFunctionName; + request?: RequestType; + }[] + ) { + this.#expectedEventOrder = expectedOrder; + this.#expectedRequests = expectedRequests; + this.#done = done; + transactionWrapper.callBackSignaler = ( + call: GapicFunctionName, + request?: RequestType + ) => { + try { + this.#requests.push({call, request}); + this.#eventOrder.push(call); + this.#checkForCompletion(); + } catch (e) { + done(e); + } + }; + this.#transactionWrapper = transactionWrapper; + } + + /** + * Returns a callback that will record an event so that order of events + * can be compared later. + * + * @param {UserCodeEvent} [event] The event that should be recorded. + */ + push(event: UserCodeEvent) { + return () => { + try { + this.#eventOrder.push(event); + this.#checkForCompletion(); + } catch (e) { + this.#done(e); + } + }; + } + } + + describe('should pass response back to the user', async () => { + beforeEach(() => { + transactionWrapper.mockGapicFunction( + GapicFunctionName.COMMIT, + testCommitResp, + null + ); + }); + + it('should call the callbacks in the proper order with run and commit', done => { + const tester = new TransactionOrderTester( + transactionWrapper, + done, + [ + UserCodeEvent.CUSTOM_EVENT, + GapicFunctionName.BEGIN_TRANSACTION, + UserCodeEvent.RUN_CALLBACK, + GapicFunctionName.COMMIT, + UserCodeEvent.COMMIT_CALLBACK, + ] + ); + transaction.run(tester.push(UserCodeEvent.RUN_CALLBACK)); + transaction.commit(tester.push(UserCodeEvent.COMMIT_CALLBACK)); + tester.push(UserCodeEvent.CUSTOM_EVENT)(); + }); + it('should call the callbacks in the proper order with commit', done => { + const tester = new TransactionOrderTester( + transactionWrapper, + done, + [ + UserCodeEvent.CUSTOM_EVENT, + GapicFunctionName.BEGIN_TRANSACTION, + GapicFunctionName.COMMIT, + UserCodeEvent.COMMIT_CALLBACK, + ] + ); + transaction.commit(tester.push(UserCodeEvent.COMMIT_CALLBACK)); + tester.push(UserCodeEvent.CUSTOM_EVENT)(); + }); + it('should call the callbacks in the proper order with two run calls', done => { + const tester = new TransactionOrderTester( + transactionWrapper, + done, + [ + UserCodeEvent.CUSTOM_EVENT, + GapicFunctionName.BEGIN_TRANSACTION, + UserCodeEvent.RUN_CALLBACK, + UserCodeEvent.RUN_CALLBACK, + ] + ); + transaction.run(tester.push(UserCodeEvent.RUN_CALLBACK)); + transaction.run(tester.push(UserCodeEvent.RUN_CALLBACK)); + tester.push(UserCodeEvent.CUSTOM_EVENT)(); + }); + it('should call the callbacks in the proper order with commit and then run', done => { + const tester = new TransactionOrderTester( + transactionWrapper, + done, + [ + UserCodeEvent.CUSTOM_EVENT, + GapicFunctionName.BEGIN_TRANSACTION, + UserCodeEvent.RUN_CALLBACK, + GapicFunctionName.COMMIT, + UserCodeEvent.COMMIT_CALLBACK, + ] + ); + transaction.commit(tester.push(UserCodeEvent.COMMIT_CALLBACK)); + transaction.run(tester.push(UserCodeEvent.RUN_CALLBACK)); + tester.push(UserCodeEvent.CUSTOM_EVENT)(); + }); + }); + describe('should pass response back to the user and check the request', async () => { + let key: entity.Key; + beforeEach(() => { + key = transactionWrapper.datastore.key(['Company', 'Google']); + transactionWrapper.mockGapicFunction( + GapicFunctionName.COMMIT, + testCommitResp, + null + ); + transactionWrapper.mockGapicFunction( + GapicFunctionName.LOOKUP, + testLookupResp, + null + ); + transactionWrapper.mockGapicFunction( + GapicFunctionName.RUN_QUERY, + testRunQueryResp, + null + ); + transactionWrapper.mockGapicFunction( + GapicFunctionName.RUN_AGGREGATION_QUERY, + testRunAggregationQueryResp, + null + ); + }); + const beginTransactionRequest = { + transactionOptions: {}, + projectId: 'project-id', + }; + const commitRequest = { + mode: 'TRANSACTIONAL', + transaction: testRunResp.transaction, + projectId: 'project-id', + mutations: [ + { + upsert: { + properties: {}, + key: { + partitionId: { + namespaceId: 'run-without-mock', + }, + path: [ + { + kind: 'Company', + name: 'Google', + }, + ], + }, + }, + }, + ], + }; + const lookupTransactionRequest = { + keys: [ + { + partitionId: { + namespaceId: 'run-without-mock', + }, + path: [ + { + kind: 'Company', + name: 'Google', + }, + ], + }, + ], + projectId: 'project-id', + readOptions: { + transaction: testRunResp.transaction, + }, + }; + describe('put, commit', () => { + const expectedRequests = [ + { + call: GapicFunctionName.BEGIN_TRANSACTION, + request: beginTransactionRequest, + }, + { + call: GapicFunctionName.COMMIT, + request: commitRequest, + }, + ]; + it('should verify that there is a BeginTransaction call while beginning later', done => { + const tester = new TransactionOrderTester( + transactionWrapper, + done, + [ + GapicFunctionName.BEGIN_TRANSACTION, + GapicFunctionName.COMMIT, + UserCodeEvent.COMMIT_CALLBACK, + ], + expectedRequests + ); + transaction.save({ + key, + data: '', + }); + transaction.commit(tester.push(UserCodeEvent.COMMIT_CALLBACK)); + }); + it('should verify that there is a BeginTransaction call while beginning early', done => { + const tester = new TransactionOrderTester( + transactionWrapper, + done, + [ + GapicFunctionName.BEGIN_TRANSACTION, + UserCodeEvent.RUN_CALLBACK, + GapicFunctionName.COMMIT, + UserCodeEvent.COMMIT_CALLBACK, + ], + expectedRequests + ); + transaction.save({ + key, + data: '', + }); + transaction.run(tester.push(UserCodeEvent.RUN_CALLBACK)); + transaction.commit(tester.push(UserCodeEvent.COMMIT_CALLBACK)); + }); + }); + describe('lookup, lookup, put, commit', () => { + const expectedRequests = [ + { + call: GapicFunctionName.BEGIN_TRANSACTION, + request: beginTransactionRequest, + }, + { + call: GapicFunctionName.COMMIT, + request: commitRequest, + }, + { + call: GapicFunctionName.LOOKUP, + request: lookupTransactionRequest, + }, + { + call: GapicFunctionName.LOOKUP, + request: lookupTransactionRequest, + }, + ]; + it('should verify that there is a BeginTransaction call while beginning later', done => { + const tester = new TransactionOrderTester( + transactionWrapper, + done, + [ + GapicFunctionName.BEGIN_TRANSACTION, + GapicFunctionName.COMMIT, + UserCodeEvent.COMMIT_CALLBACK, + GapicFunctionName.LOOKUP, + GapicFunctionName.LOOKUP, + UserCodeEvent.GET_CALLBACK, + UserCodeEvent.GET_CALLBACK, + ], + expectedRequests + ); + transaction.get(key, tester.push(UserCodeEvent.GET_CALLBACK)); + transaction.get(key, tester.push(UserCodeEvent.GET_CALLBACK)); + transactionWrapper.transaction.save({ + key, + data: '', + }); + transaction.commit(tester.push(UserCodeEvent.COMMIT_CALLBACK)); + }); + it('should verify that there is a BeginTransaction call while beginning early', done => { + const tester = new TransactionOrderTester( + transactionWrapper, + done, + [ + GapicFunctionName.BEGIN_TRANSACTION, + UserCodeEvent.RUN_CALLBACK, + GapicFunctionName.COMMIT, + UserCodeEvent.COMMIT_CALLBACK, + GapicFunctionName.LOOKUP, + GapicFunctionName.LOOKUP, + UserCodeEvent.GET_CALLBACK, + UserCodeEvent.GET_CALLBACK, + ], + expectedRequests + ); + transaction.run(tester.push(UserCodeEvent.RUN_CALLBACK)); + transaction.get(key, tester.push(UserCodeEvent.GET_CALLBACK)); + transaction.get(key, tester.push(UserCodeEvent.GET_CALLBACK)); + transactionWrapper.transaction.save({ + key, + data: '', + }); + transaction.commit(tester.push(UserCodeEvent.COMMIT_CALLBACK)); + }); + }); + }); + }); }); describe('run without setting up transaction id', () => { @@ -754,7 +1285,7 @@ async.each( const gapic = Object.freeze({ v1: require('../src/v1'), }); - // Datastore Gapic clients haven't been initialized yet so we initialize them here. + // Datastore Gapic clients haven't been initialized yet, so we initialize them here. datastore.clients_.set( dataClientName, new gapic.v1[dataClientName](options) @@ -857,8 +1388,18 @@ async.each( }); describe('commit', () => { - beforeEach(() => { + beforeEach(done => { transaction.id = TRANSACTION_ID; + transaction.request_ = (config, callback) => { + callback(null, { + transaction: Buffer.from(Array.from(Array(100).keys())), + }); + // Delay to give the transaction mutex the opportunity to unlock before running tests. + setImmediate(() => { + done(); + }); + }; + transaction.run(); }); afterEach(() => {