forked from robfallows/tunguska-reactive-aggregate
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregate.js
65 lines (59 loc) · 1.86 KB
/
aggregate.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
export const ReactiveAggregate = (sub, collection, pipeline, options) => {
import { Promise } from 'meteor/promise';
const defaultOptions = {
aggregationOptions: {},
observeSelector: {},
observeOptions: {},
clientCollection: collection._name
};
options = _.extend(defaultOptions, options);
let initializing = true;
sub._ids = {};
sub._iteration = 1;
const update = async () => {
if (initializing) return;
// add and update documents on the client
try {
const docs = await collection.rawCollection().aggregate(pipeline, options.aggregationOptions).toArray();
docs.forEach(doc => {
if (!sub._ids[doc._id]) {
sub.added(options.clientCollection, doc._id, doc);
} else {
sub.changed(options.clientCollection, doc._id, doc);
}
sub._ids[doc._id] = sub._iteration;
});
// remove documents not in the result anymore
Object.keys(sub._ids).forEach(id => {
if (sub._ids[id] !== sub._iteration) {
delete sub._ids[id];
sub.removed(options.clientCollection, id);
}
});
sub._iteration++;
} catch (err) {
throw err;
}
}
// track any changes on the collection used for the aggregation
const query = collection.find(options.observeSelector, options.observeOptions);
const handle = query.observeChanges({
added: update,
changed: update,
removed: update,
error(err) {
throw err;
}
});
// observeChanges() will immediately fire an "added" event for each document in the query
// these are skipped using the initializing flag
initializing = false;
// send an initial result set to the client
Promise.await(update());
// mark the subscription as ready
sub.ready();
// stop observing the cursor when the client unsubscribes
sub.onStop(function () {
handle.stop();
});
};