Skip to content

Commit

Permalink
[ issue #47 ] First draft of facet.object.queries
Browse files Browse the repository at this point in the history
  • Loading branch information
agazzarini committed Apr 9, 2015
1 parent 6ba5678 commit e312116
Show file tree
Hide file tree
Showing 2 changed files with 358 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.search.Query;
import org.apache.solr.common.SolrException;
Expand All @@ -18,6 +28,7 @@
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.request.SimpleFacets;
import org.apache.solr.schema.FieldType;
Expand All @@ -29,8 +40,10 @@
import org.apache.solr.search.DocSetCollector;
import org.apache.solr.search.QParser;
import org.apache.solr.search.SyntaxError;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.gazzax.labs.solrdf.handler.search.faceting.rq.DateRangeEndpointCalculator;
import org.gazzax.labs.solrdf.handler.search.faceting.rq.DoubleRangeEndpointCalculator;
import org.gazzax.labs.solrdf.handler.search.faceting.rq.FacetObjectQuery;
import org.gazzax.labs.solrdf.handler.search.faceting.rq.FacetRangeQuery;
import org.gazzax.labs.solrdf.handler.search.faceting.rq.RangeEndpointCalculator;

Expand All @@ -43,6 +56,22 @@
* @since 1.0
*/
public class RDFacets extends SimpleFacets {

/**
 * Executor that runs every submitted task synchronously, by invoking
 * {@link Runnable#run()} directly inside {@code execute}: the task is
 * therefore executed by whichever thread calls {@code execute}.
 */
static final Executor directExecutor = new Executor() {
@Override
public void execute(Runnable task) {
// No hand-off to another thread: the caller itself runs the task.
task.run();
}
};

/**
 * Thread pool based executor, used when facet computations have to run
 * on worker threads instead of the calling thread.
 * The arguments below follow the {@link ThreadPoolExecutor} constructor order.
 * NOTE(review): no shutdown of this static pool is visible in this section; confirm how its lifecycle is managed.
 */
static final Executor facetExecutor = new ThreadPoolExecutor(
0, // core pool size: no thread is kept alive while the pool is idle
Integer.MAX_VALUE, // maximum pool size: effectively unbounded
10, // keep-alive time for idle threads...
TimeUnit.SECONDS, // ...expressed in seconds
new SynchronousQueue<Runnable>(), // zero-capacity queue: each task is handed directly to a thread
new DefaultSolrThreadFactory("facetExecutor")); // presumably "facetExecutor" is used to name the worker threads; verify

/**
* Builds a new {@link RDFacets} with the given data.
*
Expand All @@ -58,6 +87,13 @@ public RDFacets(final ResponseBuilder responseBuilder, final DocSet docs, final
public NamedList<Object> getFacetCounts() {
    // Builds the facet section of the response: starts from the parent result, drops the
    // "facet_dates" entry, adds the facet object queries under "facet_object_queries" and
    // re-publishes the parent "facet_ranges" entry as "facet_object_ranges_queries".
    final NamedList<Object> result = super.getFacetCounts();

    // NOTE(review): the parent implementation is expected to return null when faceting is
    // disabled; confirm against the SimpleFacets version in use. Without this guard the
    // following remove(...) would fail with a NullPointerException.
    if (result == null) {
        return null;
    }

    result.remove("facet_dates");

    try {
        result.add("facet_object_queries", getFacetObjectQueriesCounts());
    } catch (IOException exception) {
        // Do not swallow the failure: a partial facet response would silently hide the problem.
        throw new SolrException(
                SolrException.ErrorCode.SERVER_ERROR,
                "Error while processing facet object queries: " + exception,
                exception);
    } catch (SyntaxError exception) {
        // A query that cannot be parsed is a client-side error.
        throw new SolrException(
                SolrException.ErrorCode.BAD_REQUEST,
                "Error while parsing facet object queries: " + exception,
                exception);
    }

    result.add("facet_object_ranges_queries", result.remove("facet_ranges"));
    return result;
}
Expand All @@ -66,7 +102,111 @@ public NamedList<Object> getFacetCounts() {
// Date facets are not produced by this class: this method always returns null
// (getFacetCounts() also removes the "facet_dates" entry from its result).
public NamedList<Object> getFacetDateCounts() {
return null;
}

/**
 * Computes the counts of the facet object queries ({@code &facet.object.q=...}) found in the current request.
 *
 * Queries are collected in two steps:
 * <ul>
 *   <li>every value of the {@code FacetObjectQuery.QUERY} parameter (anonymous queries); only the first one
 *       receives the value of {@code FacetObjectQuery.QUERY_ALIAS}, the others get a null alias;</li>
 *   <li>the indexed queries found under {@code FacetObjectQuery.QUERY + ".1"}, {@code ".2"} and so on,
 *       stopping at the first missing index.</li>
 * </ul>
 * Each collected query is then executed against the current docset, either on the calling thread
 * ({@code FacetParams.FACET_THREADS} equal to 0, the default) or through the shared facet executor;
 * in the latter case a semaphore caps the number of concurrently running tasks (no cap for negative values).
 *
 * @return the term counts of each facet object query, keyed by query alias, in the order the queries
 *         have been collected; an empty list if the request contains no facet object query.
 * @throws IOException declared by the signature; failures raised while a query is being executed are
 *         reported as {@link SolrException} instead.
 * @throws SyntaxError declared by the signature; see the note on {@code IOException}.
 * @throws SolrException if a query execution fails or if the calling thread is interrupted while waiting.
 */
public NamedList<Object> getFacetObjectQueriesCounts() throws IOException, SyntaxError {
    final NamedList<Object> result = new SimpleOrderedMap<>();

    final List<FacetObjectQuery> facetObjectQueries = new ArrayList<FacetObjectQuery>();
    final SolrParams requiredParams = new RequiredSolrParams(params);

    // 1. Anonymous queries: the (single) alias parameter is applied to the first one only.
    final String[] anonymousQueries = params.getParams(FacetObjectQuery.QUERY);
    if (anonymousQueries != null) {
        boolean first = true;
        for (final String anonymousQuery : anonymousQueries) {
            facetObjectQueries.add(
                    FacetObjectQuery.newAnonymousQuery(
                            anonymousQuery,
                            first ? params.get(FacetObjectQuery.QUERY_ALIAS) : null,
                            params,
                            requiredParams));
            first = false;
        }
    }

    // 2. Indexed queries: <QUERY>.1, <QUERY>.2, ... until the first gap.
    int index = 0;
    String query;
    while ((query = params.get(FacetObjectQuery.QUERY + "." + (++index))) != null) {
        facetObjectQueries.add(
                FacetObjectQuery.newQuery(
                        query,
                        index,
                        params,
                        requiredParams));
    }

    if (facetObjectQueries.isEmpty()) {
        return result;
    }

    // Passing a negative number for FACET_THREADS implies an unlimited number of threads is acceptable.
    // Also, a subtlety of directExecutor is that no matter how many times you "submit" a job, it's really
    // just a method call in that it's run by the calling thread.
    final int maxThreads = req.getParams().getInt(FacetParams.FACET_THREADS, 0);
    final Executor executor = maxThreads == 0 ? directExecutor : facetExecutor;
    final Semaphore semaphore = new Semaphore((maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads);
    final List<Future<NamedList<Object>>> futures = new ArrayList<>(facetObjectQueries.size());

    try {
        for (final FacetObjectQuery foq : facetObjectQueries) {
            // Values read from instance state are captured here, on the calling thread,
            // before the task is handed over to the executor.
            final String termList = localParams == null ? null : localParams.get(CommonParams.TERMS);
            final String workerFacetValue = facetValue;
            final DocSet workerBase = this.docs;
            final Callable<NamedList<Object>> callable = new Callable<NamedList<Object>>() {
                @Override
                public NamedList<Object> call() throws Exception {
                    try {
                        // Collects the documents matching the facet object query, restricted to the current docset.
                        final DocSetCollector collector = new DocSetCollector(docs.size() >> 6, docs.size());
                        req.getSearcher().search(
                                QParser.getParser(foq.query(), null, req).getQuery(),
                                docs.getTopFilter(),
                                collector);

                        final NamedList<Object> result = new SimpleOrderedMap<>();
                        // TBU
                        if (termList != null) {
                            final List<String> terms = StrUtils.splitSmart(termList, ",", true);
                            result.add(foq.alias(), getListedTermCounts(workerFacetValue, workerBase, terms));
                        } else {
                            result.add(foq.alias(), getTermCounts(foq.fieldName(), collector.getDocSet()));
                        }
                        return result;
                    } catch (SolrException se) {
                        throw se;
                    } catch (Exception e) {
                        // Report the query that actually failed, so the error can be traced back to the request.
                        throw new SolrException(
                                ErrorCode.SERVER_ERROR,
                                "Exception during facet object query: " + foq.query(),
                                e);
                    } finally {
                        // Always give the permit back, otherwise the submitting loop could block forever.
                        semaphore.release();
                    }
                }
            };

            final RunnableFuture<NamedList<Object>> runnableFuture = new FutureTask<NamedList<Object>>(callable);
            // Blocks until a permit is available, i.e. until fewer than maxThreads tasks are running.
            semaphore.acquire();
            executor.execute(runnableFuture);
            futures.add(runnableFuture);
        }

        // Results are merged following the submission order, regardless of the completion order.
        for (final Future<NamedList<Object>> future : futures) {
            result.addAll(future.get());
        }
        assert semaphore.availablePermits() >= maxThreads;
    } catch (InterruptedException exception) {
        // Restore the interrupt flag so that callers up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new SolrException(
                SolrException.ErrorCode.SERVER_ERROR,
                "Error while processing facet fields: InterruptedException",
                exception);
    } catch (ExecutionException exception) {
        // Unwrap the failure raised by the worker: runtime exceptions are rethrown as they are.
        final Throwable cause = exception.getCause();
        if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
        }
        throw new SolrException(
                SolrException.ErrorCode.SERVER_ERROR,
                "Error while processing facet fields: " + cause.toString(),
                cause);
    }
    return result;
}

@Override
public NamedList<Object> getFacetRangeCounts() throws IOException, SyntaxError {
final NamedList<Object> result = new SimpleOrderedMap<>();
Expand Down
Loading

0 comments on commit e312116

Please sign in to comment.