Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ES aggregations #601

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/easysearch:core/lib/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Index {
permission: () => true,
defaultSearchOptions: {},
countUpdateIntervalMs: 2000,
aggsUpdateIntervalMs: 10000
};
}

Expand Down
39 changes: 34 additions & 5 deletions packages/easysearch:core/lib/core/search-collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ class SearchCollection {
*
* @constructor
*/
constructor(indexConfiguration, engine) {
constructor(indexConfiguration, engine, mongoCount = true) {
check(indexConfiguration, Object);
check(indexConfiguration.name, Match.OneOf(String, null));
check(mongoCount, Boolean);

if (!(engine instanceof ReactiveEngine)) {
throw new Meteor.Error('invalid-engine', 'engine needs to be instanceof ReactiveEngine');
Expand All @@ -28,6 +29,7 @@ class SearchCollection {
this._indexConfiguration = indexConfiguration;
this._name = `${indexConfiguration.name}/easySearch`;
this._engine = engine;
this.mongoCount = mongoCount;

if (Meteor.isClient) {
this._collection = new Mongo.Collection(this._name);
Expand Down Expand Up @@ -184,20 +186,47 @@ class SearchCollection {
this.added(collectionName, 'searchCount' + definitionString, { count });

let intervalID;

if (collectionScope._indexConfiguration.countUpdateIntervalMs) {
intervalID = Meteor.setInterval(() => {
let newCount;
Copy link
Contributor Author

@bompi88 bompi88 Apr 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matteodem It seems that the cursor.count() returns the initial count value set while instantiating the cursor, so I added a optional mongoCount parameter that is set to false by the ESCursor. Is this a satisfiable solution or should I do it otherwise?

if (this.mongoCount) {
newCount = cursor.mongoCursor.count();
} else {
newCount = cursor.count && cursor.count() || 0
}

this.changed(
collectionName,
'searchCount' + definitionString,
{ count: newCount }
);
},
collectionScope._indexConfiguration.countUpdateIntervalMs
);
}

const aggs = cursor._aggs;

if (aggs) {
this.added(collectionName, 'aggs' + definitionString, { aggs });
}

let intervalAggsID;

if (aggs && collectionScope._indexConfiguration.aggsUpdateIntervalMs) {
intervalID = Meteor.setInterval(
() => this.changed(
collectionName,
'searchCount' + definitionString,
{ count: cursor.mongoCursor.count && cursor.mongoCursor.count() || 0 }
'aggs' + definitionString,
{ aggs }
),
collectionScope._indexConfiguration.countUpdateIntervalMs
collectionScope._indexConfiguration.aggsUpdateIntervalMs
);
}

this.onStop(function () {
intervalID && Meteor.clearInterval(intervalID);
intervalAggsID && Meteor.clearInterval(intervalAggsID);
resultsHandle && resultsHandle.stop();
});

Expand Down
4 changes: 3 additions & 1 deletion packages/easysearch:core/lib/main.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Index from './core/index';
import Engine from './core/engine';
import ReactiveEngine from './core/reactive-engine';
import SearchCollection from './core/search-collection';
import Cursor from './core/cursor';
import MongoDBEngine from './engines/mongo-db';
import MinimongoEngine from './engines/minimongo';
Expand All @@ -13,5 +14,6 @@ export {
Cursor,
MongoDBEngine,
MinimongoEngine,
MongoTextIndexEngine
MongoTextIndexEngine,
SearchCollection
};
72 changes: 71 additions & 1 deletion packages/easysearch:elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ The configuration options that can be passed to `EasSearch.ElasticSearch` as an
* __getElasticSearchDoc(doc, fields)__: Function that returns the document to index, fieldsToIndex by default
* __body(body)__: Function that returns the ElasticSearch body to send when searching

## Mapping, Analyzers and so on
## Mapping

To make changes to the mapping you can use the mapping setting which will set the mapping when creating a new index.

Expand All @@ -53,6 +53,76 @@ const PlayersIndex = new Index({
})
```

## Aggregations
To define aggregations, inject them inside a `body` helper function like in the example below:

```javascript
var index = new EasySearch.Index({
...
engine: new EasySearch.ElasticSearch({
body: function(body, opts) {
body.aggs = {
tags:{
filter: {},
aggs: {
tags: {
terms: { field: 'tags.raw' }
}
}
}
};
return body;
}
...
}
});
```

The aggregations will be available on the cursor returned by the `search` method:

```javascript
var cursor = index.search('test');

// get all aggregations
cursor.getAggregations();

// get aggregation by name
cursor.getAggregation('tags');
```

Example filter component populated by ES aggregations:

__filter.html:__
```html
<template name="filter">
<select class="{{class}}">
<option value="">Choose a tag</option>
{{ #each tags }}
<option value="{{key}}">{{key}}</option>
{{ else }}
<option value="" disabled>No tags available</option>
{{/each}}
</select>
</template>
```

__filter.js:__

```javascript
import './filter.html';

import { DocumentIndex } from '../path/to/document_index.js';

Template.filter.helpers({

tags() {
const agg = DocumentIndex.getComponentMethods().getCursor().getAggregation('tags');
return agg ? agg.tags.buckets : null;
}

});
```

## How to run ElasticSearch

```sh
Expand Down
40 changes: 40 additions & 0 deletions packages/easysearch:elasticsearch/lib/cursor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Cursor } from 'meteor/easysearch:core';

/**
* A Cursor that extends the regular EasySearch cursor. This cursor is Elasticsearch specific.
*
* @type {ESCursor}
*/
class ESCursor extends Cursor {
/**
* Constructor
*
* @param {Mongo.Cursor} hitsCursor Referenced mongo cursor to the regular hits field
* @param {Number} count Count of all documents found in regular hits field
* @param {Object} aggs Raw aggragtion data
* @param {Boolean} isReady Cursor is ready
* @param {Object} publishHandle Publish handle to stop if on client
*
* @constructor
*
*/
constructor(cursor, count, isReady = true, publishHandle = null, aggs = {}) {
check(cursor.fetch, Function);
check(count, Number);
check(aggs, Match.Optional(Object));

super(cursor, count, isReady, publishHandle);

this._aggs = aggs;
}

getAggregation(path) {
return this._aggs[path];
}

getAggregations() {
return this._aggs;
}
}

export default ESCursor;
20 changes: 14 additions & 6 deletions packages/easysearch:elasticsearch/lib/engine.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import ElasticSearchDataSyncer from './data-syncer'
import ESCursor from './cursor'
import ESSearchCollection from './search-collection'

if (Meteor.isServer) {
var Future = Npm.require('fibers/future'),
Expand Down Expand Up @@ -167,7 +169,12 @@ if (Meteor.isServer) {
* @param {Object} indexConfig Index configuration
*/
onIndexCreate(indexConfig) {
super.onIndexCreate(indexConfig);
if (!indexConfig.allowedFields) {
indexConfig.allowedFields = indexConfig.fields;
}

indexConfig.searchCollection = new ESSearchCollection(indexConfig, this);
indexConfig.mongoCollection = indexConfig.searchCollection._collection;

if (Meteor.isServer) {
indexConfig.elasticSearchClient = new elasticsearch.Client(this.config.client);
Expand Down Expand Up @@ -214,7 +221,7 @@ if (Meteor.isServer) {
return;
}

let { total, ids } = this.getCursorData(data),
let { total, ids, aggs } = this.getCursorData(data),
cursor;

if (ids.length > 0) {
Expand All @@ -224,10 +231,10 @@ if (Meteor.isServer) {
})
}, { limit: options.search.limit });
} else {
cursor = EasySearch.Cursor.emptyCursor;
cursor = ESCursor.emptyCursor;
}

fut['return'](new EasySearch.Cursor(cursor, total));
fut['return'](new ESCursor(cursor, total, true, null, aggs));
}));

return fut.wait();
Expand All @@ -242,8 +249,9 @@ if (Meteor.isServer) {
*/
getCursorData(data) {
return {
ids : _.map(data.hits.hits, (resultSet) => resultSet._id),
total: data.hits.total
ids: _.map(data.hits.hits, (resultSet) => resultSet._id),
total: data.hits.total,
aggs: data.aggregations || {}
};
}
}
Expand Down
66 changes: 66 additions & 0 deletions packages/easysearch:elasticsearch/lib/search-collection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { SearchCollection } from 'meteor/easysearch:core';
import ESCursor from './cursor';

/**
* A search collection represents a reactive collection on the client,
* which is used by the ReactiveEngine for searching using Elasticsearch.
*
* @type {ESSearchCollection}
*/
class ESSearchCollection extends SearchCollection {
/**
* Constructor
*
* @param {Object} indexConfiguration Index configuration
* @param {ReactiveEngine} engine Reactive Engine
*
* @constructor
*/
constructor() {
super(...arguments, false);
}

/**
* Find documents on the client.
*
* @param {Object} searchDefinition Search definition
* @param {Object} options Options
*
* @returns {ESCursor}
*/
find(searchDefinition, options) {
if (!Meteor.isClient) {
throw new Error('find can only be used on client');
}

let publishHandle = Meteor.subscribe(this.name, searchDefinition, options);

let count = this._getCount(searchDefinition);
let aggs = this._getAggregation(searchDefinition);
let mongoCursor = this._getMongoCursor(searchDefinition, options);

if (!_.isNumber(count)) {
return new ESCursor(mongoCursor, 0, false, null, aggs);
}

return new ESCursor(mongoCursor, count, true, publishHandle, aggs);
}

/**
* Get the aggregations linked to the search
*
* @params {Object} searchDefinition Search definition
*
* @private
*/
_getAggregation(searchDefinition) {
const aggsDoc = this._collection.findOne('aggs' + JSON.stringify(searchDefinition));
if (aggsDoc) {
return aggsDoc.aggs;
}
return {};
}

}

export default ESSearchCollection;
2 changes: 2 additions & 0 deletions packages/easysearch:elasticsearch/package.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Package.onUse(function(api) {
api.addFiles([
'lib/data-syncer.js',
'lib/engine.js',
'lib/cursor.js',
'lib/search-collection.js'
]);

api.export('EasySearch');
Expand Down