Skip to content
This repository has been archived by the owner on Aug 9, 2022. It is now read-only.

Commit

Permalink
Adds store search failure setting (#33)
Browse files Browse the repository at this point in the history
Adds support for optional persistence for search failures

Co-authored-by: Surya Nistala <[email protected]>
Co-authored-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
eirsep and Bukhtawar authored Feb 3, 2021
1 parent aa435df commit c1779e9
Show file tree
Hide file tree
Showing 18 changed files with 303 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.CLOSED;
import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.INIT;
Expand All @@ -59,30 +60,33 @@ public class AsynchronousSearchActiveContext extends AsynchronousSearchContext i
private final SetOnce<Exception> error;
private final SetOnce<SearchResponse> searchResponse;
private final AtomicBoolean closed;
private AsynchronousSearchContextPermits asynchronousSearchContextPermits;
private final Supplier<Boolean> persistSearchFailureSupplier;
private final AsynchronousSearchContextPermits asynchronousSearchContextPermits;
private final Supplier<SearchResponse> partialResponseSupplier;
@Nullable
private final User user;

public AsynchronousSearchActiveContext(AsynchronousSearchContextId asynchronousSearchContextId, String nodeId,
TimeValue keepAlive, boolean keepOnCompletion,
ThreadPool threadPool, LongSupplier currentTimeSupplier,
AsynchronousSearchProgressListener searchProgressActionListener,
@Nullable User user) {
TimeValue keepAlive, boolean keepOnCompletion, ThreadPool threadPool,
LongSupplier currentTimeSupplier,
AsynchronousSearchProgressListener asynchronousSearchProgressListener, @Nullable User user,
Supplier<Boolean> persistSearchFailureSupplier) {
super(asynchronousSearchContextId, currentTimeSupplier);
this.keepOnCompletion = keepOnCompletion;
this.error = new SetOnce<>();
this.searchResponse = new SetOnce<>();
this.keepAlive = keepAlive;
this.nodeId = nodeId;
this.asynchronousSearchProgressListener = searchProgressActionListener;
this.asynchronousSearchProgressListener = asynchronousSearchProgressListener;
this.partialResponseSupplier = () -> asynchronousSearchProgressListener.partialResponse();
this.searchTask = new SetOnce<>();
this.asynchronousSearchId = new SetOnce<>();
this.completed = new AtomicBoolean(false);
this.closed = new AtomicBoolean(false);
this.asynchronousSearchContextPermits = keepOnCompletion ? new AsynchronousSearchContextPermits(asynchronousSearchContextId,
threadPool) :
new NoopAsynchronousSearchContextPermits(asynchronousSearchContextId);
threadPool) : new NoopAsynchronousSearchContextPermits(asynchronousSearchContextId);
this.user = user;
this.persistSearchFailureSupplier = persistSearchFailureSupplier;
}

public void setTask(SearchTask searchTask) {
Expand All @@ -99,36 +103,38 @@ public void setTask(SearchTask searchTask) {

public void processSearchFailure(Exception e) {
assert isAlive();
if (completed.compareAndSet(false, true)) {
// we don't want to process stack traces
// we don't want to process stack traces
try {
if (e.getCause() != null) {
e.getCause().setStackTrace(new StackTraceElement[0]);
}
error.set(e);
this.error.set(e);
} finally {
boolean result = completed.compareAndSet(false, true);
assert result : "Process search failure already complete";
}
}

public void processSearchResponse(SearchResponse response) {
assert isAlive();
if (completed.compareAndSet(false, true)) {
ShardSearchFailure [] shardSearchFailures = response.getShardFailures();
for(ShardSearchFailure shardSearchFailure : shardSearchFailures) {
try {
ShardSearchFailure[] shardSearchFailures = response.getShardFailures();
for (ShardSearchFailure shardSearchFailure : shardSearchFailures) {
// we don't want to process stack traces
if (shardSearchFailure.getCause() != null) {
shardSearchFailure.getCause().setStackTrace(new StackTraceElement[0]);
}
}
this.searchResponse.set(response);
} finally {
boolean result = completed.compareAndSet(false, true);
assert result : "Process search response already complete";
}
}

@Override
public SearchResponse getSearchResponse() {
if (searchResponse.get() != null) {
return searchResponse.get();
} else {
return asynchronousSearchProgressListener.partialResponse();
}
return completed.get() ? searchResponse.get() : partialResponseSupplier.get();
}

@Override
Expand All @@ -137,7 +143,7 @@ public String getAsynchronousSearchId() {
}

public boolean shouldPersist() {
return keepOnCompletion && isExpired() == false && isAlive();
return keepOnCompletion && isExpired() == false && isAlive() && (error.get() == null || persistSearchFailureSupplier.get());
}

public boolean keepOnCompletion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -44,7 +44,7 @@
* The implementation of {@link CompositeSearchProgressActionListener} responsible for updating the partial results of a single asynchronous
* search request. All partial results are updated atomically.
*/
public class AsynchronousSearchProgressListener extends SearchProgressActionListener {
public class AsynchronousSearchProgressListener extends SearchProgressActionListener implements PartialResponseProvider {

private PartialResultsHolder partialResultsHolder;
private final CompositeSearchProgressActionListener<AsynchronousSearchResponse> searchProgressActionListener;
Expand All @@ -53,7 +53,7 @@ public class AsynchronousSearchProgressListener extends SearchProgressActionList
private final ExecutorService executor;

public AsynchronousSearchProgressListener(long relativeStartMillis, Function<SearchResponse,
AsynchronousSearchResponse> successFunction,
AsynchronousSearchResponse> successFunction,
Function<Exception, AsynchronousSearchResponse> failureFunction,
ExecutorService executor, LongSupplier relativeTimeSupplier,
Supplier<InternalAggregation.ReduceContextBuilder> reduceContextBuilder) {
Expand All @@ -69,6 +69,7 @@ public AsynchronousSearchProgressListener(long relativeStartMillis, Function<Sea
* Returns the partial response for the search response.
* @return the partial search response
*/
@Override
public SearchResponse partialResponse() {
return partialResultsHolder.partialResponse();
}
Expand Down Expand Up @@ -234,7 +235,7 @@ public SearchResponse partialResponse() {
//before final reduce phase ensure we do a top-level final reduce to get reduced aggregation results
//else we might be returning back all the partial results aggregated so far
} else if (partialInternalAggregations.get() != null) {
finalAggregation = InternalAggregations.topLevelReduce(Arrays.asList(partialInternalAggregations.get()),
finalAggregation = InternalAggregations.topLevelReduce(Collections.singletonList(partialInternalAggregations.get()),
reduceContextBuilder.get().forFinalReduction());
}
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.amazon.opendistroforelasticsearch.search.asynchronous.listener;

import org.elasticsearch.action.search.SearchResponse;

@FunctionalInterface
public interface PartialResponseProvider {

SearchResponse partialResponse();
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public List<Setting<?>> getSettings() {
AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING,
AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING,
AsynchronousSearchManagementService.PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING,
AsynchronousSearchManagementService.ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING
AsynchronousSearchManagementService.ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING,
AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -63,13 +64,12 @@ public AsynchronousSearchPostProcessor(AsynchronousSearchPersistenceService asyn
AsynchronousSearchActiveStore asynchronousSearchActiveStore,
AsynchronousSearchStateMachine stateMachine,
Consumer<AsynchronousSearchActiveContext> freeActiveContextConsumer,
ThreadPool threadPool) {
ThreadPool threadPool, ClusterService clusterService) {
this.asynchronousSearchActiveStore = asynchronousSearchActiveStore;
this.asynchronousSearchPersistenceService = asynchronousSearchPersistenceService;
this.asynchronousSearchStateMachine = stateMachine;
this.freeActiveContextConsumer = freeActiveContextConsumer;
this.threadPool = threadPool;

}

public AsynchronousSearchResponse processSearchFailure(Exception exception, AsynchronousSearchContextId asynchronousSearchContextId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.AsynchronousSearchContextId;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveContext;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveStore;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.permits.AsynchronousSearchContextPermits;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.persistence.AsynchronousSearchPersistenceContext;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchStateMachine;
Expand Down Expand Up @@ -116,6 +117,10 @@ public class AsynchronousSearchService extends AbstractLifecycleComponent implem
"opendistro_asynchronous_search.max_wait_for_completion_timeout", timeValueMinutes(1), Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Boolean> PERSIST_SEARCH_FAILURES_SETTING =
Setting.boolSetting("opendistro_asynchronous_search.persist_search_failures", false,
Setting.Property.NodeScope, Setting.Property.Dynamic);

private volatile long maxKeepAlive;
private volatile long maxWaitForCompletionTimeout;
private volatile long maxSearchRunningTime;
Expand All @@ -130,6 +135,7 @@ public class AsynchronousSearchService extends AbstractLifecycleComponent implem
private final AsynchronousSearchStateMachine asynchronousSearchStateMachine;
private final NamedWriteableRegistry namedWriteableRegistry;
private final AsynchronousSearchContextEventListener contextEventListener;
private volatile boolean persistSearchFailure;

public AsynchronousSearchService(AsynchronousSearchPersistenceService asynchronousSearchPersistenceService,
AsynchronousSearchActiveStore asynchronousSearchActiveStore, Client client,
Expand All @@ -146,15 +152,16 @@ public AsynchronousSearchService(AsynchronousSearchPersistenceService asynchrono
setMaxWaitForCompletionTimeout(MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_SEARCH_RUNNING_TIME_SETTING, this::setMaxSearchRunningTime);
setMaxSearchRunningTime(MAX_SEARCH_RUNNING_TIME_SETTING.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(PERSIST_SEARCH_FAILURES_SETTING, this::setPersistSearchFailure);
setPersistSearchFailure(PERSIST_SEARCH_FAILURES_SETTING.get(clusterService.getSettings()));
this.threadPool = threadPool;
this.clusterService = clusterService;
this.persistenceService = asynchronousSearchPersistenceService;
this.currentTimeSupplier = System::currentTimeMillis;
this.asynchronousSearchActiveStore = asynchronousSearchActiveStore;
this.asynchronousSearchStateMachine = initStateMachine();
this.asynchronousSearchPostProcessor = new AsynchronousSearchPostProcessor(persistenceService, asynchronousSearchActiveStore,
asynchronousSearchStateMachine,
this::freeActiveContext, threadPool);
asynchronousSearchStateMachine, this::freeActiveContext, threadPool, clusterService);
this.namedWriteableRegistry = namedWriteableRegistry;
}

Expand Down Expand Up @@ -192,8 +199,8 @@ public AsynchronousSearchContext createAndStoreContext(SubmitAsynchronousSearchR
threadPool::relativeTimeInMillis,
reduceContextBuilder);
AsynchronousSearchActiveContext asynchronousSearchContext = new AsynchronousSearchActiveContext(asynchronousSearchContextId,
clusterService.localNode().getId(),
request.getKeepAlive(), request.getKeepOnCompletion(), threadPool, currentTimeSupplier, progressActionListener, user);
clusterService.localNode().getId(), request.getKeepAlive(), request.getKeepOnCompletion(), threadPool, currentTimeSupplier,
progressActionListener, user, () -> persistSearchFailure);
asynchronousSearchActiveStore.putContext(asynchronousSearchContextId, asynchronousSearchContext,
contextEventListener::onContextRejected);
contextEventListener.onContextInitialized(asynchronousSearchContextId);
Expand Down Expand Up @@ -477,7 +484,7 @@ public boolean onCancelledFreeActiveContext(AsynchronousSearchActiveContext asyn

/**
* If an active context is found, a permit is acquired from
* {@linkplain com.amazon.opendistroforelasticsearch.search.asynchronous.context.permits.AsynchronousSearchContextPermits}
* {@linkplain AsynchronousSearchContextPermits}
* and on acquisition of permit, a check is performed to see if response has been persisted in system index. If true, we update
* expiration in index. Else we update expiration field in {@linkplain AsynchronousSearchActiveContext}.
*
Expand Down Expand Up @@ -695,4 +702,8 @@ private Exception translateException(String id, Exception e) {
return e;
}
}

private void setPersistSearchFailure(boolean persistSearchFailure) {
this.persistSearchFailure = persistSearchFailure;
}
}
Loading

0 comments on commit c1779e9

Please sign in to comment.