Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add async throtter #1065

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/twitter-monitor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ twitterMonitorUsers$
concatMap((tweets) =>
from(tweets).pipe(
// Process each tweet
filter((tweet) => !tweet?.noResults),
tap((tweet) => {
console.log(formatTime(Date.now()), 'tap tweet', user.user_id, tweet);
const event: ITwitterEvent = {
id: tweet.id,
content: tweet.text,
Expand Down
1 change: 1 addition & 0 deletions apps/twitter-monitor/src/types/ITwitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export interface ITwitter {
media: any[];
isConversationControlled: boolean;
possiblySensitive?: boolean;
noResults?: boolean;
}

export interface Author {
Expand Down
5 changes: 3 additions & 2 deletions apps/twitter-monitor/src/utils/getUserTweetsByName.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ApifyClient } from 'apify-client';
import { ITwitter } from '../types/ITwitter';
import { useAsyncThrottler } from './useAsyncThrotter';

const GET_TWITTER_METHOD_ID = `61RPP7dywgiy0JPD0`;
const token = process.env.APIFY_TOKEN;
Expand All @@ -9,7 +10,7 @@ const client = new ApifyClient({
});

//通过推特的用户名字 获取推特的信息
export const getUserTweetsByName = async (name: string): Promise<ITwitter[]> => {
export const getUserTweetsByName = useAsyncThrottler(async (name: string): Promise<ITwitter[]> => {
const input = {
author: name,
customMapFunction: (object: any) => {
Expand All @@ -30,4 +31,4 @@ export const getUserTweetsByName = async (name: string): Promise<ITwitter[]> =>
// Fetch and print Actor results from the run's dataset (if any)
const { items } = await client.dataset(run.defaultDatasetId).listItems();
return items as any;
};
}, 30);
40 changes: 40 additions & 0 deletions apps/twitter-monitor/src/utils/useAsyncThrotter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
export const useAsyncThrottler = <T, K>(
asyncFunction: (params: T) => Promise<K>,
concurrencyLimit: number,
) => {
const queue: Array<Function> = []; // 任务队列
let activeTasks = 0; // 当前正在执行的任务数量
const _next = () => {
if (queue.length === 0 || activeTasks >= concurrencyLimit) {
return; // 队列为空或达到并发限制,直接返回
}
// 从队列中取出任务并执行
const task = queue.shift();
activeTasks++;
task?.();
};
return (innerParams: T): Promise<K> => {
return new Promise((resolve, reject) => {
// 将任务包装成一个可执行的函数
const task = async () => {
try {
const result = await asyncFunction(innerParams); // 调用异步函数
resolve(result); // 任务成功,返回结果
} catch (error) {
reject(error); // 任务失败,返回错误
} finally {
activeTasks--; // 任务完成,减少活跃任务数
_next(); // 执行下一个任务
}
};

// 将任务加入队列
queue.push(task);

// 如果当前活跃任务数未达到限制,立即执行任务
if (activeTasks < concurrencyLimit) {
_next();
}
});
};
};
10 changes: 10 additions & 0 deletions common/changes/@yuants/app-twitter-monitor/2025-02-15-14-40.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/app-twitter-monitor",
"comment": "support async throtter",
"type": "patch"
}
],
"packageName": "@yuants/app-twitter-monitor"
}