This repository has been archived by the owner on Apr 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest.js
145 lines (133 loc) · 4.87 KB
/
test.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
const { ServiceBroker } = require("moleculer");
const { loadConfig } = require("./config");
const { nodeid, sleep, logger, streamToString, to } = require("./utils");
const program = require("commander");
const stream = require("stream");
const s3 = require("./s3");
const pQueue = require("p-queue");
// Defaults that apply whenever the matching CLI argument is not supplied.
let itemsToGenerate = 1;
let itemsName = "test";
// Limits how many queued calls run at the same time.
// NOTE(review): `xinterval` / `xintervalCap` look like deliberately disabled
// `interval` / `intervalCap` rate-limit options — confirm before renaming them.
const queue = new pQueue({concurrency: 10, xinterval: 1000, xintervalCap: 40 });
program
  .command("* [name] [count]")
  .action(function (name, count) {
    // Only override a default when the argument was actually given; assigning
    // unconditionally would turn a missing name into `undefined` and a missing
    // count into 0 (nothing generated).
    if (name != null) {
      itemsName = name;
    }
    if (count != null) {
      // `| 0` coerces the CLI string to a 32-bit integer (non-numeric input becomes 0).
      itemsToGenerate = count | 0;
    }
  });
program.parse(process.argv);
// create broker
// Broker settings: a copy of whatever loadConfig() returns, with nodeID set to
// the value produced by nodeid("test").
const loadedSettings = loadConfig();
const config = { ...loadedSettings, nodeID: nodeid("test") };
const broker = new ServiceBroker(config);
// background job
/**
 * Wraps a promise so that it always fulfills with a tagged outcome record
 * instead of rejecting.
 *
 * @param {*} t - Tag copied verbatim into the record (the callers in this file use "send" / "status").
 * @param {PromiseLike} p - Promise to observe; its own `then` is used, so the
 *   returned promise is whatever kind `p.then` produces.
 * @returns {PromiseLike<{v: *, t: *, status: "fulfilled"} | {e: *, t: *, status: "rejected"}>}
 */
const reflect = (t, p) => p.then(
  v => ({ v, t, status: "fulfilled" }),
  e => ({ e, t, status: "rejected" })
);

/**
 * Waits for every promise in `a` to settle and resolves with their outcome
 * records, in the same order. Never rejects because of a rejected member.
 *
 * `reflect` is called through an explicit lambda: passing it straight to
 * `map` would hand it `(element, index)`, i.e. the promise as the tag and a
 * number as the promise.
 *
 * @param {PromiseLike[]} a - Promises to wait for.
 * @param {*} [t] - Optional tag stored on every outcome record.
 * @returns {Promise<object[]>} Outcome records as produced by `reflect`.
 */
const waitAll = (a, t) => Promise.all(a.map(p => reflect(t, p)));
// Reflected promises of controller calls that still have to be inspected:
// run() appends the "send" calls, wait() appends "status" calls and prunes
// entries it has already handled.
let arr = [];

/**
 * Queues one "controller.createTask" call per requested item.
 *
 * Each item is named `<itemsName>#<n>` (n starting at 1) and its payload is a
 * readable stream containing exactly that name. The call itself only starts
 * when the queue runs the job; at that point its reflected promise is added
 * to `arr`.
 */
async function run() {
  // Hands one payload to the queue; the broker call happens inside the job.
  const enqueue = (payload, meta) => queue.add(() => {
    const pending = reflect('send', broker.call("controller.createTask", payload, { meta }));
    arr.push(pending);
    return pending;
  });

  let made = 0;
  while (made < itemsToGenerate) {
    made += 1;
    const label = `${itemsName}#${made}`;
    // Single-chunk stream: the item name followed by end-of-stream.
    const body = new stream.Readable();
    body.push(label);
    body.push(null);
    enqueue(body, { user: "test", name: label, priority: 0 });
  }
}
/**
 * Reads the object referenced by `task.input` from S3 and compares its content
 * with `task.name`. Logs the reason on any failure.
 *
 * @param {{input: *, name: string}} task - Task record returned by the controller.
 * @returns {Promise<boolean>} true only when the stored content equals the task name.
 */
async function storedInputMatchesName(task) {
  const [err, s] = await to(s3.readFile(task.input));
  if (err) {
    logger.error("s3 error:", err);
    return false;
  }
  const name = await streamToString(s);
  if (name !== task.name) {
    logger.error("not matching:", name, task.name);
    return false;
  }
  return true;
}

/**
 * Accounts for one settled "send" (createTask) outcome and, when the created
 * task checks out, starts polling its status.
 *
 * @param {object} p - Outcome record produced by `reflect`.
 * @param {object} res - Running counters; `res.created` is updated in place.
 */
async function handleSendOutcome(p, res) {
  res.created.count++;
  if (p.e) {
    logger.error(p.e);
    res.created.error++;
    return;
  }
  if (p.v == null) {
    logger.error("null found");
    res.created.null++;
    return;
  }
  if (await storedInputMatchesName(p.v)) {
    res.created.ok++;
    arr.push(reflect('status', broker.call("controller.statusTask", p.v)));
  } else {
    res.created.error++;
  }
}

/**
 * Accounts for one settled "status" (statusTask) outcome: either records a
 * final result or schedules another status poll.
 *
 * @param {object} p - Outcome record produced by `reflect`.
 * @param {object} res - Running counters; `res.results` is updated in place.
 */
async function handleStatusOutcome(p, res) {
  if (p.e || p.v == null) {
    // A failed or empty status call ends this task's polling chain, so it has
    // to be counted; otherwise results.count can never reach created.ok and
    // wait() loops forever.
    logger.error(p.e ? p.e : "null found");
    res.results.error++;
    res.results.count++;
    return;
  }
  const task = p.v;
  if (task.status === "error") {
    logger.error("task error:", task.error)
    res.results.error++;
    res.results.count++;
    return;
  }
  if (task.status !== "output") {
    // Not finished yet: poll again on a later pass.
    arr.push(reflect('status', broker.call("controller.statusTask", task)));
    return;
  }
  res.results.count++;
  // NOTE(review): the finished task is verified against `task.input`, exactly
  // like a freshly created one — confirm this should not read an output object.
  if (await storedInputMatchesName(task)) {
    res.results.ok++;
  } else {
    res.results.error++;
  }
  // Fire-and-forget cleanup; a failure is logged instead of being left as an
  // unhandled rejection.
  broker.call("controller.deleteTask", task)
    .catch(err => logger.error("delete error:", err));
}

/**
 * Polls `arr` once per second until every requested item has a create outcome
 * and every successfully created task has a final result, logging the
 * counters after each pass.
 *
 * @returns {Promise<void>} Resolves when all counters have caught up.
 */
async function wait() {
  // wait for all tasks
  const res = { created: { ok: 0, error: 0, null: 0, count: 0 }, results: { ok: 0, error: 0, count: 0 }, total: itemsToGenerate };
  while (res.created.count < res.total || res.results.count < res.created.ok) {
    await sleep(1000);
    // Drop entries handled on a previous pass.
    arr = arr.filter(p => p.done !== 1);
    // Entries pushed while this loop runs are visited in the same pass; they
    // are normally still pending and get skipped until a later pass.
    for (let p of arr) {
      // NOTE(review): isResolved() is not part of native Promise; this relies
      // on broker.call returning a promise type that offers it (e.g. Bluebird)
      // — confirm for the moleculer version in use.
      if (!p.isResolved()) continue;
      p.done = 1;
      p = await p;
      if (res.total === 1) console.log(p);
      if (p.t === "send") {
        await handleSendOutcome(p, res);
      } else if (p.t === "status") {
        await handleStatusOutcome(p, res);
      }
    }
    logger.info("r:", res);
  }
}
// start
/**
 * Logs a fatal error and terminates the process with a non-zero exit code.
 *
 * @param {*} err - Rejection reason to report.
 */
function exitWithError(err) {
  logger.error("fatal error:", err);
  process.exit(1);
}

/**
 * Starts the broker, begins watching for results and then queues the test
 * items. The process exits with code 0 once wait() completes, or with code 1
 * if wait() or run() rejects.
 *
 * @returns {Promise<void>} Resolves once the work has been kicked off; rejects
 *   if the broker fails to start.
 */
async function startup() {
  await broker.start();
  logger.info("Start");
  // The watcher is started first; it yields on its first sleep, so run()
  // below begins queueing before the first pass over the results.
  wait().then(() => { process.exit(0); }, exitWithError);
  run().catch(exitWithError);
}
// A broker start failure is reported the same way instead of becoming an
// unhandled rejection.
startup().catch(exitWithError);