Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle campaigns with a large number of emails #64

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/api/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ JWT_SECRET=mysupersecretJWTsecret
REDIS_URL=redis://127.0.0.1:56379
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
DISABLE_SIGNUPS=false
RATE_LIMIT=100

# AWS
AWS_REGION=
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/app/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export function validateEnv<T extends string = string>(key: keyof NodeJS.Process
// ENV
export const JWT_SECRET = validateEnv("JWT_SECRET");
export const NODE_ENV = validateEnv<"development" | "production">("NODE_ENV", "production");

export const RATE_LIMIT = validateEnv("RATE_LIMIT", "100");
export const REDIS_URL = validateEnv("REDIS_URL");
export const DISABLE_SIGNUPS = validateEnv("DISABLE_SIGNUPS", "false").toLowerCase() === "true";

Expand Down
192 changes: 107 additions & 85 deletions packages/api/src/controllers/Tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,119 +5,141 @@ import { prisma } from "../database/prisma";
import { ContactService } from "../services/ContactService";
import { EmailService } from "../services/EmailService";
import { ProjectService } from "../services/ProjectService";
import { redis, REDIS_ONE_MINUTE } from "../services/redis";

@Controller("tasks")
export class Tasks {
@Post()
public async handleTasks(req: Request, res: Response) {
// Get all tasks with a runBy data in the past
signale.info("Checking for tasks to run");
const tasks = await prisma.task.findMany({
where: { runBy: { lte: new Date() } },
orderBy: { runBy: "asc" },
include: {
action: { include: { template: true, notevents: true } },
campaign: true,
contact: true,
},
});

signale.info(`Found ${tasks.length} tasks to run`);

for (const task of tasks) {
const { action, campaign, contact } = task;
const lockKey = `task_lock:${task.id}`;
const lock = await redis.set(lockKey, 'locked', 'EX', REDIS_ONE_MINUTE * 60, 'NX');
if (!lock) {
continue; // Skip this task if it's already being processed
}

const project = await ProjectService.id(contact.projectId);
const fullTask = await prisma.task.findUnique({
where: { id: task.id },
include: {
action: { include: { template: true, notevents: true } },
campaign: true,
contact: true,
},
});

// If the project does not exist or is disabled, delete all tasks
if (!project) {
await prisma.task.deleteMany({
where: {
contact: {
projectId: contact.projectId,
if (fullTask) {
const { action, campaign, contact } = fullTask;

const project = await ProjectService.id(contact.projectId);

// If the project does not exist or is disabled, delete all tasks
if (!project) {
await prisma.task.deleteMany({
where: {
contact: {
projectId: contact.projectId,
},
},
},
});
continue;
}
});
continue;
}

let subject = "";
let body = "";
let subject = "";
let body = "";

if (action) {
const { template, notevents } = action;
if (action) {
const { template, notevents } = action;

if (notevents.length > 0) {
const triggers = await ContactService.triggers(contact.id);
if (notevents.some((e) => triggers.some((t) => t.contactId === contact.id && t.eventId === e.id))) {
await prisma.task.delete({ where: { id: task.id } });
continue;
if (notevents.length > 0) {
const triggers = await ContactService.triggers(contact.id);
if (notevents.some((e) => triggers.some((t) => t.contactId === contact.id && t.eventId === e.id))) {
await prisma.task.delete({ where: { id: task.id } });
continue;
}
}

({ subject, body } = EmailService.format({
subject: template.subject,
body: template.body,
data: {
plunk_id: contact.id,
plunk_email: contact.email,
...JSON.parse(contact.data ?? "{}"),
},
}));
} else if (campaign) {
({ subject, body } = EmailService.format({
subject: campaign.subject,
body: campaign.body,
data: {
plunk_id: contact.id,
plunk_email: contact.email,
...JSON.parse(contact.data ?? "{}"),
},
}));
}

({ subject, body } = EmailService.format({
subject: template.subject,
body: template.body,
data: {
plunk_id: contact.id,
plunk_email: contact.email,
...JSON.parse(contact.data ?? "{}"),
const { messageId } = await EmailService.send({
from: {
name: project.from ?? project.name,
email: project.verified && project.email ? project.email : "[email protected]",
},
}));
} else if (campaign) {
({ subject, body } = EmailService.format({
subject: campaign.subject,
body: campaign.body,
data: {
plunk_id: contact.id,
plunk_email: contact.email,
...JSON.parse(contact.data ?? "{}"),
to: [contact.email],
content: {
subject,
html: EmailService.compile({
content: body,
footer: {
unsubscribe: campaign ? true : !!action && action.template.type === "MARKETING",
},
contact: {
id: contact.id,
},
project: {
name: project.name,
},
isHtml: (campaign && campaign.style === "HTML") ?? (!!action && action.template.style === "HTML"),
}),
},
}));
}
});

const { messageId } = await EmailService.send({
from: {
name: project.from ?? project.name,
email: project.verified && project.email ? project.email : "[email protected]",
},
to: [contact.email],
content: {
subject,
html: EmailService.compile({
content: body,
footer: {
unsubscribe: campaign ? true : !!action && action.template.type === "MARKETING",
},
contact: {
id: contact.id,
},
project: {
name: project.name,
},
isHtml: (campaign && campaign.style === "HTML") ?? (!!action && action.template.style === "HTML"),
}),
},
});
try {
await prisma.task.delete({ where: { id: task.id } });
} catch (error) {
signale.error(`Failed to delete task for: ${contact.email}`);
}

const emailData: {
messageId: string;
contactId: string;
actionId?: string;
campaignId?: string;
} = {
messageId,
contactId: contact.id,
};

if (action) {
emailData.actionId = action.id;
} else if (campaign) {
emailData.campaignId = campaign.id;
}
const emailData: {
messageId: string;
contactId: string;
actionId?: string;
campaignId?: string;
} = {
messageId,
contactId: contact.id,
};

await prisma.email.create({ data: emailData });
if (action) {
emailData.actionId = action.id;
} else if (campaign) {
emailData.campaignId = campaign.id;
}

await prisma.email.create({ data: emailData });

await prisma.task.delete({ where: { id: task.id } });
await redis.del(lockKey);

signale.success(`Task completed for ${contact.email} from ${project.name}`);
signale.success(`Task completed for ${contact.email} from ${project.name}`);
}
}

return res.status(200).json({ success: true });
Expand Down
30 changes: 24 additions & 6 deletions packages/api/src/controllers/v1/Campaigns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import { MembershipService } from "../../services/MembershipService";
import { ProjectService } from "../../services/ProjectService";
import { Keys } from "../../services/keys";
import { redis } from "../../services/redis";
import { RATE_LIMIT } from '../../app/constants';

const chunkSize = 500;

@Controller("campaigns")
export class Campaigns {
Expand Down Expand Up @@ -89,8 +92,10 @@ export class Campaigns {

let delay = userDelay ?? 0;

const rateLimit = parseInt(RATE_LIMIT, 10) || 400;

const tasks = campaign.recipients.map((r, index) => {
if (index % 80 === 0) {
if (index % rateLimit === 0) {
delay += 1;
}

Expand Down Expand Up @@ -200,7 +205,8 @@ export class Campaigns {
},
});

const chunkSize = 500;
const allRecipientIds = [];

for (let i = 0; i < recipients.length; i += chunkSize) {
const chunk = recipients.slice(i, i + chunkSize);

Expand All @@ -226,15 +232,21 @@ export class Campaigns {
}),
);

allRecipientIds.push(...recipientIds);
}

for (let i = 0; i < allRecipientIds.length; i += chunkSize) {
const chunk = allRecipientIds.slice(i, i + chunkSize);

await prisma.campaign.update({
where: { id: campaign.id },
data: {
recipients: {
connect: recipientIds.map((id) => ({ id })),
connect: chunk.map(id => ({ id })),
},
},
});
}
};

await redis.del(Keys.Campaign.id(campaign.id));
await redis.del(Keys.Project.campaigns(project.id));
Expand Down Expand Up @@ -298,7 +310,7 @@ export class Campaigns {
},
});

const chunkSize = 500;
const allRecipientIds = [];

for (let i = 0; i < campaign.recipients.length; i += chunkSize) {
const chunk = campaign.recipients.slice(i, i + chunkSize);
Expand Down Expand Up @@ -331,11 +343,17 @@ export class Campaigns {
}),
);

allRecipientIds.push(...recipientIds);
}

for (let i = 0; i < allRecipientIds.length; i += chunkSize) {
const chunk = allRecipientIds.slice(i, i + chunkSize);

await prisma.campaign.update({
where: { id: campaign.id },
data: {
recipients: {
connect: recipientIds.map((id) => ({ id })),
connect: chunk.map((id) => ({ id })),
},
},
});
Expand Down
32 changes: 23 additions & 9 deletions packages/api/src/services/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,29 @@ import Redis from 'ioredis';
import {REDIS_URL} from '../app/constants';
import signale from "signale";

let redis: Redis;
try {
redis = new Redis(REDIS_URL);
const infoString = redis.info();
signale.info('Redis initialized: ', infoString);
} catch (error) {
signale.error('Failed to initialize Redis: ', error);
}
export {redis};
export let redis: Redis;
const maxRetries = 5;
const retryDelay = 2000; // 2 seconds

const connectToRedis = async (attempt = 0) => {
try {
redis = new Redis(REDIS_URL);
await redis.ping();
const infoString = await redis.info();
signale.info('Redis initialized: ', infoString);
} catch (error) {
if (attempt < maxRetries) {
signale.warn(`Failed to connect to Redis. Retrying in ${retryDelay}ms...`);
await new Promise(resolve => setTimeout(resolve, retryDelay));
await connectToRedis(attempt + 1);
} else {
signale.error('Failed to initialize Redis after multiple attempts: ', error);
throw error;
}
}
};

connectToRedis();

export const REDIS_ONE_MINUTE = 60;
export const REDIS_DEFAULT_EXPIRY = REDIS_ONE_MINUTE / 60;
Expand Down
Loading