Skip to content

Commit

Permalink
Implements aggregation support with Elasticsearch, cursor.getAggregat…
Browse files Browse the repository at this point in the history
…ions(), cursor.getAggregation(name)
  • Loading branch information
bompi88 committed Apr 25, 2017
1 parent b5e7ae8 commit 912fa7e
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 12 deletions.
1 change: 1 addition & 0 deletions packages/easysearch:core/lib/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Index {
permission: () => true,
defaultSearchOptions: {},
countUpdateIntervalMs: 2000,
aggsUpdateIntervalMs: 10000
};
}

Expand Down
39 changes: 34 additions & 5 deletions packages/easysearch:core/lib/core/search-collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ class SearchCollection {
*
* @constructor
*/
constructor(indexConfiguration, engine) {
constructor(indexConfiguration, engine, mongoCount = true) {
check(indexConfiguration, Object);
check(indexConfiguration.name, Match.OneOf(String, null));
check(mongoCount, Boolean);

if (!(engine instanceof ReactiveEngine)) {
throw new Meteor.Error('invalid-engine', 'engine needs to be instanceof ReactiveEngine');
Expand All @@ -28,6 +29,7 @@ class SearchCollection {
this._indexConfiguration = indexConfiguration;
this._name = `${indexConfiguration.name}/easySearch`;
this._engine = engine;
this.mongoCount = mongoCount;

if (Meteor.isClient) {
this._collection = new Mongo.Collection(this._name);
Expand Down Expand Up @@ -184,20 +186,47 @@ class SearchCollection {
this.added(collectionName, 'searchCount' + definitionString, { count });

let intervalID;

if (collectionScope._indexConfiguration.countUpdateIntervalMs) {
intervalID = Meteor.setInterval(() => {
let newCount;
if (this.mongoCount) {
newCount = cursor.mongoCursor.count();
} else {
newCount = cursor.count && cursor.count() || 0
}

this.changed(
collectionName,
'searchCount' + definitionString,
{ count: newCount }
);
},
collectionScope._indexConfiguration.countUpdateIntervalMs
);
}

const aggs = cursor._aggs;

if (aggs) {
this.added(collectionName, 'aggs' + definitionString, { aggs });
}

let intervalAggsID;

if (aggs && collectionScope._indexConfiguration.aggsUpdateIntervalMs) {
intervalID = Meteor.setInterval(
() => this.changed(
collectionName,
'searchCount' + definitionString,
{ count: cursor.mongoCursor.count() }
'aggs' + definitionString,
{ aggs }
),
collectionScope._indexConfiguration.countUpdateIntervalMs
collectionScope._indexConfiguration.aggsUpdateIntervalMs
);
}

this.onStop(function () {
intervalID && Meteor.clearInterval(intervalID);
intervalAggsID && Meteor.clearInterval(intervalAggsID);
resultsHandle && resultsHandle.stop();
});

Expand Down
4 changes: 3 additions & 1 deletion packages/easysearch:core/lib/main.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Index from './core/index';
import Engine from './core/engine';
import ReactiveEngine from './core/reactive-engine';
import SearchCollection from './core/search-collection';
import Cursor from './core/cursor';
import MongoDBEngine from './engines/mongo-db';
import MinimongoEngine from './engines/minimongo';
Expand All @@ -13,5 +14,6 @@ export {
Cursor,
MongoDBEngine,
MinimongoEngine,
MongoTextIndexEngine
MongoTextIndexEngine,
SearchCollection
};
40 changes: 40 additions & 0 deletions packages/easysearch:elasticsearch/lib/cursor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Cursor } from 'meteor/easysearch:core';

/**
* A Cursor that extends the regular EasySearch cursor. This cursor is Elasticsearch specific.
*
* @type {ESCursor}
*/
class ESCursor extends Cursor {
/**
* Constructor
*
* @param {Mongo.Cursor} hitsCursor Referenced mongo cursor to the regular hits field
* @param {Number} count Count of all documents found in regular hits field
* @param {Object} aggs Raw aggragtion data
* @param {Boolean} isReady Cursor is ready
* @param {Object} publishHandle Publish handle to stop if on client
*
* @constructor
*
*/
constructor(cursor, count, isReady = true, publishHandle = null, aggs = {}) {
check(cursor.fetch, Function);
check(count, Number);
check(aggs, Match.Optional(Object));

super(cursor, count, isReady, publishHandle);

this._aggs = aggs;
}

getAggregation(path) {
return this._aggs[path];
}

getAggregations() {
return this._aggs;
}
}

export default ESCursor;
20 changes: 14 additions & 6 deletions packages/easysearch:elasticsearch/lib/engine.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import ElasticSearchDataSyncer from './data-syncer'
import ESCursor from './cursor'
import ESSearchCollection from './search-collection'

if (Meteor.isServer) {
var Future = Npm.require('fibers/future'),
Expand Down Expand Up @@ -125,7 +127,12 @@ if (Meteor.isServer) {
* @param {Object} indexConfig Index configuration
*/
onIndexCreate(indexConfig) {
super.onIndexCreate(indexConfig);
if (!indexConfig.allowedFields) {
indexConfig.allowedFields = indexConfig.fields;
}

indexConfig.searchCollection = new ESSearchCollection(indexConfig, this);
indexConfig.mongoCollection = indexConfig.searchCollection._collection;

if (Meteor.isServer) {
indexConfig.elasticSearchClient = new elasticsearch.Client(this.config.client);
Expand Down Expand Up @@ -170,7 +177,7 @@ if (Meteor.isServer) {
return;
}

let { total, ids } = this.getCursorData(data),
let { total, ids, aggs } = this.getCursorData(data),
cursor;

if (ids.length > 0) {
Expand All @@ -180,10 +187,10 @@ if (Meteor.isServer) {
})
}, { limit: options.search.limit });
} else {
cursor = EasySearch.Cursor.emptyCursor;
cursor = ESCursor.emptyCursor;
}

fut['return'](new EasySearch.Cursor(cursor, total));
fut['return'](new ESCursor(cursor, total, true, null, aggs));
}));

return fut.wait();
Expand All @@ -198,8 +205,9 @@ if (Meteor.isServer) {
*/
getCursorData(data) {
return {
ids : _.map(data.hits.hits, (resultSet) => resultSet._id),
total: data.hits.total
ids: _.map(data.hits.hits, (resultSet) => resultSet._id),
total: data.hits.total,
aggs: data.aggregations || {}
};
}
}
Expand Down
66 changes: 66 additions & 0 deletions packages/easysearch:elasticsearch/lib/search-collection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { SearchCollection } from 'meteor/easysearch:core';
import ESCursor from './cursor';

/**
* A search collection represents a reactive collection on the client,
* which is used by the ReactiveEngine for searching using Elasticsearch.
*
* @type {ESSearchCollection}
*/
class ESSearchCollection extends SearchCollection {
/**
* Constructor
*
* @param {Object} indexConfiguration Index configuration
* @param {ReactiveEngine} engine Reactive Engine
*
* @constructor
*/
constructor() {
super(...arguments, false);
}

/**
* Find documents on the client.
*
* @param {Object} searchDefinition Search definition
* @param {Object} options Options
*
* @returns {ESCursor}
*/
find(searchDefinition, options) {
if (!Meteor.isClient) {
throw new Error('find can only be used on client');
}

let publishHandle = Meteor.subscribe(this.name, searchDefinition, options);

let count = this._getCount(searchDefinition);
let aggs = this._getAggregation(searchDefinition);
let mongoCursor = this._getMongoCursor(searchDefinition, options);

if (!_.isNumber(count)) {
return new ESCursor(mongoCursor, 0, false, null, aggs);
}

return new ESCursor(mongoCursor, count, true, publishHandle, aggs);
}

/**
* Get the aggregations linked to the search
*
* @params {Object} searchDefinition Search definition
*
* @private
*/
_getAggregation(searchDefinition) {
const aggsDoc = this._collection.findOne('aggs' + JSON.stringify(searchDefinition));
if (aggsDoc) {
return aggsDoc.aggs;
}
return {};
}

}

export default ESSearchCollection;
2 changes: 2 additions & 0 deletions packages/easysearch:elasticsearch/package.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Package.onUse(function(api) {
api.addFiles([
'lib/data-syncer.js',
'lib/engine.js',
'lib/cursor.js',
'lib/search-collection.js'
]);

api.export('EasySearch');
Expand Down

0 comments on commit 912fa7e

Please sign in to comment.