-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.js
120 lines (104 loc) · 2.47 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
const uWS = require('uWebSockets.js');
const config = require('./config');
let connID = 0;
let messageID = 0;
let connTracker = new Map();
let subscribers = new Map();
async function publish(message) {
if (!subscribers.has(message.channel)) return;
let msg = JSON.stringify(message);
for (let subscriber of subscribers.get(message.channel)) {
connTracker.get(subscriber).send(msg);
}
}
const subscriber = uWS.App({}).ws('/*', {
open: ws => {
connID++;
ws.id = connID;
ws.sub = new Set();
connTracker.set(ws.id, ws);
},
message: (ws, message, isBinary) => {
if (isBinary) {
ws.close();
return;
}
let request;
try {
request = JSON.parse(Buffer.from(message).toString());
} catch(e) {
ws.send(JSON.stringify({
status: 'error',
code: 'syntax-error',
message: 'Invalid JSON'
}));
return;
}
if (request.command === 'set-filter') {
for (let filter of request.filter) {
if (typeof filter !== 'string') {
ws.send(JSON.stringify({
status: 'error',
code: 'syntax-error',
message: 'Invalid Filter'
}));
break;
}
if (!subscribers.has(filter)) {
subscribers.set(filter, new Set());
}
subscribers.get(filter).add(ws.id);
ws.sub.add(filter);
}
}
},
close: ws => {
for (let filter of ws.sub) {
subscribers.get(filter).delete(ws.id);
if (subscribers.get(filter).size === 0) {
subscribers.delete(filter);
}
}
connTracker.delete(ws.id);
}
}).listen(config.subscriber.host, config.subscriber.port, () => {
console.log(`Started subscriber listener on ${config.subscriber.host}:${config.subscriber.port}`);
});
const emitter = uWS.App().ws('/*', {
message: (ws, message, isBinary) => {
if (isBinary) {
ws.close();
return;
}
let request;
try {
request = JSON.parse(Buffer.from(message).toString());
} catch(e) {
ws.send(JSON.stringify({
status: 'error',
code: 'syntax-error',
message: 'Invalid JSON'
}));
return;
}
if (request.command === 'last-msg') {
ws.send(JSON.stringify({
status: 'success',
id: messageID
}));
} else if (request.command === 'post') {
let msg = {
id: ++messageID,
channel: request.channel,
message: request.message
};
publish(msg);
ws.send(JSON.stringify({
status: 'success',
id: msg.id
}))
}
}
}).listen(config.publisher.host, config.publisher.port, () => {
console.log(`Started publisher listener on ${config.publisher.host}:${config.publisher.port}`);
});