-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
30 changed files
with
3,927 additions
and
1,376 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
node_modules | ||
test.js | ||
test.js | ||
.DS_Store |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
module.exports = { | ||
preset: 'ts-jest', | ||
testEnvironment: 'node', | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
export declare type DelayedConfig = { | ||
unit: 's' | 'sec' | 'second' | 'seconds' | 'm' | 'min' | 'minute' | 'minutes' | 'h' | 'hour' | 'hours' | 'd' | 'day' | 'days'; | ||
value: number; | ||
}; | ||
export default function getDelayed(delayed?: DelayedConfig, from?: Date): string | null; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
"use strict"; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var addSeconds_1 = __importDefault(require("date-fns/addSeconds")); | ||
var addMinutes_1 = __importDefault(require("date-fns/addMinutes")); | ||
var addHours_1 = __importDefault(require("date-fns/addHours")); | ||
var addDays_1 = __importDefault(require("date-fns/addDays")); | ||
function delayObjectToString(delayed, from) { | ||
var add; | ||
switch (delayed.unit) { | ||
case 's': | ||
case 'sec': | ||
case 'second': | ||
case 'seconds': | ||
add = addSeconds_1.default; | ||
break; | ||
case 'm': | ||
case 'min': | ||
case 'minute': | ||
case 'minutes': | ||
add = addMinutes_1.default; | ||
break; | ||
case 'h': | ||
case 'hour': | ||
case 'hours': | ||
add = addHours_1.default; | ||
break; | ||
case 'd': | ||
case 'day': | ||
case 'days': | ||
add = addDays_1.default; | ||
break; | ||
} | ||
return add(from, delayed.value).toISOString(); | ||
} | ||
function getDelayed(delayed, from) { | ||
if (from === void 0) { from = new Date(); } | ||
if (typeof delayed === 'string') { | ||
return delayed; | ||
} | ||
else if (typeof delayed === 'object') { | ||
return delayObjectToString(delayed, from); | ||
} | ||
return null; | ||
} | ||
exports.default = getDelayed; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
export declare type RetryConfig = { | ||
count: number; | ||
delay?: number; | ||
}; | ||
export default function getRetries(retries?: RetryConfig | number): RetryConfig; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
"use strict"; | ||
var __assign = (this && this.__assign) || function () { | ||
__assign = Object.assign || function(t) { | ||
for (var s, i = 1, n = arguments.length; i < n; i++) { | ||
s = arguments[i]; | ||
for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p)) | ||
t[p] = s[p]; | ||
} | ||
return t; | ||
}; | ||
return __assign.apply(this, arguments); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var defaultRetries = { | ||
count: 0, | ||
delay: 1000, | ||
}; | ||
function getRetries(retries) { | ||
if (retries) { | ||
if (typeof retries === 'object' && retries.count) { | ||
return __assign(__assign({}, defaultRetries), retries); | ||
} | ||
else if (typeof retries === 'number') { | ||
return __assign(__assign({}, defaultRetries), { count: retries }); | ||
} | ||
} | ||
return defaultRetries; | ||
} | ||
exports.default = getRetries; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
import { ClientConfig, PubSub } from '@google-cloud/pubsub'; | ||
export interface JobPayload { | ||
[key: string]: string; | ||
} | ||
export declare type QueueConfig = { | ||
topicName: string; | ||
subscriptionName: string; | ||
buriedTopicName?: string; | ||
}; | ||
export default class PubsubQueue { | ||
connectionConfig: ClientConfig; | ||
queueConfig: QueueConfig; | ||
_client?: PubSub; | ||
publisher?: any; | ||
worker?: any; | ||
constructor(connectionConfig: ClientConfig | undefined, queueConfig: QueueConfig); | ||
client(): PubSub; | ||
get Publisher(): any; | ||
get Worker(): any; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
"use strict"; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var pubsub_1 = require("@google-cloud/pubsub"); | ||
var publisher_1 = __importDefault(require("./publisher")); | ||
var worker_1 = __importDefault(require("./worker")); | ||
var PubsubQueue = /** @class */ (function () { | ||
function PubsubQueue(connectionConfig, queueConfig) { | ||
if (connectionConfig === void 0) { connectionConfig = {}; } | ||
this.connectionConfig = connectionConfig; | ||
this.queueConfig = queueConfig; | ||
} | ||
PubsubQueue.prototype.client = function () { | ||
if (!this._client) { | ||
this._client = new pubsub_1.PubSub(this.connectionConfig); | ||
} | ||
return this._client; | ||
}; | ||
Object.defineProperty(PubsubQueue.prototype, "Publisher", { | ||
get: function () { | ||
var client = this.client(); | ||
if (!this.publisher) { | ||
this.publisher = new publisher_1.default(client, this.queueConfig.topicName); | ||
} | ||
return this.publisher; | ||
}, | ||
enumerable: false, | ||
configurable: true | ||
}); | ||
Object.defineProperty(PubsubQueue.prototype, "Worker", { | ||
get: function () { | ||
var client = this.client(); | ||
if (!this.worker) { | ||
this.worker = new worker_1.default(client, this.queueConfig); | ||
} | ||
return this.worker; | ||
}, | ||
enumerable: false, | ||
configurable: true | ||
}); | ||
return PubsubQueue; | ||
}()); | ||
exports.default = PubsubQueue; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import { PubSub, Topic } from '@google-cloud/pubsub'; | ||
import type { DelayedConfig } from './getDelayed'; | ||
import type { RetryConfig } from './getRetries'; | ||
import { JobPayload } from '.'; | ||
declare type Job = { | ||
type: string; | ||
payload?: JobPayload; | ||
delayed?: DelayedConfig; | ||
retries?: RetryConfig; | ||
}; | ||
export default class PubsubPublisher { | ||
client: PubSub; | ||
topic: Topic; | ||
constructor(client: PubSub, topicName: string); | ||
/** | ||
* Publishes a job to the queue | ||
*/ | ||
publish(arg1: Job): Promise<string>; | ||
publish(arg1: string, arg2?: Job): Promise<string>; | ||
} | ||
export {}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
"use strict"; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var getDelayed_1 = __importDefault(require("./getDelayed")); | ||
var PubsubPublisher = /** @class */ (function () { | ||
function PubsubPublisher(client, topicName) { | ||
this.client = client; | ||
this.topic = this.client.topic(topicName); | ||
} | ||
PubsubPublisher.prototype.publish = function (arg1, arg2) { | ||
var job; | ||
var topic; | ||
if (typeof arg1 === 'string' && typeof arg2 === 'object') { | ||
job = arg2; | ||
topic = this.client.topic(arg1); | ||
} | ||
else { | ||
job = arg1; | ||
topic = this.topic; | ||
} | ||
var type = job.type, _a = job.payload, payload = _a === void 0 ? {} : _a, _b = job.delayed, delayed = _b === void 0 ? null : _b, _c = job.retries, retries = _c === void 0 ? null : _c; | ||
var attributes = { | ||
type: type, | ||
}; | ||
if (delayed) { | ||
var delay = getDelayed_1.default(delayed); | ||
if (delay) { | ||
attributes.delayed = delay; | ||
} | ||
} | ||
if (retries) { | ||
attributes.retries = String(retries.count); | ||
} | ||
return topic.publish(Buffer.from(JSON.stringify(payload)), attributes); | ||
}; | ||
return PubsubPublisher; | ||
}()); | ||
exports.default = PubsubPublisher; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
export declare function sleep(ms: number): Promise<unknown>; | ||
export declare function safeJSONParse(val: string): any; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.safeJSONParse = exports.sleep = void 0; | ||
function sleep(ms) { | ||
return new Promise(function (resolve) { | ||
setTimeout(resolve, ms); | ||
}); | ||
} | ||
exports.sleep = sleep; | ||
function safeJSONParse(val) { | ||
try { | ||
return JSON.parse(val); | ||
} | ||
catch (err) { | ||
return val; | ||
} | ||
} | ||
exports.safeJSONParse = safeJSONParse; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/// <reference types="node" /> | ||
import { EventEmitter } from 'events'; | ||
import Publisher from './publisher'; | ||
import { DelayedConfig } from './getDelayed'; | ||
import { RetryConfig } from './getRetries'; | ||
import { Message, PubSub, Topic } from '@google-cloud/pubsub'; | ||
import { JobPayload, QueueConfig } from '.'; | ||
declare type JobStatus = string | 'put' | 'retry'; | ||
declare type JobResult = JobStatus | { | ||
status: string; | ||
extra: any; | ||
}; | ||
declare type Handler = { | ||
ackOnStart?: boolean; | ||
retries?: RetryConfig; | ||
delayed?: DelayedConfig; | ||
work: (payload: JobPayload, message: Message) => Promise<JobResult>; | ||
}; | ||
declare type DynamicHandler = (type: string) => Handler; | ||
declare type Handlers = Record<string, Handler> | DynamicHandler; | ||
export default class PubsubWorker extends EventEmitter { | ||
client: PubSub; | ||
queueConfig: QueueConfig; | ||
topic: Topic; | ||
publisher: Publisher; | ||
buriedPublisher?: Publisher; | ||
constructor(client: PubSub, queueConfig: QueueConfig); | ||
work(handlers: Handlers, message: Message): Promise<void>; | ||
runHandler(handler: Handler, data: JobPayload, message: Message, ackOnStart: boolean | undefined, delayed: string | null): Promise<{}>; | ||
handleRetry(status: JobStatus, message: Message, ackOnStart: boolean, delayed: string | null): void; | ||
/** | ||
* Start the worker | ||
*/ | ||
start(handlers?: {}, options?: {}): void; | ||
} | ||
export {}; |
Oops, something went wrong.