-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebhook_mqtt_wan.cjs
147 lines (113 loc) · 3.9 KB
/
webhook_mqtt_wan.cjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// server.js
const express = require('express');
const mqtt = require('async-mqtt');
const uuid = require('uuid');
const app = express();
const fs = require('fs');
const logger = require('./logger.cjs')
let wh_timeout=5000
// MQTT Broker 连接配置
//const mqttHost = 'mqtt://localhost:3001'; // 根据实际情况修改
const clientId = `express_${uuid.v4()}`;
let mqttClient;
// 连接到 MQTT Broker
const connectMqtt = async (connectUrl) => {
mqttClient = mqtt.connect(connectUrl, {
clientId,
clean: true,
connectTimeout: 4000,
reconnectPeriod: 1000,
});
await mqttClient.subscribe('response_topic');
//await mqttClient.subscribe('request_proxy_topic');
logger.log('Connected to MQTT Broker and subscribed to response_topic');
};
// 存储等待的响应
const pendingResponses = new Map();
const handleResponse = async (topic, message) => {
if (topic === 'response_topic') {
const payload = JSON.parse(message.toString());
const correlationId = payload.correlationId;
if (pendingResponses.has(correlationId)) {
const resolver = pendingResponses.get(correlationId);
resolver(payload.data);
pendingResponses.delete(correlationId);
}
}
};
module.exports.init= (webhookIp,webhookPort,mqttHost,timeout)=>{
wh_timeout = timeout
connectMqtt(mqttHost)
.then(() => {
mqttClient.on('message', handleResponse);
// mqttClient.on('message', handleProxyRequest);
})
.catch(err => {
logger.log('MQTT connection error:', err);
process.exit(1);
});
app.listen(webhookPort,webhookIp, () => {
logger.log(`Express server listening at http://${webhookIp}:${webhookPort}`);
});
}
// 中间件,用于解析 JSON 请求体
app.use(express.json());
// 路由,处理 HTTP 请求
app.all('*', async (req, res) => {
//logger.log(req)
const requestData = {
body:req.body,
query:req.query,
method:req.method,
cookies:req.cookies,
originalUrl:req.originalUrl,
hostname:req.hostname,
headers:req.headers
}
if(!req.headers.subtopic){
req.headers.subtopic=''
}
//logger.log(req.headers)
const correlationId = uuid.v4();
// 创建 Promise 并存储 resolver
const responsePromise = new Promise((resolve, reject) => {
pendingResponses.set(correlationId, resolve);
// 设置超时
const timeoutId = setTimeout(() => {
if (pendingResponses.has(correlationId)) {
pendingResponses.delete(correlationId);
reject(new Error('Processing timeout'));
}
}, wh_timeout);
});
// 发布消息到局域网服务器订阅的主题
try {
await mqttClient.publish('request_topic'+req.headers.subtopic, JSON.stringify({
correlationId,
data:requestData
}), { qos: 1 });
} catch (err) {
logger.log('Error publishing message:', err);
res.status(500).json({ error: 'Internal Server Error' });
return;
}
// 等待响应
try {
const data = await responsePromise;
if(data.res_type==1&&(!isNaN(data.status)))
res.status(data.status).send(data.content);
else if(data.res_type==2){
//send file
const buffer = Buffer.from(data.content);
res.status(data.status).send(buffer);
}else if(data.res_type==3){
const buffer = Buffer.from(data.file);
fs.writeFileSync(data.dir,buffer)
res.status(data.status).send(data.content);
}else{
res.send(201);
}
} catch (err) {
res.status(500).json({ error: err.message });
}
});