From 912fa7ea67d0b06ad50039ec1d7eccc8a5ff22bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B8rn=20Br=C3=A5then?= Date: Tue, 25 Apr 2017 09:47:23 +0200 Subject: [PATCH] Implements aggregation support with Elasticsearch, cursor.getAggregations(), cursor.getAggregation(name) --- packages/easysearch:core/lib/core/index.js | 1 + .../lib/core/search-collection.js | 39 +++++++++-- packages/easysearch:core/lib/main.js | 4 +- .../easysearch:elasticsearch/lib/cursor.js | 40 +++++++++++ .../easysearch:elasticsearch/lib/engine.js | 20 ++++-- .../lib/search-collection.js | 66 +++++++++++++++++++ packages/easysearch:elasticsearch/package.js | 2 + 7 files changed, 160 insertions(+), 12 deletions(-) create mode 100644 packages/easysearch:elasticsearch/lib/cursor.js create mode 100644 packages/easysearch:elasticsearch/lib/search-collection.js diff --git a/packages/easysearch:core/lib/core/index.js b/packages/easysearch:core/lib/core/index.js index 6cb7160..97fd57c 100644 --- a/packages/easysearch:core/lib/core/index.js +++ b/packages/easysearch:core/lib/core/index.js @@ -44,6 +44,7 @@ class Index { permission: () => true, defaultSearchOptions: {}, countUpdateIntervalMs: 2000, + aggsUpdateIntervalMs: 10000 }; } diff --git a/packages/easysearch:core/lib/core/search-collection.js b/packages/easysearch:core/lib/core/search-collection.js index d30d360..106c8f0 100644 --- a/packages/easysearch:core/lib/core/search-collection.js +++ b/packages/easysearch:core/lib/core/search-collection.js @@ -17,9 +17,10 @@ class SearchCollection { * * @constructor */ - constructor(indexConfiguration, engine) { + constructor(indexConfiguration, engine, mongoCount = true) { check(indexConfiguration, Object); check(indexConfiguration.name, Match.OneOf(String, null)); + check(mongoCount, Boolean); if (!(engine instanceof ReactiveEngine)) { throw new Meteor.Error('invalid-engine', 'engine needs to be instanceof ReactiveEngine'); @@ -28,6 +29,7 @@ class SearchCollection { this._indexConfiguration = indexConfiguration; this._name = `${indexConfiguration.name}/easySearch`; this._engine = engine; + this.mongoCount = mongoCount; if (Meteor.isClient) { this._collection = new Mongo.Collection(this._name); @@ -184,20 +186,47 @@ class SearchCollection { this.added(collectionName, 'searchCount' + definitionString, { count }); let intervalID; - if (collectionScope._indexConfiguration.countUpdateIntervalMs) { + intervalID = Meteor.setInterval(() => { + let newCount; + if (this.mongoCount) { + newCount = cursor.mongoCursor.count(); + } else { + newCount = cursor.count && cursor.count() || 0 + } + + this.changed( + collectionName, + 'searchCount' + definitionString, + { count: newCount } + ); + }, + collectionScope._indexConfiguration.countUpdateIntervalMs + ); + } + + const aggs = cursor._aggs; + + if (aggs) { + this.added(collectionName, 'aggs' + definitionString, { aggs }); + } + + let intervalAggsID; + + if (aggs && collectionScope._indexConfiguration.aggsUpdateIntervalMs) { intervalID = Meteor.setInterval( () => this.changed( collectionName, - 'searchCount' + definitionString, - { count: cursor.mongoCursor.count() } + 'aggs' + definitionString, + { aggs } ), - collectionScope._indexConfiguration.countUpdateIntervalMs + collectionScope._indexConfiguration.aggsUpdateIntervalMs ); } this.onStop(function () { intervalID && Meteor.clearInterval(intervalID); + intervalAggsID && Meteor.clearInterval(intervalAggsID); resultsHandle && resultsHandle.stop(); }); diff --git a/packages/easysearch:core/lib/main.js b/packages/easysearch:core/lib/main.js index 8c51bd8..74937e3 100644 --- a/packages/easysearch:core/lib/main.js +++ b/packages/easysearch:core/lib/main.js @@ -1,6 +1,7 @@ import Index from './core/index'; import Engine from './core/engine'; import ReactiveEngine from './core/reactive-engine'; +import SearchCollection from './core/search-collection'; import Cursor from './core/cursor'; import MongoDBEngine from './engines/mongo-db'; import MinimongoEngine from './engines/minimongo'; @@ -13,5 +14,6 @@ export { Cursor, MongoDBEngine, MinimongoEngine, - MongoTextIndexEngine + MongoTextIndexEngine, + SearchCollection }; diff --git a/packages/easysearch:elasticsearch/lib/cursor.js b/packages/easysearch:elasticsearch/lib/cursor.js new file mode 100644 index 0000000..5bb3cd8 --- /dev/null +++ b/packages/easysearch:elasticsearch/lib/cursor.js @@ -0,0 +1,40 @@ +import { Cursor } from 'meteor/easysearch:core'; + +/** + * A Cursor that extends the regular EasySearch cursor. This cursor is Elasticsearch specific. + * + * @type {ESCursor} + */ +class ESCursor extends Cursor { + /** + * Constructor + * + * @param {Mongo.Cursor} hitsCursor Referenced mongo cursor to the regular hits field + * @param {Number} count Count of all documents found in regular hits field + * @param {Object} aggs Raw aggragtion data + * @param {Boolean} isReady Cursor is ready + * @param {Object} publishHandle Publish handle to stop if on client + * + * @constructor + * + */ + constructor(cursor, count, isReady = true, publishHandle = null, aggs = {}) { + check(cursor.fetch, Function); + check(count, Number); + check(aggs, Match.Optional(Object)); + + super(cursor, count, isReady, publishHandle); + + this._aggs = aggs; + } + + getAggregation(path) { + return this._aggs[path]; + } + + getAggregations() { + return this._aggs; + } +} + +export default ESCursor; diff --git a/packages/easysearch:elasticsearch/lib/engine.js b/packages/easysearch:elasticsearch/lib/engine.js index 1ba07b6..56c7426 100644 --- a/packages/easysearch:elasticsearch/lib/engine.js +++ b/packages/easysearch:elasticsearch/lib/engine.js @@ -1,4 +1,6 @@ import ElasticSearchDataSyncer from './data-syncer' +import ESCursor from './cursor' +import ESSearchCollection from './search-collection' if (Meteor.isServer) { var Future = Npm.require('fibers/future'), @@ -125,7 +127,12 @@ if (Meteor.isServer) { * @param {Object} indexConfig Index configuration */ onIndexCreate(indexConfig) { - super.onIndexCreate(indexConfig); + if (!indexConfig.allowedFields) { + indexConfig.allowedFields = indexConfig.fields; + } + + indexConfig.searchCollection = new ESSearchCollection(indexConfig, this); + indexConfig.mongoCollection = indexConfig.searchCollection._collection; if (Meteor.isServer) { indexConfig.elasticSearchClient = new elasticsearch.Client(this.config.client); @@ -170,7 +177,7 @@ if (Meteor.isServer) { return; } - let { total, ids } = this.getCursorData(data), + let { total, ids, aggs } = this.getCursorData(data), cursor; if (ids.length > 0) { @@ -180,10 +187,10 @@ if (Meteor.isServer) { }) }, { limit: options.search.limit }); } else { - cursor = EasySearch.Cursor.emptyCursor; + cursor = ESCursor.emptyCursor; } - fut['return'](new EasySearch.Cursor(cursor, total)); + fut['return'](new ESCursor(cursor, total, true, null, aggs)); })); return fut.wait(); @@ -198,8 +205,9 @@ if (Meteor.isServer) { */ getCursorData(data) { return { - ids : _.map(data.hits.hits, (resultSet) => resultSet._id), - total: data.hits.total + ids: _.map(data.hits.hits, (resultSet) => resultSet._id), + total: data.hits.total, + aggs: data.aggregations || {} }; } } diff --git a/packages/easysearch:elasticsearch/lib/search-collection.js b/packages/easysearch:elasticsearch/lib/search-collection.js new file mode 100644 index 0000000..484d8b4 --- /dev/null +++ b/packages/easysearch:elasticsearch/lib/search-collection.js @@ -0,0 +1,66 @@ +import { SearchCollection } from 'meteor/easysearch:core'; +import ESCursor from './cursor'; + +/** + * A search collection represents a reactive collection on the client, + * which is used by the ReactiveEngine for searching using Elasticsearch. + * + * @type {ESSearchCollection} + */ +class ESSearchCollection extends SearchCollection { + /** + * Constructor + * + * @param {Object} indexConfiguration Index configuration + * @param {ReactiveEngine} engine Reactive Engine + * + * @constructor + */ + constructor() { + super(...arguments, false); + } + + /** + * Find documents on the client. + * + * @param {Object} searchDefinition Search definition + * @param {Object} options Options + * + * @returns {ESCursor} + */ + find(searchDefinition, options) { + if (!Meteor.isClient) { + throw new Error('find can only be used on client'); + } + + let publishHandle = Meteor.subscribe(this.name, searchDefinition, options); + + let count = this._getCount(searchDefinition); + let aggs = this._getAggregation(searchDefinition); + let mongoCursor = this._getMongoCursor(searchDefinition, options); + + if (!_.isNumber(count)) { + return new ESCursor(mongoCursor, 0, false, null, aggs); + } + + return new ESCursor(mongoCursor, count, true, publishHandle, aggs); + } + + /** + * Get the aggregations linked to the search + * + * @params {Object} searchDefinition Search definition + * + * @private + */ + _getAggregation(searchDefinition) { + const aggsDoc = this._collection.findOne('aggs' + JSON.stringify(searchDefinition)); + if (aggsDoc) { + return aggsDoc.aggs; + } + return {}; + } + +} + +export default ESSearchCollection; diff --git a/packages/easysearch:elasticsearch/package.js b/packages/easysearch:elasticsearch/package.js index d5d6377..3fa9e7c 100644 --- a/packages/easysearch:elasticsearch/package.js +++ b/packages/easysearch:elasticsearch/package.js @@ -20,6 +20,8 @@ Package.onUse(function(api) { api.addFiles([ 'lib/data-syncer.js', 'lib/engine.js', + 'lib/cursor.js', + 'lib/search-collection.js' ]); api.export('EasySearch');