Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch job support #231

Merged
merged 9 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@nitric/sdk",
"description": "Nitric NodeJS client sdk",
"nitric": "v1.6.0",
"nitric": "v1.14.0",
"author": "Nitric <https://github.com/nitrictech>",
"repository": "https://github.com/nitrictech/node-sdk",
"main": "lib/index.js",
Expand Down
28 changes: 28 additions & 0 deletions src/api/batch/v1/batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021, Nitric Technologies Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { SERVICE_BIND } from '@nitric/sdk/constants';
import { BatchClient } from '@nitric/sdk/gen/nitric/proto/batch/v1/batch_grpc_pb';
import * as grpc from '@grpc/grpc-js';

let batchClient: BatchClient;

export const getBatchClient = (): BatchClient => {
if (!batchClient) {
batchClient = new BatchClient(
SERVICE_BIND,
grpc.ChannelCredentials.createInsecure()
);
}
return batchClient;
};
14 changes: 14 additions & 0 deletions src/api/batch/v1/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2021, Nitric Technologies Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
export * from './job';
75 changes: 75 additions & 0 deletions src/api/batch/v1/job.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2021, Nitric Technologies Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Job } from './job';
import { UnimplementedError } from '../../errors';
import { status } from '@grpc/grpc-js';
import { getBatchClient } from './batch';
import { JobSubmitResponse } from '@nitric/sdk/gen/nitric/proto/batch/v1/batch_pb';
import { BatchClient } from '@nitric/sdk/gen/nitric/proto/batch/v1/batch_grpc_pb';

describe('Job Client Tests', () => {
describe('Given the grpc client returns an unimplemented error status', () => {
const MOCK_ERROR = {
code: status.UNIMPLEMENTED,
message: 'UNIMPLEMENTED',
};
let submitMock;
beforeAll(() => {
submitMock = jest
.spyOn(BatchClient.prototype, 'submitJob')
.mockImplementation((request, callback: any) => {
callback(MOCK_ERROR, null);
return null as any;
});
});
afterAll(() => {
jest.resetAllMocks();
});
test('Then submit call should return an UnimplementedError', async () => {
const job = new Job('test', getBatchClient());
await expect(
job.submit({
test: 'test',
})
).rejects.toBeInstanceOf(UnimplementedError);
});
test('The Grpc client for Job.submit should have been called exactly once', () => {
expect(submitMock).toHaveBeenCalledTimes(1);
});
});
describe('Given the grpc returns successfully', () => {
let submitMock;
beforeAll(() => {
submitMock = jest
.spyOn(BatchClient.prototype, 'submitJob')
.mockImplementation((request, callback: any) => {
const response = new JobSubmitResponse();
callback(null, response);
return null as any;
});
});
afterAll(() => {
jest.resetAllMocks();
});
test('Then Eventing.submit should resolve with the provided id', async () => {
const client = new Job('test', getBatchClient());
await expect(
client.submit({ message: 'Test Payload' })
).resolves.toBeUndefined();
});
test('The Grpc client for Eventing.submit should have been called exactly once', () => {
expect(submitMock).toHaveBeenCalledTimes(1);
});
});
});
61 changes: 61 additions & 0 deletions src/api/batch/v1/job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2021, Nitric Technologies Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { BatchClient } from '@nitric/proto/batch/v1/batch_grpc_pb';
import { JobData, JobSubmitRequest } from '@nitric/proto/batch/v1/batch_pb';
import { Struct } from 'google-protobuf/google/protobuf/struct_pb';
import { fromGrpcError } from '../../errors';

export class Job<T extends Record<string, any> = Record<string, any>> {
private name: string;
private client: BatchClient;

constructor(name: string, client: BatchClient) {
this.name = name;
this.client = client;
}

/**
* Submit a job to the batch service
*
* @example
* ```typescript
* const analyse = job('analyse').allow('submit');
*
* await analyse.submit({
* data: 'some data',
* });
* ```
*
* @param data - Data to submit to the job
* @returns Promise that resolves when the job has been submitted
*/
async submit(data: T): Promise<void> {
const request = new JobSubmitRequest();
const jobData = new JobData();

jobData.setStruct(Struct.fromJavaScript(data));
request.setJobName(this.name);
request.setData(jobData);

return new Promise<void>((resolve, reject) => {
this.client.submitJob(request, (error, _response) => {
if (error) {
reject(fromGrpcError(error));
} else {
resolve();
}
});
});
}
}
1 change: 1 addition & 0 deletions src/context/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ export * from './http';
export * from './interval';
export * from './message';
export * from './websocket';
export * from './job';
60 changes: 60 additions & 0 deletions src/context/job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2021, Nitric Technologies Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {
JobRequest as JobRequestPb,
JobResponse as JobResponsePb,
} from '../gen/nitric/proto/batch/v1/batch_pb';
import { AbstractRequest, BaseContext } from './base';

export interface JobResponse {
success: boolean;
}

export class JobRequest extends AbstractRequest {
public readonly jobName: string;

constructor(data: string | Uint8Array, jobName: string) {
super(data);
this.jobName = jobName;
}
}

export class JobContext extends BaseContext<JobRequest, JobResponse> {
public get job(): JobContext {
return this;
}

static fromJobRequest(jobRequest: JobRequestPb): JobContext {
const ctx = new JobContext();
const jobName = jobRequest.getJobName();

const data = jobRequest.getData().getStruct().toJavaScript();

ctx.request = new JobRequest(JSON.stringify(data), jobName);

ctx.response = {
success: true,
};

return ctx;
}

static toJobResponse(ctx: JobContext): JobResponsePb {
const evtCtx = ctx.job;
const jobResponse = new JobResponsePb();
jobResponse.setSuccess(evtCtx.res.success);

return jobResponse;
}
}
40 changes: 40 additions & 0 deletions src/gen/nitric/proto/batch/v1/batch_grpc_pb.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// GENERATED CODE -- DO NOT EDIT!

// package: nitric.proto.batch.v1
// file: nitric/proto/batch/v1/batch.proto

import * as nitric_proto_batch_v1_batch_pb from "../../../../nitric/proto/batch/v1/batch_pb";
import * as grpc from "@grpc/grpc-js";

interface IJobService extends grpc.ServiceDefinition<grpc.UntypedServiceImplementation> {
handleJob: grpc.MethodDefinition<nitric_proto_batch_v1_batch_pb.ClientMessage, nitric_proto_batch_v1_batch_pb.ServerMessage>;
}

export const JobService: IJobService;

export interface IJobServer extends grpc.UntypedServiceImplementation {
handleJob: grpc.handleBidiStreamingCall<nitric_proto_batch_v1_batch_pb.ClientMessage, nitric_proto_batch_v1_batch_pb.ServerMessage>;
}

export class JobClient extends grpc.Client {
constructor(address: string, credentials: grpc.ChannelCredentials, options?: object);
handleJob(metadataOrOptions?: grpc.Metadata | grpc.CallOptions | null): grpc.ClientDuplexStream<nitric_proto_batch_v1_batch_pb.ClientMessage, nitric_proto_batch_v1_batch_pb.ServerMessage>;
handleJob(metadata?: grpc.Metadata | null, options?: grpc.CallOptions | null): grpc.ClientDuplexStream<nitric_proto_batch_v1_batch_pb.ClientMessage, nitric_proto_batch_v1_batch_pb.ServerMessage>;
}

interface IBatchService extends grpc.ServiceDefinition<grpc.UntypedServiceImplementation> {
submitJob: grpc.MethodDefinition<nitric_proto_batch_v1_batch_pb.JobSubmitRequest, nitric_proto_batch_v1_batch_pb.JobSubmitResponse>;
}

export const BatchService: IBatchService;

export interface IBatchServer extends grpc.UntypedServiceImplementation {
submitJob: grpc.handleUnaryCall<nitric_proto_batch_v1_batch_pb.JobSubmitRequest, nitric_proto_batch_v1_batch_pb.JobSubmitResponse>;
}

export class BatchClient extends grpc.Client {
constructor(address: string, credentials: grpc.ChannelCredentials, options?: object);
submitJob(argument: nitric_proto_batch_v1_batch_pb.JobSubmitRequest, callback: grpc.requestCallback<nitric_proto_batch_v1_batch_pb.JobSubmitResponse>): grpc.ClientUnaryCall;
submitJob(argument: nitric_proto_batch_v1_batch_pb.JobSubmitRequest, metadataOrOptions: grpc.Metadata | grpc.CallOptions | null, callback: grpc.requestCallback<nitric_proto_batch_v1_batch_pb.JobSubmitResponse>): grpc.ClientUnaryCall;
submitJob(argument: nitric_proto_batch_v1_batch_pb.JobSubmitRequest, metadata: grpc.Metadata | null, options: grpc.CallOptions | null, callback: grpc.requestCallback<nitric_proto_batch_v1_batch_pb.JobSubmitResponse>): grpc.ClientUnaryCall;
}
84 changes: 84 additions & 0 deletions src/gen/nitric/proto/batch/v1/batch_grpc_pb.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// GENERATED CODE -- DO NOT EDIT!

'use strict';
var grpc = require('@grpc/grpc-js');
var nitric_proto_batch_v1_batch_pb = require('../../../../nitric/proto/batch/v1/batch_pb.js');
var google_protobuf_struct_pb = require('google-protobuf/google/protobuf/struct_pb.js');

function serialize_nitric_proto_batch_v1_ClientMessage(arg) {
if (!(arg instanceof nitric_proto_batch_v1_batch_pb.ClientMessage)) {
throw new Error('Expected argument of type nitric.proto.batch.v1.ClientMessage');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_nitric_proto_batch_v1_ClientMessage(buffer_arg) {
return nitric_proto_batch_v1_batch_pb.ClientMessage.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_nitric_proto_batch_v1_JobSubmitRequest(arg) {
if (!(arg instanceof nitric_proto_batch_v1_batch_pb.JobSubmitRequest)) {
throw new Error('Expected argument of type nitric.proto.batch.v1.JobSubmitRequest');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_nitric_proto_batch_v1_JobSubmitRequest(buffer_arg) {
return nitric_proto_batch_v1_batch_pb.JobSubmitRequest.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_nitric_proto_batch_v1_JobSubmitResponse(arg) {
if (!(arg instanceof nitric_proto_batch_v1_batch_pb.JobSubmitResponse)) {
throw new Error('Expected argument of type nitric.proto.batch.v1.JobSubmitResponse');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_nitric_proto_batch_v1_JobSubmitResponse(buffer_arg) {
return nitric_proto_batch_v1_batch_pb.JobSubmitResponse.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_nitric_proto_batch_v1_ServerMessage(arg) {
if (!(arg instanceof nitric_proto_batch_v1_batch_pb.ServerMessage)) {
throw new Error('Expected argument of type nitric.proto.batch.v1.ServerMessage');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_nitric_proto_batch_v1_ServerMessage(buffer_arg) {
return nitric_proto_batch_v1_batch_pb.ServerMessage.deserializeBinary(new Uint8Array(buffer_arg));
}


// Service for processing jobs
var JobService = exports.JobService = {
handleJob: {
path: '/nitric.proto.batch.v1.Job/HandleJob',
requestStream: true,
responseStream: true,
requestType: nitric_proto_batch_v1_batch_pb.ClientMessage,
responseType: nitric_proto_batch_v1_batch_pb.ServerMessage,
requestSerialize: serialize_nitric_proto_batch_v1_ClientMessage,
requestDeserialize: deserialize_nitric_proto_batch_v1_ClientMessage,
responseSerialize: serialize_nitric_proto_batch_v1_ServerMessage,
responseDeserialize: deserialize_nitric_proto_batch_v1_ServerMessage,
},
};

exports.JobClient = grpc.makeGenericClientConstructor(JobService);
// Service for submitting jobs to be processed
var BatchService = exports.BatchService = {
submitJob: {
path: '/nitric.proto.batch.v1.Batch/SubmitJob',
requestStream: false,
responseStream: false,
requestType: nitric_proto_batch_v1_batch_pb.JobSubmitRequest,
responseType: nitric_proto_batch_v1_batch_pb.JobSubmitResponse,
requestSerialize: serialize_nitric_proto_batch_v1_JobSubmitRequest,
requestDeserialize: deserialize_nitric_proto_batch_v1_JobSubmitRequest,
responseSerialize: serialize_nitric_proto_batch_v1_JobSubmitResponse,
responseDeserialize: deserialize_nitric_proto_batch_v1_JobSubmitResponse,
},
};

exports.BatchClient = grpc.makeGenericClientConstructor(BatchService);
Loading
Loading