This repository has been archived by the owner on Feb 22, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.js
404 lines (320 loc) · 11.2 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
/*
* eve-live Application
*/
// Load global configuration
var config = require('./config');
// Load deps
var express = require('express'),
routes = require('./routes'),
http = require('http'),
path = require('path'),
zmq = require('zmq'),
zlib = require('zlib'),
colors = require('colors'),
zmqSocket = zmq.socket('sub');
// The Express application that serves the HTTP routes and hosts socket.io
var app = express();

// Main DB: prices[solarSystemID][typeID] = {ask, bid, generatedAt}
var prices = {};

// Stats counters; the status display zeroes them after every tick
var messagesTotal = 0,
    messagesOrders = 0,
    dataUpdated = 0;

// While true the status display stays quiet so other console output
// (Redis initialisation and DB writes) is not overwritten
var outBlocked = false;
/*
* Express Setup
*/
// Settings and middleware shared by every environment.
app.configure(function() {
    app.set('port', process.env.PORT || config.port);
    app.set('views', __dirname + '/views');
    app.set('view engine', 'jade');

    // Middleware is registered in list order, so the router runs before
    // the static file handler.
    [
        express.compress(),
        express.favicon(),
        express.logger(config.expressLoggingLevel),
        express.bodyParser(),
        express.methodOverride(),
        app.router,
        express.static(path.join(__dirname, 'public'))
    ].forEach(function(middleware) {
        app.use(middleware);
    });
});

// The error handler middleware is only installed in development.
app.configure('development', function() {
    app.use(express.errorHandler());
});
app.get('/', routes.index);

/*
 * GET /snapshot/:system/
 * Responds with the JSON snapshot of every type stored for the given system,
 * or with a JSON object carrying an `error` message when nothing is stored.
 */
app.get('/snapshot/:system/', function(req, res) {
    var systemID = req.params.system;

    // Only own keys count as systems, so names inherited from
    // Object.prototype (e.g. "__proto__") are not served as data.
    var snapshot = Object.prototype.hasOwnProperty.call(prices, systemID) ? prices[systemID] : undefined;

    // JSON.stringify(undefined) returns undefined rather than a string
    var response = JSON.stringify(snapshot);

    if(response === undefined) {
        // Serialise the error so the body is valid JSON (quoted key)
        response = JSON.stringify({error: 'No data for this system. Is the ID correct?'});
    }

    res.write(response);
    res.end();
});
/*
 * GET /snapshot/:system/:type/
 * Responds with the JSON snapshot of one type in one system. Unknown systems
 * and unknown types each get their own JSON error object.
 */
app.get('/snapshot/:system/:type/', function(req, res) {
    var hasOwn = Object.prototype.hasOwnProperty;
    var systemID = req.params.system;
    var typeID = req.params.type;
    var response;

    // Look the system up explicitly instead of relying on a thrown TypeError
    var systemData = hasOwn.call(prices, systemID) ? prices[systemID] : undefined;

    if(systemData === undefined || systemData === null) {
        response = JSON.stringify({error: 'No data for that system. Is the systemID correct?'});
    } else {
        var typeData = hasOwn.call(systemData, typeID) ? systemData[typeID] : undefined;

        // JSON.stringify(undefined) returns undefined rather than a string
        response = JSON.stringify(typeData);

        if(response === undefined) {
            response = JSON.stringify({error: 'No data for that type in this system. Is the typeID correct?'});
        }
    }

    res.write(response);
    res.end();
});
/*
* WebSockets
*/
// Start the HTTP server and attach socket.io to it
var server = app.listen(app.get('port')),
    io = require('socket.io').listen(server);

io.set('log level', 1);

// XHR fallback for hosts like Heroku
if(config.xhrFallback) {
    [
        ['transports', ['xhr-polling']],
        ['polling duration', 10]
    ].forEach(function(setting) {
        io.set(setting[0], setting[1]);
    });
}
/*
 * Per-client room management.
 * 'subscribe'   - sends the stored data for the requested system as 'init'
 *                 and joins the client to data.room.
 * 'unsubscribe' - removes the client from data.room.
 * Payloads without a room are ignored: they come from the client, and
 * reading `.room` of a missing payload would throw inside the handler.
 */
io.sockets.on('connection', function(socket) {
    function hasRoom(data) {
        return data !== undefined && data !== null && data.room !== undefined && data.room !== null;
    }

    // On subscription emit the current DB of that system
    socket.on('subscribe', function(data) {
        if(!hasRoom(data)) {
            return;
        }

        // "<system>-realtime" rooms share the data set of "<system>"
        socket.emit('init', prices[String(data.room).replace('-realtime', '')]);
        socket.join(data.room);
    });

    // Remove from room when client unsubscribed
    socket.on('unsubscribe', function(data) {
        if(!hasRoom(data)) {
            return;
        }

        socket.leave(data.room);
    });
});
/*
* eve-live server
*/
// Startup banner: prints the ASCII-art logo and version tag to stdout, one
// console.log call per banner line. The string literals are runtime output;
// the doubled backslashes are escapes for a single backslash each.
console.log(' /$$ /$$ ');
console.log(' | $$|__/ ');
console.log(' /$$$$$$ /$$ /$$ /$$$$$$ | $$ /$$ /$$ /$$ /$$$$$$ ');
console.log(' /$$__ $$| $$ /$$//$$__ $$ /$$$$$$| $$| $$| $$ /$$//$$__ $$');
console.log('| $$$$$$$$ \\ $$/$$/| $$$$$$$$|______/| $$| $$ \\ $$/$$/| $$$$$$$$');
console.log('| $$_____/ \\ $$$/ | $$_____/ | $$| $$ \\ $$$/ | $$_____/');
console.log('| $$$$$$$ \\ $/ | $$$$$$$ | $$| $$ \\ $/ | $$$$$$$');
console.log(' \\_______/ \\_/ \\_______/ |__/|__/ \\_/ \\_______/ v1.0');
// Attach the subscriber socket to every relay listed in the config file,
// printing one progress line per relay
for(var relay in config.relays) {
    var relayAddress = config.relays[relay];

    process.stdout.write('Connecting to ' + relayAddress.underline + ':');
    zmqSocket.connect(relayAddress);
    console.log(' OK!'.green);
}
// External storage functionality: create the Redis client only when enabled
if(config.externalStorage) {
    // NOTE(review): `redis` is assigned without a declaration, which makes
    // it an implicit global - confirm nothing else relies on that before
    // declaring it with var.
    redis = require("redis").createClient(
        config.redis.port,
        config.redis.host,
        // Replies come back as Buffers when compression is enabled
        {return_buffers: config.externalStorageCompression}
    );

    // Log client errors to the console
    redis.on("error", function onRedisError(err) {
        console.log("Redis error: " + err);
    });
}
/*
 * Step 1 of the Redis start-up chain: authenticate with the configured
 * password, then continue with redisSelectDatabase() once the server has
 * answered.
 *
 * The continuation is passed as a callback. Writing `redisSelectDatabase()`
 * in the argument list would run it before the AUTH command is even sent.
 * On failure the chain stops, the error is printed and console output is
 * unblocked again.
 */
function redisAuthenticate() {
    // Authenticate at server
    process.stdout.write('Authenticating at Redis server: ');

    redis.auth(config.redis.password, function(err) {
        if(err) {
            console.log(('FAILED! ' + err).red);
            outBlocked = false;
            return;
        }

        // redisSelectDatabase prints the "OK!" for this step
        redisSelectDatabase();
    });
}
/*
 * Step 2 of the Redis start-up chain: select the configured database, then
 * continue with redisLoadDatabase() once the server has answered.
 *
 * The continuation is passed as a callback instead of being invoked while
 * building the argument list. On failure the chain stops, the error is
 * printed and console output is unblocked again.
 */
function redisSelectDatabase() {
    // When authentication was required, reaching this point means it
    // succeeded, so close the "Authenticating..." line
    if(config.redis.authenticate) {
        console.log('OK!'.green);
    }

    // Select proper DB
    process.stdout.write('Selecting Redis database: ');

    redis.select(config.redis.database, function(err) {
        if(err) {
            console.log(('FAILED! ' + err).red);
            outBlocked = false;
            return;
        }

        // redisLoadDatabase prints the "OK!" for this step
        redisLoadDatabase();
    });
}
/*
 * Step 3 of the Redis start-up chain: load the persisted price database.
 *
 * Reads 'eve-live-db-compressed' (zlib-inflated before parsing) or
 * 'eve-live-db', depending on config.externalStorageCompression, and
 * replaces `prices` with the parsed object.
 *
 * Redis errors, inflate errors and unparsable data are reported and leave
 * `prices` untouched instead of throwing out of the callback. Console output
 * is unblocked only after the whole load, including inflation, has finished.
 */
function redisLoadDatabase() {
    // Closes the "Selecting Redis database:" line of the previous step
    console.log('OK!'.green);
    process.stdout.write('Loading database from Redis: ');

    // Decide which key to load depending on settings
    var compressed = config.externalStorageCompression;
    var key = compressed ? 'eve-live-db-compressed' : 'eve-live-db';

    // Installs the serialized database (or reports why it could not be
    // installed) and unblocks console output
    function finish(err, serialized) {
        if(err) {
            console.log(('FAILED! ' + err).red);
        } else {
            try {
                var loaded = JSON.parse(serialized);

                // The rest of the app indexes into `prices`, so anything
                // but an object would break it later
                if(loaded === null || typeof loaded !== 'object') {
                    throw new Error('stored database is not a JSON object');
                }

                prices = loaded;
                console.log('OK!'.green);
            } catch(parseErr) {
                console.log(('FAILED! ' + parseErr).red);
            }
        }

        outBlocked = false;
    }

    redis.get(key, function(err, reply) {
        if(err) {
            finish(err);
            return;
        }

        if(reply === null || reply === undefined) {
            console.log('Key is missing!'.yellow);
            outBlocked = false;
            return;
        }

        if(compressed) {
            zlib.inflate(reply, finish);
        } else {
            finish(null, reply);
        }
    });
}
// Kick-off external storage initialization: silence the status display,
// then enter the Redis start-up chain at the authentication step when a
// password is required, otherwise directly at database selection
if(config.externalStorage) {
    outBlocked = true;
    (config.redis.authenticate ? redisAuthenticate : redisSelectDatabase)();
}
// Subscribe with an empty prefix, i.e. without any message filtering
zmqSocket.subscribe('');

// Socket errors are only reported on the console
zmqSocket.on('error', function onZmqError(error) {
    console.log('ERROR: ' + error);
});
/*
 * Folds the rows of one rowset into `prices` and reports which systems
 * received a newer snapshot.
 *
 * For every row the entry prices[solarSystemID][typeID] is created on demand.
 * When the rowset is newer than the stored data, the stored bid/ask are reset
 * first, so the entry ends up holding the highest bid and the lowest ask of
 * this rowset only.
 *
 * @param {Object} rowset - One rowset of an orders message (typeID, generatedAt, rows).
 * @param {Object} columnIndex - Positions of the price, bid and solarSystemID columns within a row.
 * @returns {Object} Map solarSystemID -> {bid, ask} holding the values stored
 *                   before the reset, for every system that received newer data.
 */
function applyOrderRowset(rowset, columnIndex) {
    var typeID = rowset.typeID;
    var generatedAt = Date.parse(rowset.generatedAt);
    var previousQuotes = {};
    var rows = rowset.rows || [];

    for(var r = 0; r < rows.length; r++) {
        var row = rows[r];
        var price = row[columnIndex.price];
        var isBid = row[columnIndex.bid];
        var solarSystemID = row[columnIndex.solarSystemID];

        // toFixed() below needs a number; skip malformed rows instead of
        // throwing inside the message callback
        if(typeof price !== 'number') {
            continue;
        }

        // Initialize the data structure on first contact
        if(!prices[solarSystemID]) {
            prices[solarSystemID] = {};
        }

        if(!prices[solarSystemID][typeID]) {
            prices[solarSystemID][typeID] = {
                ask: 0,
                bid: 0,
                generatedAt: 0
            };
        }

        var entry = prices[solarSystemID][typeID];
        var oldDate = entry.generatedAt;

        // Newer snapshot: remember the old values and reset the prices
        if(oldDate < generatedAt) {
            dataUpdated++;

            // Keys of previousQuotes are strings while solarSystemID may be
            // a number, so test by property lookup rather than indexOf
            if(!Object.prototype.hasOwnProperty.call(previousQuotes, solarSystemID)) {
                previousQuotes[solarSystemID] = {
                    bid: entry.bid,
                    ask: entry.ask
                };
            }

            entry.bid = 0;
            entry.ask = 0;
        }

        // Same or newer snapshot: keep the best bid / best ask seen so far
        if(oldDate <= generatedAt) {
            if(isBid === true) {
                if(price > entry.bid || entry.bid === 0) {
                    entry.bid = price.toFixed(2);
                }
            }

            if(isBid === false) {
                if(price < entry.ask || entry.ask === 0) {
                    entry.ask = price.toFixed(2);
                }
            }

            entry.generatedAt = generatedAt;
        }
    }

    return previousQuotes;
}

/*
 * Emits 'update' events for one type to the rooms of every affected system.
 *
 * Clients in "<system>-realtime" always get the update. Clients in "<system>"
 * only get it when bid or ask actually differ from the previous values.
 *
 * @param {*} typeID - Type the update is about.
 * @param {Object} previousQuotes - Result of applyOrderRowset().
 */
function broadcastPriceUpdates(typeID, previousQuotes) {
    for(var system in previousQuotes) {
        var current = prices[system][typeID];
        var before = previousQuotes[system];
        var response = {};

        response[typeID] = current;

        // Emit time and price update to real-time clients
        io.sockets.in(system + '-realtime').emit('update', response);

        // Only send data to non-real-time clients if something apart from
        // generatedAt changed. The comparison is deliberately loose: stored
        // prices are toFixed() strings, initial values the number 0.
        if((current.bid != before.bid) || (current.ask != before.ask)) {
            io.sockets.in(system).emit('update', response);
        }
    }
}

// ZeroMQ Socket event handling
// Each message is zlib-inflated and parsed as JSON. Messages that cannot be
// inflated or parsed are logged and dropped; without that a single bad
// message would throw inside the callback.
zmqSocket.on('message', function(message) {
    zlib.inflate(message, function(error, marketJSON) {
        if(error) {
            console.log('ERROR: could not inflate market message: ' + error);
            return;
        }

        var marketData;

        try {
            marketData = JSON.parse(marketJSON);
        } catch(parseError) {
            console.log('ERROR: could not parse market message: ' + parseError);
            return;
        }

        if(marketData === null || typeof marketData !== 'object') {
            return;
        }

        // Increase message counter
        messagesTotal++;

        // Only order messages carry prices
        if(marketData.resultType !== 'orders') {
            return;
        }

        // Increase order message counter
        messagesOrders++;

        var columns = marketData.columns || [];
        var columnIndex = {
            price: columns.indexOf('price'),
            solarSystemID: columns.indexOf('solarSystemID'),
            bid: columns.indexOf('bid')
        };
        var rowsets = marketData.rowsets || [];

        for(var i = 0; i < rowsets.length; i++) {
            // After new data got added, trigger update events for the systems
            var previousQuotes = applyOrderRowset(rowsets[i], columnIndex);
            broadcastPriceUpdates(rowsets[i].typeID, previousQuotes);
        }
    });
});
// Reconnect
// Voodoo code makes the zmq socket stay open
// Otherwise it would get removed by the garbage collection
//
// The callback never does any work: its only statement sits behind
// `if(false)`. What it does is reference `zmqSocket` (and `relay`, the loop
// variable of the relay connection loop) from a function handed to a timer.
//
// NOTE(review): 1000 * 60 * 60 * 24 * 365 is 31,536,000,000 ms, which is
// above the 2^31-1 ms maximum documented for Node.js timers; Node documents
// that such delays are replaced by 1 ms. If that applies here the timer
// fires almost immediately rather than after a year - confirm whether this
// block still has any effect.
setTimeout(function() {
if(false) {
zmqSocket.connect(relay);
}
}, 1000 * 60 * 60 * 24 * 365);
/*
* Status Display
*/
// Status
// Twice per second: print a one-line status (number of systems in the DB and
// message rates) unless console output is blocked, then zero the counters.
// The counters cover half a second, so dividing by 0.5 yields per-second
// rates.
if(config.displayStatus) {
    setInterval(function() {
        if(!outBlocked) {
            var now = new Date();
            var status = '[' + now.toLocaleTimeString() + '] There are currently ' + String(Object.keys(prices).length) + ' systems in our DB. Receiving ' + messagesTotal / 0.5 + ' messages per second (H: ' + (messagesTotal - messagesOrders) / 0.5 + ' / O: ' + messagesOrders / 0.5 + ' / U: ' + dataUpdated / 0.5 + ').';

            if(process.stdout.isTTY) {
                // Redraw the status in place on an interactive terminal
                process.stdout.clearLine();
                process.stdout.cursorTo(0);
                process.stdout.write(status);
            } else {
                // clearLine()/cursorTo() only exist on TTY streams; when
                // stdout is piped or redirected, print one line per tick
                process.stdout.write(status + '\n');
            }
        }

        messagesTotal = 0;
        messagesOrders = 0;
        dataUpdated = 0;
    }, 500);
}
// DB writer
// Every config.externalStorageInterval milliseconds: serialize `prices` and
// store it in Redis, zlib-deflated under 'eve-live-db-compressed' when
// compression is enabled, as plain JSON under 'eve-live-db' otherwise.
// The status display is blocked while the write is in flight. Failures of
// the compression or of the Redis write are reported instead of "OK!".
if(config.externalStorage) {
    setInterval(function() {
        // Reports the outcome of the write and unblocks the status display
        function finishWrite(err) {
            if(err) {
                console.log(('FAILED! ' + err).red);
            } else {
                console.log('OK!'.green);
            }

            outBlocked = false;
        }

        var now = new Date();
        process.stdout.write('\n[' + now.toLocaleTimeString() + '] Writing DB to Redis: ');
        outBlocked = true;

        var serialized = JSON.stringify(prices);

        // Write uncompressed DB
        if(!config.externalStorageCompression) {
            redis.set('eve-live-db', serialized, finishWrite);
            return;
        }

        // Compress DB, then write it
        zlib.deflate(serialized, function(error, compressedData) {
            if(error) {
                finishWrite(error);
                return;
            }

            redis.set('eve-live-db-compressed', compressedData, finishWrite);
        });
    }, config.externalStorageInterval);
}