Skip to content

Commit

Permalink
wut did I even do in this commit?
Browse files Browse the repository at this point in the history
  • Loading branch information
rin-yato committed Sep 23, 2024
1 parent 9128f46 commit e871ada
Show file tree
Hide file tree
Showing 15 changed files with 280 additions and 68 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ node_modules
.env.development.local
.env.test.local
.env.production.local
.env.compose

# Testing
coverage
Expand Down
3 changes: 2 additions & 1 deletion apps/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { TransactionTasker } from "@/task/transaction";
import { registerGlobalErrorHandler } from "./setup/error";
import { registerLogger } from "./setup/logger";
import { registerHeaderMiddleware } from "./setup/header";
import { WebhookTasker } from "./task/webhook";

const app = new OpenAPIHono<AppEnv>();

Expand All @@ -30,7 +31,7 @@ registerTiming(app);
registerGlobalErrorHandler(app);

// Register taskers
registerTasker(app, [new TransactionTasker()]);
registerTasker(app, [new TransactionTasker(), new WebhookTasker()]);

// Register Auth middleware
registerAuthMiddleware(app);
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/lib/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { apiError } from "./error";
// Auto run migration
await migrate();

const client = createDBClient({ url: env.DB_URL });
const client = createDBClient({ url: env.DB_URL, max: 30 });
export const db = createDB(client);

export type DB = typeof db;
Expand Down
1 change: 0 additions & 1 deletion apps/api/src/lib/lucia/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export const lucia = new Lucia(adapter, {
address: attributes.address,
phone: attributes.phone,
webhookUrl: attributes.webhookUrl,
allowRetry: attributes.allowRetry,
waitBeforeRedirect: attributes.waitBeforeRedirect,
createdAt: attributes.createdAt,
updatedAt: attributes.createdAt,
Expand Down
1 change: 1 addition & 0 deletions apps/api/src/service/bakong.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class BakongService {
private token = env.BAKONG_TOKEN;
private api = ky.extend({
prefixUrl: BAKONG_API_URL,
retry: 2,
});

createKHQR(opts: KHQROpts) {
Expand Down
29 changes: 25 additions & 4 deletions apps/api/src/service/checkout.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export class CheckoutService {
displayName: true,
username: true,
bakongId: true,
webhookUrl: true,
},
},
},
Expand All @@ -125,7 +126,12 @@ export class CheckoutService {
}

if (checkout.status === "SUCCESS") {
return ok({ checkout, activeTransaction: null });
const {
user: { webhookUrl, ...user },
transactions,
...checkoutOnly
} = checkout;
return ok({ checkout: checkoutOnly, user, activeTransaction: null });
}

let activeTransaction = checkout.transactions.find((t) => t.status === "PENDING");
Expand Down Expand Up @@ -153,9 +159,24 @@ export class CheckoutService {
}

// Add transaction to queue without waiting
withRetry(() => transactionQueue.add(activeTransaction.id, activeTransaction.md5));

return ok({ checkout, activeTransaction });
withRetry(() =>
transactionQueue.add({
userId: checkout.userId,
checkoutId: checkout.id,
webhookUrl: checkout.user.webhookUrl,
transactionId: activeTransaction.id,
md5: activeTransaction.md5,
}),
);

// Ugly i know, but bare with me
const {
user: { webhookUrl, ...user },
transactions,
...checkoutOnly
} = checkout;

return ok({ checkout: checkoutOnly, user, activeTransaction });
}

findById(id: string) {
Expand Down
4 changes: 2 additions & 2 deletions apps/api/src/service/token.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { nanoid } from "@/lib/id";
import { redis } from "@/lib/redis";
import { withRetry } from "@/lib/retry";
import { omit } from "@/lib/transform";
import { err, ok, type Result } from "@justmiracle/result";
import { err, ok, unwrap, type Result } from "@justmiracle/result";
import { TB_token } from "@repo/db/table";
import { and, eq, isNull, sql } from "drizzle-orm";

Expand Down Expand Up @@ -42,7 +42,7 @@ class TokenService {
.catch(err);
if (dbToken.error) return dbToken;

withRetry(() => this.setToken(key, userId));
withRetry(() => this.setToken(key, userId).then(unwrap));

return dbToken;
}
Expand Down
9 changes: 8 additions & 1 deletion apps/api/src/service/webhook.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import { db, takeFirstOrThrow } from "@/lib/db";
import { err, ok } from "@justmiracle/result";
import type { WebhookInsert } from "@repo/db/schema";
import { TB_webhook } from "@repo/db/table";

class WebhookService {
async create(data: WebhookInsert) {
return db.insert(TB_webhook).values(data).returning().then(takeFirstOrThrow);
return db
.insert(TB_webhook)
.values(data)
.returning()
.then(takeFirstOrThrow)
.then(ok)
.catch(err);
}
}

Expand Down
35 changes: 20 additions & 15 deletions apps/api/src/task/transaction/queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { DEFAULT_CONNECTION } from "@/lib/tasker";
import { Queue } from "bullmq";
import { type Job, Queue } from "bullmq";

export const TRANSACTION_QUEUE_NAME = "{transaction}";

Expand All @@ -9,28 +9,33 @@ const REMOVE_ON_FAIL = { count: 8500 };
const REMOVE_ON_SUCCESS = { count: 5500 };
const BACKOFF = { type: "fixed", delay: 3200 };

export type TransactionJobData = {
md5: string;
transactionId: string;
checkoutId: string;
userId: string;
webhookUrl: string;
};
export type TransactionJob = Job<TransactionJobData>;

export class TransactionQueue {
queue;

constructor() {
this.queue = new Queue(TRANSACTION_QUEUE_NAME, {
this.queue = new Queue<TransactionJobData>(TRANSACTION_QUEUE_NAME, {
connection: DEFAULT_CONNECTION,
});
}

async add(transactionId: string, md5: string) {
return await this.queue.add(
TRANSACTION_QUEUE_NAME,
{ md5, transactionId },
{
jobId: transactionId,
delay: DELAY,
backoff: BACKOFF,
attempts: MAXIMUM_ATTEMPTS,
removeOnFail: REMOVE_ON_FAIL,
removeOnComplete: REMOVE_ON_SUCCESS,
},
);
async add(data: TransactionJobData) {
return await this.queue.add(TRANSACTION_QUEUE_NAME, data, {
jobId: data.transactionId,
delay: DELAY,
backoff: BACKOFF,
attempts: MAXIMUM_ATTEMPTS,
removeOnFail: REMOVE_ON_FAIL,
removeOnComplete: REMOVE_ON_SUCCESS,
});
}
}

Expand Down
18 changes: 14 additions & 4 deletions apps/api/src/task/transaction/tasker.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { DEFAULT_CONNECTION, type Tasker } from "@/lib/tasker";
import { type Job, UnrecoverableError, Worker } from "bullmq";
import { TRANSACTION_QUEUE_NAME } from "./queue";
import { UnrecoverableError, Worker } from "bullmq";
import { TRANSACTION_QUEUE_NAME, type TransactionJob, type TransactionJobData } from "./queue";
import { bakongService } from "@/service/bakong.service";
import { transactionServcie } from "@/service/transaction.service";
import { logger } from "@/setup/logger";
import { withRetry } from "@/lib/retry";
import { webhookQueue } from "../webhook";

export class TransactionTasker implements Tasker {
worker;

constructor() {
this.worker = new Worker(TRANSACTION_QUEUE_NAME, this.process, {
this.worker = new Worker<TransactionJobData>(TRANSACTION_QUEUE_NAME, this.process, {
autorun: false,
connection: DEFAULT_CONNECTION,
concurrency: 20,
Expand All @@ -32,7 +34,7 @@ export class TransactionTasker implements Tasker {
await this.worker.close();
}

async process(job: Job<{ md5: string; transactionId: string }>) {
async process(job: TransactionJob) {
// if the job age exceeds 5mn, it will be considered failed
const TIMEOUT = 1000 * 60 * 5;
if (job.timestamp + TIMEOUT < Date.now()) {
Expand Down Expand Up @@ -92,6 +94,14 @@ export class TransactionTasker implements Tasker {
throw result.error;
}

withRetry(async () => {
await webhookQueue.add({
userId: job.data.userId,
checkoutId: job.data.checkoutId,
webhookUrl: job.data.webhookUrl,
});
});

await job.updateProgress(100);
job.log("Transaction is successful");
}
Expand Down
33 changes: 20 additions & 13 deletions apps/api/src/task/webhook/queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { withRetry } from "@/lib/retry";
import { DEFAULT_CONNECTION } from "@/lib/tasker";
import type { Webhook } from "@repo/db/schema";
import { err, ok } from "@justmiracle/result";
import { type Job, Queue } from "bullmq";

export const WEBHOOK_QUEUE_NAME = "{webhook}";
Expand All @@ -9,35 +10,41 @@ const REMOVE_ON_SUCCESS = { count: 5500 };
const BACKOFF = { type: "exponential", delay: 500 };
const ATTMEPTS = 5;

export type WebhookJob = Job<Webhook>;
export type WebhookJobData = { checkoutId: string; webhookUrl: string; userId: string };

export type WebhookJob = Job<WebhookJobData>;

class WebhookQueue {
queue;

constructor() {
this.queue = new Queue(WEBHOOK_QUEUE_NAME, {
this.queue = new Queue<WebhookJobData>(WEBHOOK_QUEUE_NAME, {
connection: DEFAULT_CONNECTION,
});
}

async add() {
const job = await this.queue.getJob("h");
async add(opts: WebhookJobData) {
const job = await this.queue.getJob(opts.checkoutId);

if (job) {
await job.retry("failed");
return;
const isFailed = await job.isFailed().then(ok).catch(err);
if (isFailed.error) return isFailed;

withRetry(() => job.retry("failed"));

return ok(job);
}

return await this.queue.add(
WEBHOOK_QUEUE_NAME,
{},
{
return this.queue
.add(WEBHOOK_QUEUE_NAME, opts, {
jobId: opts.checkoutId,
attempts: ATTMEPTS,
backoff: BACKOFF,
removeOnFail: REMOVE_ON_FAIL,
removeOnComplete: REMOVE_ON_SUCCESS,
},
);
})
.then(ok)
.catch(err);
}
}

Expand Down
60 changes: 56 additions & 4 deletions apps/api/src/task/webhook/tasker.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { DEFAULT_CONNECTION, type Tasker } from "@/lib/tasker";
import { WEBHOOK_QUEUE_NAME } from "./queue";
import { Worker, type Job } from "bullmq";
import { WEBHOOK_QUEUE_NAME, type WebhookJob, type WebhookJobData } from "./queue";
import { Worker } from "bullmq";
import ky from "ky";
import { err, ok, unwrap } from "@justmiracle/result";
import { logger } from "@/setup/logger";
import { webhookService } from "@/service/webhook.service";
import { withRetry } from "@/lib/retry";

export class WebhookTasker implements Tasker {
worker;

constructor() {
this.worker = new Worker(WEBHOOK_QUEUE_NAME, this.process, {
this.worker = new Worker<WebhookJobData>(WEBHOOK_QUEUE_NAME, this.process, {
autorun: false,
connection: DEFAULT_CONNECTION,
concurrency: 20,
Expand All @@ -29,5 +34,52 @@ export class WebhookTasker implements Tasker {
await this.worker.close();
}

async process(job: Job) {}
async process(job: WebhookJob) {
const response = await ky
.post(job.data.webhookUrl, { retry: 0, redirect: "error" })
.then(ok)
.catch(err);

if (response.error) {
job.log(`Failed to send webhook: ${response.error.message}`);
logger.error(response.error, "Failed to send webhook");
webhookService.create({
status: 500,
userId: job.data.userId,
checkoutId: job.data.checkoutId,
json: { error: response.error.message },
});
throw response.error;
}

const json = await response.value.json<Record<string, unknown>>().catch(() => ({
error: "INVALID_JSON",
}));

if (!response.value.ok) {
job.log(`Webhook return with http error ${response.value.status}`);
logger.error({ response: response.value, json }, "Failed to send webhook http error");
webhookService.create({
json,
status: response.value.status,
userId: job.data.userId,
checkoutId: job.data.checkoutId,
});
throw new Error(`Webhook return with http error ${response.value.status}`);
}

job.log("Webhook sent successfully");
logger.info({ response: response.value, json }, "Webhook sent successfully");
withRetry(async () => {
await webhookService
.create({
json,
status: response.value.status,
userId: job.data.userId,
checkoutId: job.data.checkoutId,
})
.then(unwrap);
});
job.updateProgress(100);
}
}
Loading

0 comments on commit e871ada

Please sign in to comment.