From 5a476e11908eb1c8edf1914c3101d232428a1252 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sat, 15 Feb 2025 22:41:16 +0800 Subject: [PATCH 1/2] feat: add async throtter --- .../src/utils/getUserTweetsByName.ts | 5 ++- .../src/utils/useAsyncThrotter.ts | 40 +++++++++++++++++++ .../app-twitter-monitor/2025-02-15-14-40.json | 10 +++++ 3 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 apps/twitter-monitor/src/utils/useAsyncThrotter.ts create mode 100644 common/changes/@yuants/app-twitter-monitor/2025-02-15-14-40.json diff --git a/apps/twitter-monitor/src/utils/getUserTweetsByName.ts b/apps/twitter-monitor/src/utils/getUserTweetsByName.ts index 6ecd01dd..37461747 100644 --- a/apps/twitter-monitor/src/utils/getUserTweetsByName.ts +++ b/apps/twitter-monitor/src/utils/getUserTweetsByName.ts @@ -1,5 +1,6 @@ import { ApifyClient } from 'apify-client'; import { ITwitter } from '../types/ITwitter'; +import { useAsyncThrottler } from './useAsyncThrotter'; const GET_TWITTER_METHOD_ID = `61RPP7dywgiy0JPD0`; const token = process.env.APIFY_TOKEN; @@ -9,7 +10,7 @@ const client = new ApifyClient({ }); //通过推特的用户名字 获取推特的信息 -export const getUserTweetsByName = async (name: string): Promise => { +export const getUserTweetsByName = useAsyncThrottler(async (name: string): Promise => { const input = { author: name, customMapFunction: (object: any) => { @@ -30,4 +31,4 @@ export const getUserTweetsByName = async (name: string): Promise => // Fetch and print Actor results from the run's dataset (if any) const { items } = await client.dataset(run.defaultDatasetId).listItems(); return items as any; -}; +}, 30); diff --git a/apps/twitter-monitor/src/utils/useAsyncThrotter.ts b/apps/twitter-monitor/src/utils/useAsyncThrotter.ts new file mode 100644 index 00000000..db71e3e4 --- /dev/null +++ b/apps/twitter-monitor/src/utils/useAsyncThrotter.ts @@ -0,0 +1,40 @@ +export const useAsyncThrottler = ( + asyncFunction: (params: T) => Promise, + concurrencyLimit: number, +) => { + const queue: Array = []; // 任务队列 + let activeTasks = 0; // 当前正在执行的任务数量 + const _next = () => { + if (queue.length === 0 || activeTasks >= concurrencyLimit) { + return; // 队列为空或达到并发限制,直接返回 + } + // 从队列中取出任务并执行 + const task = queue.shift(); + activeTasks++; + task?.(); + }; + return (innerParams: T): Promise => { + return new Promise((resolve, reject) => { + // 将任务包装成一个可执行的函数 + const task = async () => { + try { + const result = await asyncFunction(innerParams); // 调用异步函数 + resolve(result); // 任务成功,返回结果 + } catch (error) { + reject(error); // 任务失败,返回错误 + } finally { + activeTasks--; // 任务完成,减少活跃任务数 + _next(); // 执行下一个任务 + } + }; + + // 将任务加入队列 + queue.push(task); + + // 如果当前活跃任务数未达到限制,立即执行任务 + if (activeTasks < concurrencyLimit) { + _next(); + } + }); + }; +}; diff --git a/common/changes/@yuants/app-twitter-monitor/2025-02-15-14-40.json b/common/changes/@yuants/app-twitter-monitor/2025-02-15-14-40.json new file mode 100644 index 00000000..4a47f9cb --- /dev/null +++ b/common/changes/@yuants/app-twitter-monitor/2025-02-15-14-40.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/app-twitter-monitor", + "comment": "support async throtter", + "type": "patch" + } + ], + "packageName": "@yuants/app-twitter-monitor" +} \ No newline at end of file From 5c1a5a2bd72603b129e6b0c74535b51b468bdf22 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sat, 15 Feb 2025 22:57:39 +0800 Subject: [PATCH 2/2] feat: add async throtter --- apps/twitter-monitor/src/index.ts | 2 ++ apps/twitter-monitor/src/types/ITwitter.ts | 1 + 2 files changed, 3 insertions(+) diff --git a/apps/twitter-monitor/src/index.ts b/apps/twitter-monitor/src/index.ts index b52e4c0a..628d3c61 100644 --- a/apps/twitter-monitor/src/index.ts +++ b/apps/twitter-monitor/src/index.ts @@ -48,7 +48,9 @@ twitterMonitorUsers$ concatMap((tweets) => from(tweets).pipe( // Process each tweet + filter((tweet) => !tweet?.noResults), tap((tweet) => { + console.log(formatTime(Date.now()), 'tap tweet', user.user_id, tweet); const event: ITwitterEvent = { id: tweet.id, content: tweet.text, diff --git a/apps/twitter-monitor/src/types/ITwitter.ts b/apps/twitter-monitor/src/types/ITwitter.ts index fa6f7c3c..b35c0144 100644 --- a/apps/twitter-monitor/src/types/ITwitter.ts +++ b/apps/twitter-monitor/src/types/ITwitter.ts @@ -29,6 +29,7 @@ export interface ITwitter { media: any[]; isConversationControlled: boolean; possiblySensitive?: boolean; + noResults?: boolean; } export interface Author {