diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveContext.java b/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveContext.java index 8c4b667c..e781991b 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveContext.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveContext.java @@ -38,6 +38,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; +import java.util.function.Supplier; import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.CLOSED; import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.INIT; @@ -59,30 +60,33 @@ public class AsynchronousSearchActiveContext extends AsynchronousSearchContext i private final SetOnce error; private final SetOnce searchResponse; private final AtomicBoolean closed; - private AsynchronousSearchContextPermits asynchronousSearchContextPermits; + private final Supplier persistSearchFailureSupplier; + private final AsynchronousSearchContextPermits asynchronousSearchContextPermits; + private final Supplier partialResponseSupplier; @Nullable private final User user; public AsynchronousSearchActiveContext(AsynchronousSearchContextId asynchronousSearchContextId, String nodeId, - TimeValue keepAlive, boolean keepOnCompletion, - ThreadPool threadPool, LongSupplier currentTimeSupplier, - AsynchronousSearchProgressListener searchProgressActionListener, - @Nullable User user) { + TimeValue keepAlive, boolean keepOnCompletion, ThreadPool threadPool, + LongSupplier currentTimeSupplier, + AsynchronousSearchProgressListener asynchronousSearchProgressListener, @Nullable User user, + Supplier persistSearchFailureSupplier) { super(asynchronousSearchContextId, currentTimeSupplier); this.keepOnCompletion = keepOnCompletion; this.error = new SetOnce<>(); this.searchResponse = new SetOnce<>(); this.keepAlive = keepAlive; this.nodeId = nodeId; - this.asynchronousSearchProgressListener = searchProgressActionListener; + this.asynchronousSearchProgressListener = asynchronousSearchProgressListener; + this.partialResponseSupplier = () -> asynchronousSearchProgressListener.partialResponse(); this.searchTask = new SetOnce<>(); this.asynchronousSearchId = new SetOnce<>(); this.completed = new AtomicBoolean(false); this.closed = new AtomicBoolean(false); this.asynchronousSearchContextPermits = keepOnCompletion ? new AsynchronousSearchContextPermits(asynchronousSearchContextId, - threadPool) : - new NoopAsynchronousSearchContextPermits(asynchronousSearchContextId); + threadPool) : new NoopAsynchronousSearchContextPermits(asynchronousSearchContextId); this.user = user; + this.persistSearchFailureSupplier = persistSearchFailureSupplier; } public void setTask(SearchTask searchTask) { @@ -99,36 +103,38 @@ public void setTask(SearchTask searchTask) { public void processSearchFailure(Exception e) { assert isAlive(); - if (completed.compareAndSet(false, true)) { - // we don't want to process stack traces + // we don't want to process stack traces + try { if (e.getCause() != null) { e.getCause().setStackTrace(new StackTraceElement[0]); } - error.set(e); + this.error.set(e); + } finally { + boolean result = completed.compareAndSet(false, true); + assert result : "Process search failure already complete"; } } public void processSearchResponse(SearchResponse response) { assert isAlive(); - if (completed.compareAndSet(false, true)) { - ShardSearchFailure [] shardSearchFailures = response.getShardFailures(); - for(ShardSearchFailure shardSearchFailure : shardSearchFailures) { + try { + ShardSearchFailure[] shardSearchFailures = response.getShardFailures(); + for (ShardSearchFailure shardSearchFailure : shardSearchFailures) { // we don't want to process stack traces if (shardSearchFailure.getCause() != null) { shardSearchFailure.getCause().setStackTrace(new StackTraceElement[0]); } } this.searchResponse.set(response); + } finally { + boolean result = completed.compareAndSet(false, true); + assert result : "Process search response already complete"; } } @Override public SearchResponse getSearchResponse() { - if (searchResponse.get() != null) { - return searchResponse.get(); - } else { - return asynchronousSearchProgressListener.partialResponse(); - } + return completed.get() ? searchResponse.get() : partialResponseSupplier.get(); } @Override @@ -137,7 +143,7 @@ public String getAsynchronousSearchId() { } public boolean shouldPersist() { - return keepOnCompletion && isExpired() == false && isAlive(); + return keepOnCompletion && isExpired() == false && isAlive() && (error.get() == null || persistSearchFailureSupplier.get()); } public boolean keepOnCompletion() { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/listener/AsynchronousSearchProgressListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/listener/AsynchronousSearchProgressListener.java index deae23d8..fff39a81 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/listener/AsynchronousSearchProgressListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/listener/AsynchronousSearchProgressListener.java @@ -28,7 +28,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.internal.InternalSearchResponse; -import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -44,7 +44,7 @@ * The implementation of {@link CompositeSearchProgressActionListener} responsible for updating the partial results of a single asynchronous * search request. All partial results are updated atomically. */ -public class AsynchronousSearchProgressListener extends SearchProgressActionListener { +public class AsynchronousSearchProgressListener extends SearchProgressActionListener implements PartialResponseProvider { private PartialResultsHolder partialResultsHolder; private final CompositeSearchProgressActionListener searchProgressActionListener; @@ -53,7 +53,7 @@ public class AsynchronousSearchProgressListener extends SearchProgressActionList private final ExecutorService executor; public AsynchronousSearchProgressListener(long relativeStartMillis, Function successFunction, + AsynchronousSearchResponse> successFunction, Function failureFunction, ExecutorService executor, LongSupplier relativeTimeSupplier, Supplier reduceContextBuilder) { @@ -69,6 +69,7 @@ public AsynchronousSearchProgressListener(long relativeStartMillis, Function> getSettings() { AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING, AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING, AsynchronousSearchManagementService.PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING, - AsynchronousSearchManagementService.ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING + AsynchronousSearchManagementService.ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING, + AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/processor/AsynchronousSearchPostProcessor.java b/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/processor/AsynchronousSearchPostProcessor.java index a8922bdd..44f58268 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/processor/AsynchronousSearchPostProcessor.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/processor/AsynchronousSearchPostProcessor.java @@ -37,6 +37,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.threadpool.ThreadPool; @@ -63,13 +64,12 @@ public AsynchronousSearchPostProcessor(AsynchronousSearchPersistenceService asyn AsynchronousSearchActiveStore asynchronousSearchActiveStore, AsynchronousSearchStateMachine stateMachine, Consumer freeActiveContextConsumer, - ThreadPool threadPool) { + ThreadPool threadPool, ClusterService clusterService) { this.asynchronousSearchActiveStore = asynchronousSearchActiveStore; this.asynchronousSearchPersistenceService = asynchronousSearchPersistenceService; this.asynchronousSearchStateMachine = stateMachine; this.freeActiveContextConsumer = freeActiveContextConsumer; this.threadPool = threadPool; - } public AsynchronousSearchResponse processSearchFailure(Exception exception, AsynchronousSearchContextId asynchronousSearchContextId) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchService.java b/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchService.java index 8fd30a91..c48f464b 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchService.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchService.java @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.search.asynchronous.context.AsynchronousSearchContextId; import com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveContext; import com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveStore; +import com.amazon.opendistroforelasticsearch.search.asynchronous.context.permits.AsynchronousSearchContextPermits; import com.amazon.opendistroforelasticsearch.search.asynchronous.context.persistence.AsynchronousSearchPersistenceContext; import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState; import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchStateMachine; @@ -116,6 +117,10 @@ public class AsynchronousSearchService extends AbstractLifecycleComponent implem "opendistro_asynchronous_search.max_wait_for_completion_timeout", timeValueMinutes(1), Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting PERSIST_SEARCH_FAILURES_SETTING = + Setting.boolSetting("opendistro_asynchronous_search.persist_search_failures", false, + Setting.Property.NodeScope, Setting.Property.Dynamic); + private volatile long maxKeepAlive; private volatile long maxWaitForCompletionTimeout; private volatile long maxSearchRunningTime; @@ -130,6 +135,7 @@ public class AsynchronousSearchService extends AbstractLifecycleComponent implem private final AsynchronousSearchStateMachine asynchronousSearchStateMachine; private final NamedWriteableRegistry namedWriteableRegistry; private final AsynchronousSearchContextEventListener contextEventListener; + private volatile boolean persistSearchFailure; public AsynchronousSearchService(AsynchronousSearchPersistenceService asynchronousSearchPersistenceService, AsynchronousSearchActiveStore asynchronousSearchActiveStore, Client client, @@ -146,6 +152,8 @@ public AsynchronousSearchService(AsynchronousSearchPersistenceService asynchrono setMaxWaitForCompletionTimeout(MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING.get(settings)); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_SEARCH_RUNNING_TIME_SETTING, this::setMaxSearchRunningTime); setMaxSearchRunningTime(MAX_SEARCH_RUNNING_TIME_SETTING.get(settings)); + clusterService.getClusterSettings().addSettingsUpdateConsumer(PERSIST_SEARCH_FAILURES_SETTING, this::setPersistSearchFailure); + setPersistSearchFailure(PERSIST_SEARCH_FAILURES_SETTING.get(clusterService.getSettings())); this.threadPool = threadPool; this.clusterService = clusterService; this.persistenceService = asynchronousSearchPersistenceService; @@ -153,8 +161,7 @@ public AsynchronousSearchService(AsynchronousSearchPersistenceService asynchrono this.asynchronousSearchActiveStore = asynchronousSearchActiveStore; this.asynchronousSearchStateMachine = initStateMachine(); this.asynchronousSearchPostProcessor = new AsynchronousSearchPostProcessor(persistenceService, asynchronousSearchActiveStore, - asynchronousSearchStateMachine, - this::freeActiveContext, threadPool); + asynchronousSearchStateMachine, this::freeActiveContext, threadPool, clusterService); this.namedWriteableRegistry = namedWriteableRegistry; } @@ -192,8 +199,8 @@ public AsynchronousSearchContext createAndStoreContext(SubmitAsynchronousSearchR threadPool::relativeTimeInMillis, reduceContextBuilder); AsynchronousSearchActiveContext asynchronousSearchContext = new AsynchronousSearchActiveContext(asynchronousSearchContextId, - clusterService.localNode().getId(), - request.getKeepAlive(), request.getKeepOnCompletion(), threadPool, currentTimeSupplier, progressActionListener, user); + clusterService.localNode().getId(), request.getKeepAlive(), request.getKeepOnCompletion(), threadPool, currentTimeSupplier, + progressActionListener, user, () -> persistSearchFailure); asynchronousSearchActiveStore.putContext(asynchronousSearchContextId, asynchronousSearchContext, contextEventListener::onContextRejected); contextEventListener.onContextInitialized(asynchronousSearchContextId); @@ -477,7 +484,7 @@ public boolean onCancelledFreeActiveContext(AsynchronousSearchActiveContext asyn /** * If an active context is found, a permit is acquired from - * {@linkplain com.amazon.opendistroforelasticsearch.search.asynchronous.context.permits.AsynchronousSearchContextPermits} + * {@linkplain AsynchronousSearchContextPermits} * and on acquisition of permit, a check is performed to see if response has been persisted in system index. If true, we update * expiration in index. Else we update expiration field in {@linkplain AsynchronousSearchActiveContext}. * @@ -695,4 +702,8 @@ private Exception translateException(String id, Exception e) { return e; } } + + private void setPersistSearchFailure(boolean persistSearchFailure) { + this.persistSearchFailure = persistSearchFailure; + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveContextTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveContextTests.java index e0debfd8..d2d26cef 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveContextTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveContextTests.java @@ -45,13 +45,9 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.UUID; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.CLOSED; import static java.util.Collections.emptyMap; @@ -83,8 +79,7 @@ public void testInitializeContext() { User user = TestClientUtils.randomUser(); TimeValue keepAlive = TimeValue.timeValueDays(randomInt(100)); AsynchronousSearchActiveContext context = new AsynchronousSearchActiveContext(asContextId, node, - keepAlive, keepOnCompletion, threadPool, - threadPool::absoluteTimeInMillis, asProgressListener, user); + keepAlive, keepOnCompletion, threadPool, threadPool::absoluteTimeInMillis, asProgressListener, user, () -> true); assertEquals(AsynchronousSearchState.INIT, context.getAsynchronousSearchState()); assertNull(context.getTask()); assertNull(context.getAsynchronousSearchId()); @@ -119,7 +114,7 @@ public void testTaskBootstrap() { TimeValue keepAlive = TimeValue.timeValueDays(randomInt(100)); AsynchronousSearchActiveContext context = new AsynchronousSearchActiveContext(asContextId, node, keepAlive, keepOnCompletion, threadPool, - threadPool::absoluteTimeInMillis, asProgressListener, user); + threadPool::absoluteTimeInMillis, asProgressListener, user, () -> true); SubmitAsynchronousSearchRequest request = new SubmitAsynchronousSearchRequest(new SearchRequest("test")); request.keepAlive(keepAlive); request.keepOnCompletion(keepOnCompletion); @@ -172,38 +167,29 @@ public void testProcessSearchCompletion() throws InterruptedException { TimeValue keepAlive = TimeValue.timeValueDays(randomInt(100)); AsynchronousSearchActiveContext context = new AsynchronousSearchActiveContext(asContextId, node, keepAlive, keepOnCompletion, threadPool, - threadPool::absoluteTimeInMillis, asProgressListener, null); - - int numThreads = 10; - AtomicInteger numSuccesses = new AtomicInteger(); - List runnables = new ArrayList<>(); - CountDownLatch countDownLatch = new CountDownLatch(numThreads); - for (int i = 0; i < numThreads; i++) { - Runnable runnable = () -> { - if (randomBoolean()) { - SearchResponse mockSearchResponse = getMockSearchResponse(); - context.processSearchResponse(mockSearchResponse); - if (mockSearchResponse.equals(context.getSearchResponse())) { - numSuccesses.getAndIncrement(); - assertNull(context.getSearchError()); - } - } else { - RuntimeException e = new RuntimeException(UUID.randomUUID().toString()); - context.processSearchFailure(e); - if (e.equals(context.getSearchError())) { - numSuccesses.getAndIncrement(); - assertNull(context.getSearchResponse()); - } - } - countDownLatch.countDown(); - }; - runnables.add(runnable); - } - for (Runnable r : runnables) { - threadPool.executor(AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME).execute(r); + threadPool::absoluteTimeInMillis, asProgressListener, null, () -> true); + if (randomBoolean()) { + SearchResponse mockSearchResponse = getMockSearchResponse(); + try { + context.processSearchResponse(mockSearchResponse); + } catch (Exception ex) { + fail("Unexpected exception "+ ex); + } + if (mockSearchResponse.equals(context.getSearchResponse())) { + assertNull(context.getSearchError()); + } + } else { + RuntimeException e = new RuntimeException(UUID.randomUUID().toString()); + try { + context.processSearchFailure(e); + } catch (Exception ex) { + fail("Unexpected exception "+ ex); + } + if (e.equals(context.getSearchError())) { + assertNull(context.getSearchResponse()); + } } - countDownLatch.await(); - assertEquals(numSuccesses.get(), 1); + } finally { ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } @@ -234,7 +220,7 @@ public void testClosedContext() throws InterruptedException { TimeValue keepAlive = TimeValue.timeValueDays(randomInt(100)); AsynchronousSearchActiveContext context = new AsynchronousSearchActiveContext(asContextId, node, keepAlive, keepOnCompletion, threadPool, - threadPool::absoluteTimeInMillis, asProgressListener, null); + threadPool::absoluteTimeInMillis, asProgressListener, null, () -> true); AsynchronousSearchTask task = new AsynchronousSearchTask(randomNonNegativeLong(), "transport", SearchAction.NAME, TaskId.EMPTY_TASK_ID, emptyMap(), context, null, (c) -> { }); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveStoreTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveStoreTests.java index 04f0ba57..72dbeb0c 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveStoreTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/context/active/AsynchronousSearchActiveStoreTests.java @@ -107,7 +107,7 @@ public void testPutContextRejection() throws InterruptedException, BrokenBarrier }; AsynchronousSearchActiveContext context = new AsynchronousSearchActiveContext(asContextId, node.getId(), keepAlive, keepOnCompletion, finalTestThreadPool1, - finalTestThreadPool1::absoluteTimeInMillis, asProgressListener, null); + finalTestThreadPool1::absoluteTimeInMillis, asProgressListener, null, () -> true); activeStore.putContext(asContextId, context, asContextEventListener::onContextRejected); numSuccesses.getAndIncrement(); Optional optional = activeStore.getContext(asContextId); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/integTests/AsynchronousSearchRejectionIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/integTests/AsynchronousSearchRejectionIT.java index 2b24505f..7f2a4d35 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/integTests/AsynchronousSearchRejectionIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/integTests/AsynchronousSearchRejectionIT.java @@ -24,9 +24,11 @@ import com.amazon.opendistroforelasticsearch.search.asynchronous.request.SubmitAsynchronousSearchRequest; import com.amazon.opendistroforelasticsearch.search.asynchronous.response.AcknowledgedResponse; import com.amazon.opendistroforelasticsearch.search.asynchronous.response.AsynchronousSearchResponse; +import com.amazon.opendistroforelasticsearch.search.asynchronous.service.AsynchronousSearchService; import com.amazon.opendistroforelasticsearch.search.asynchronous.utils.AsynchronousSearchAssertions; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.search.SearchAction; @@ -43,6 +45,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -71,15 +74,17 @@ protected Settings nodeSettings(int nodeOrdinal) { .put("thread_pool.write.queue_size", 10) .put("thread_pool.get.size", 1) .put("thread_pool.get.queue_size", 10) + .put(AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING.getKey(), true) .build(); } - + @TestLogging(value = "_root:DEBUG", reason = "flaky") public void testSimulatedSearchRejectionLoad() throws Throwable { for (int i = 0; i < 10; i++) { client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "1").get(); } AtomicInteger numRejections = new AtomicInteger(); + AtomicInteger numRnf = new AtomicInteger(); AtomicInteger numTimeouts = new AtomicInteger(); AtomicInteger numFailures = new AtomicInteger(); int numberOfAsyncOps = randomIntBetween(100, 200); @@ -122,6 +127,9 @@ public void onFailure(Exception e) { numRejections.incrementAndGet(); } else if (cause instanceof ElasticsearchTimeoutException) { numTimeouts.incrementAndGet(); + } else if(cause instanceof ResourceNotFoundException) { + // deletion is in race with task cancellation due to partial merge failure + numRnf.getAndIncrement(); } else { numFailures.incrementAndGet(); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/integTests/AsynchronousSearchStatsIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/integTests/AsynchronousSearchStatsIT.java index e9c104ee..8be24d85 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/integTests/AsynchronousSearchStatsIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/integTests/AsynchronousSearchStatsIT.java @@ -22,6 +22,7 @@ import com.amazon.opendistroforelasticsearch.search.asynchronous.request.SubmitAsynchronousSearchRequest; import com.amazon.opendistroforelasticsearch.search.asynchronous.response.AsynchronousSearchResponse; import com.amazon.opendistroforelasticsearch.search.asynchronous.response.AsynchronousSearchStatsResponse; +import com.amazon.opendistroforelasticsearch.search.asynchronous.service.AsynchronousSearchService; import com.amazon.opendistroforelasticsearch.search.asynchronous.stats.AsynchronousSearchCountStats; import com.amazon.opendistroforelasticsearch.search.asynchronous.stats.AsynchronousSearchStats; import com.amazon.opendistroforelasticsearch.search.asynchronous.utils.TestClientUtils; @@ -60,6 +61,7 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) .put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), asConcurrentLimit) + .put(AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING.getKey(), true) .build(); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/restIT/AsynchronousSearchRestIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/restIT/AsynchronousSearchRestIT.java index 5e6e5f05..4db73e40 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/restIT/AsynchronousSearchRestIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/restIT/AsynchronousSearchRestIT.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.List; +import static com.amazon.opendistroforelasticsearch.search.asynchronous.utils.TestUtils.getResponseAsMap; import static org.hamcrest.Matchers.greaterThan; public class AsynchronousSearchRestIT extends AsynchronousSearchRestTestCase { @@ -97,7 +98,7 @@ public void testSubmitSearchCompletesBeforeWaitForCompletionTimeout() throws IOE SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(searchRequest); submitAsynchronousSearchRequest.keepOnCompletion(true); submitAsynchronousSearchRequest.keepAlive(TimeValue.timeValueHours(5)); - submitAsynchronousSearchRequest.waitForCompletionTimeout(TimeValue.timeValueSeconds(1)); + submitAsynchronousSearchRequest.waitForCompletionTimeout(TimeValue.timeValueMinutes(1)); AsynchronousSearchResponse submitResponse = executeSubmitAsynchronousSearch(submitAsynchronousSearchRequest); List legalStates = Arrays.asList(AsynchronousSearchState.SUCCEEDED, AsynchronousSearchState.PERSIST_SUCCEEDED, @@ -106,7 +107,7 @@ public void testSubmitSearchCompletesBeforeWaitForCompletionTimeout() throws IOE assertHitCount(submitResponse.getSearchResponse(), 5L); GetAsynchronousSearchRequest getAsynchronousSearchRequest = new GetAsynchronousSearchRequest(submitResponse.getId()); AsynchronousSearchResponse getResponse = getAssertedAsynchronousSearchResponse(submitResponse, getAsynchronousSearchRequest); - assertEquals(getResponse, submitResponse); + assertEquals(getResponseAsMap(getResponse.getSearchResponse()), getResponseAsMap(submitResponse.getSearchResponse())); executeDeleteAsynchronousSearch(new DeleteAsynchronousSearchRequest(submitResponse.getId())); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/restIT/AsynchronousSearchSettingsIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/restIT/AsynchronousSearchSettingsIT.java index b78bee58..718c8a4e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/restIT/AsynchronousSearchSettingsIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/restIT/AsynchronousSearchSettingsIT.java @@ -16,6 +16,8 @@ package com.amazon.opendistroforelasticsearch.search.asynchronous.restIT; import com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveStore; +import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState; +import com.amazon.opendistroforelasticsearch.search.asynchronous.request.GetAsynchronousSearchRequest; import com.amazon.opendistroforelasticsearch.search.asynchronous.request.SubmitAsynchronousSearchRequest; import com.amazon.opendistroforelasticsearch.search.asynchronous.response.AsynchronousSearchResponse; import com.amazon.opendistroforelasticsearch.search.asynchronous.service.AsynchronousSearchService; @@ -24,6 +26,7 @@ import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CyclicBarrier; @@ -35,96 +38,127 @@ public class AsynchronousSearchSettingsIT extends AsynchronousSearchRestTestCase { public void testMaxKeepAliveSetting() throws Exception { - SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); - validRequest.keepAlive(TimeValue.timeValueHours(7)); - AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); - assertNotNull(asResponse.getSearchResponse()); - updateClusterSettings(AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING.getKey(), TimeValue.timeValueHours(6)); - SubmitAsynchronousSearchRequest invalidRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); - invalidRequest.keepAlive(TimeValue.timeValueHours(7)); - ResponseException responseException = expectThrows(ResponseException.class, () -> executeSubmitAsynchronousSearch( - invalidRequest)); - assertThat(responseException.getMessage(), containsString("Keep alive for asynchronous search (" + - invalidRequest.getKeepAlive().getMillis() + ") is too large")); - updateClusterSettings(AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING.getKey(), TimeValue.timeValueHours(24)); + SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); + validRequest.keepAlive(TimeValue.timeValueHours(7)); + AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); + assertNotNull(asResponse.getSearchResponse()); + updateClusterSettings(AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING.getKey(), TimeValue.timeValueHours(6)); + SubmitAsynchronousSearchRequest invalidRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); + invalidRequest.keepAlive(TimeValue.timeValueHours(7)); + ResponseException responseException = expectThrows(ResponseException.class, () -> executeSubmitAsynchronousSearch( + invalidRequest)); + assertThat(responseException.getMessage(), containsString("Keep alive for asynchronous search (" + + invalidRequest.getKeepAlive().getMillis() + ") is too large")); + updateClusterSettings(AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING.getKey(), TimeValue.timeValueHours(24)); } public void testSubmitInvalidWaitForCompletion() throws Exception { - SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); - validRequest.waitForCompletionTimeout(TimeValue.timeValueSeconds(50)); - AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); - assertNotNull(asResponse.getSearchResponse()); - updateClusterSettings(AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING.getKey(), - TimeValue.timeValueSeconds(2)); - SubmitAsynchronousSearchRequest invalidRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); - invalidRequest.waitForCompletionTimeout(TimeValue.timeValueSeconds(50)); - ResponseException responseException = expectThrows(ResponseException.class, () -> executeSubmitAsynchronousSearch( - invalidRequest)); - assertThat(responseException.getMessage(), containsString("Wait for completion timeout for asynchronous search (" + - validRequest.getWaitForCompletionTimeout().getMillis() + ") is too large")); - updateClusterSettings(AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING.getKey(), - TimeValue.timeValueSeconds(60)); + SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); + validRequest.waitForCompletionTimeout(TimeValue.timeValueSeconds(50)); + AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); + assertNotNull(asResponse.getSearchResponse()); + updateClusterSettings(AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING.getKey(), + TimeValue.timeValueSeconds(2)); + SubmitAsynchronousSearchRequest invalidRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); + invalidRequest.waitForCompletionTimeout(TimeValue.timeValueSeconds(50)); + ResponseException responseException = expectThrows(ResponseException.class, () -> executeSubmitAsynchronousSearch( + invalidRequest)); + assertThat(responseException.getMessage(), containsString("Wait for completion timeout for asynchronous search (" + + validRequest.getWaitForCompletionTimeout().getMillis() + ") is too large")); + updateClusterSettings(AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING.getKey(), + TimeValue.timeValueSeconds(60)); } public void testMaxRunningAsynchronousSearchContexts() throws Exception { - int numThreads = 50; - List threadsList = new LinkedList<>(); - CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); - for (int i = 0; i < numThreads; i++) { - threadsList.add(new Thread(() -> { + int numThreads = 50; + List threadsList = new LinkedList<>(); + CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + for (int i = 0; i < numThreads; i++) { + threadsList.add(new Thread(() -> { + try { + SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); + validRequest.keepAlive(TimeValue.timeValueHours(1)); + AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); + assertNotNull(asResponse.getSearchResponse()); + } catch (IOException e) { + fail("submit request failed"); + } finally { try { - SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); - validRequest.keepAlive(TimeValue.timeValueHours(1)); - AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); - assertNotNull(asResponse.getSearchResponse()); - } catch (IOException e) { - fail("submit request failed"); - } finally { - try { - barrier.await(); - } catch (Exception e) { - fail(); - } + barrier.await(); + } catch (Exception e) { + fail(); } } - )); - } - threadsList.forEach(Thread::start); - barrier.await(); - for (Thread thread : threadsList) { - thread.join(); } + )); + } + threadsList.forEach(Thread::start); + barrier.await(); + for (Thread thread : threadsList) { + thread.join(); + } - updateClusterSettings(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 0); - threadsList.clear(); - AtomicInteger numFailures = new AtomicInteger(); - for (int i = 0; i < numThreads; i++) { - threadsList.add(new Thread(() -> { + updateClusterSettings(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 0); + threadsList.clear(); + AtomicInteger numFailures = new AtomicInteger(); + for (int i = 0; i < numThreads; i++) { + threadsList.add(new Thread(() -> { + try { + SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); + validRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(1)); + AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); + } catch (Exception e) { + assertTrue(e instanceof ResponseException); + assertThat(e.getMessage(), containsString("Trying to create too many running contexts")); + numFailures.getAndIncrement(); + + } finally { try { - SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); - validRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(1)); - AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); + barrier.await(); } catch (Exception e) { - assertTrue(e instanceof ResponseException); - assertThat(e.getMessage(), containsString("Trying to create too many running contexts")); - numFailures.getAndIncrement(); - - } finally { - try { - barrier.await(); - } catch (Exception e) { - fail(); - } + fail(); } } - )); } - threadsList.forEach(Thread::start); - barrier.await(); - for (Thread thread : threadsList) { - thread.join(); + )); + } + threadsList.forEach(Thread::start); + barrier.await(); + for (Thread thread : threadsList) { + thread.join(); + } + assertEquals(numFailures.get(), 50); + updateClusterSettings(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), DEFAULT_MAX_RUNNING_SEARCHES); + } + + public void testStoreAsyncSearchWithFailures() throws Exception { + SubmitAsynchronousSearchRequest request = new SubmitAsynchronousSearchRequest(new SearchRequest("non-existent-index")); + request.keepOnCompletion(true); + request.waitForCompletionTimeout(TimeValue.timeValueMinutes(1)); + AsynchronousSearchResponse response = executeSubmitAsynchronousSearch(request); + assertTrue(Arrays.asList(AsynchronousSearchState.CLOSED, AsynchronousSearchState.FAILED).contains(AsynchronousSearchState.FAILED)); + waitUntil(() -> { + try { + executeGetAsynchronousSearch(new GetAsynchronousSearchRequest(response.getId())); + return false; + } catch (IOException e) { + return e.getMessage().contains("resource_not_found"); + } + }); + updateClusterSettings(AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING.getKey(), + true); + AsynchronousSearchResponse submitResponse = executeSubmitAsynchronousSearch(request); + waitUntil(() -> { + try { + return executeGetAsynchronousSearch(new GetAsynchronousSearchRequest(submitResponse.getId())).getState() + .equals(AsynchronousSearchState.STORE_RESIDENT); + } catch (IOException e) { + return false; } - assertEquals(numFailures.get(), 50); - updateClusterSettings(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), DEFAULT_MAX_RUNNING_SEARCHES); + }); + assertEquals(executeGetAsynchronousSearch(new GetAsynchronousSearchRequest(submitResponse.getId())).getState(), + AsynchronousSearchState.STORE_RESIDENT); + updateClusterSettings(AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING.getKey(), + false); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchPostProcessorTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchPostProcessorTests.java index 30ff1c90..93bef163 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchPostProcessorTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchPostProcessorTests.java @@ -92,6 +92,7 @@ public void createObjects() { final Set> settingsSet = Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of( AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING, + AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING, AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING, AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING, AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING)).collect(Collectors.toSet()); @@ -120,9 +121,10 @@ public void testProcessSearchFailureOnDeletedContext() throws AsynchronousSearch AsynchronousSearchService asService = new AsynchronousSearchService(persistenceService, asActiveStore, fakeClient, mockClusterService, testThreadPool, new InternalAsynchronousSearchStats(), new NamedWriteableRegistry(emptyList())); AsynchronousSearchStateMachine asStateMachine = asService.getStateMachine(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool, discoveryNode, clusterSettings); AsynchronousSearchPostProcessor postProcessor = new AsynchronousSearchPostProcessor(persistenceService, asActiveStore, asStateMachine, - (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool); + (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool, clusterService); SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); submitAsynchronousSearchRequest.keepOnCompletion(true); submitAsynchronousSearchRequest.keepAlive(TimeValue.timeValueHours(1)); @@ -166,9 +168,10 @@ public void testProcessSearchResponseBeginPersistence() throws AsynchronousSearc AsynchronousSearchService asService = new AsynchronousSearchService(persistenceService, asActiveStore, fakeClient, mockClusterService, testThreadPool, new InternalAsynchronousSearchStats(), new NamedWriteableRegistry(emptyList())); AsynchronousSearchStateMachine asStateMachine = asService.getStateMachine(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool, discoveryNode, clusterSettings); AsynchronousSearchPostProcessor postProcessor = new AsynchronousSearchPostProcessor(persistenceService, asActiveStore, asStateMachine, - (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool); + (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool, clusterService); SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); submitAsynchronousSearchRequest.keepOnCompletion(true); submitAsynchronousSearchRequest.keepAlive(TimeValue.timeValueHours(1)); @@ -208,9 +211,10 @@ public void testProcessSearchResponsePersisted() throws AsynchronousSearchStateM AsynchronousSearchService asService = new AsynchronousSearchService(persistenceService, asActiveStore, fakeClient, mockClusterService, testThreadPool, new InternalAsynchronousSearchStats(), new NamedWriteableRegistry(emptyList())); AsynchronousSearchStateMachine asStateMachine = asService.getStateMachine(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool, discoveryNode, clusterSettings); AsynchronousSearchPostProcessor postProcessor = new AsynchronousSearchPostProcessor(persistenceService, asActiveStore, asStateMachine, - (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool); + (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool, clusterService); SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); submitAsynchronousSearchRequest.keepOnCompletion(true); submitAsynchronousSearchRequest.keepAlive(TimeValue.timeValueHours(1)); @@ -253,9 +257,10 @@ public void testProcessSearchResponseForExpiredContext() throws AsynchronousSear AsynchronousSearchService asService = new AsynchronousSearchService(persistenceService, asActiveStore, fakeClient, mockClusterService, testThreadPool, new InternalAsynchronousSearchStats(), new NamedWriteableRegistry(emptyList())); AsynchronousSearchStateMachine asStateMachine = asService.getStateMachine(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool, discoveryNode, clusterSettings); AsynchronousSearchPostProcessor postProcessor = new AsynchronousSearchPostProcessor(persistenceService, asActiveStore, asStateMachine, - (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool); + (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool, clusterService); SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); submitAsynchronousSearchRequest.keepOnCompletion(true); submitAsynchronousSearchRequest.keepAlive(TimeValue.timeValueMillis(1)); @@ -296,9 +301,10 @@ public void testProcessSearchResponseOnClosedContext() throws AsynchronousSearch AsynchronousSearchService asService = new AsynchronousSearchService(persistenceService, asActiveStore, fakeClient, mockClusterService, testThreadPool, new InternalAsynchronousSearchStats(), new NamedWriteableRegistry(emptyList())); AsynchronousSearchStateMachine asStateMachine = asService.getStateMachine(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool, discoveryNode, clusterSettings); AsynchronousSearchPostProcessor postProcessor = new AsynchronousSearchPostProcessor(persistenceService, asActiveStore, asStateMachine, - (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool); + (context) -> activeContextCleanUpConsumerInvocation.compareAndSet(false, true), testThreadPool, clusterService); SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); submitAsynchronousSearchRequest.keepOnCompletion(true); submitAsynchronousSearchRequest.keepAlive(TimeValue.timeValueHours(1)); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceFreeContextTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceFreeContextTests.java index 90239076..fc1b8d40 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceFreeContextTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceFreeContextTests.java @@ -107,6 +107,7 @@ public void createObjects() { Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of( AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING, AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING, + AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING, AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING, AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING)).collect(Collectors.toSet()); final int availableProcessors = EsExecutors.allocatedProcessors(settings); @@ -212,7 +213,7 @@ public void testFreeContextRunningUserMatches() throws InterruptedException { AsynchronousSearchContextId asContextId = new AsynchronousSearchContextId(UUID.randomUUID().toString(), randomNonNegativeLong()); AsynchronousSearchActiveContext asActiveContext = new AsynchronousSearchActiveContext(asContextId, discoveryNode.getId(), - keepAlive, true, testThreadPool, testThreadPool::absoluteTimeInMillis, asProgressListener, user1); + keepAlive, true, testThreadPool, testThreadPool::absoluteTimeInMillis, asProgressListener, user1, () -> true); //bootstrap search AsynchronousSearchTask task = new AsynchronousSearchTask(randomNonNegativeLong(), "transport", SearchAction.NAME, TaskId.EMPTY_TASK_ID, @@ -270,7 +271,7 @@ public void testFreeActiveContextWithCancelledTask() throws InterruptedException AsynchronousSearchContextId asContextId = new AsynchronousSearchContextId(UUID.randomUUID().toString(), randomNonNegativeLong()); AsynchronousSearchActiveContext asActiveContext = new AsynchronousSearchActiveContext(asContextId, discoveryNode.getId(), - keepAlive, true, testThreadPool, testThreadPool::absoluteTimeInMillis, asProgressListener, user1); + keepAlive, true, testThreadPool, testThreadPool::absoluteTimeInMillis, asProgressListener, user1, () -> true); //bootstrap search AsynchronousSearchTask task = new AsynchronousSearchTask(randomNonNegativeLong(), "transport", SearchAction.NAME, TaskId.EMPTY_TASK_ID, @@ -536,7 +537,7 @@ public void testFreeActiveContextKeepOnCompletionFalse() throws InterruptedExcep AsynchronousSearchContextId asContextId = new AsynchronousSearchContextId(UUID.randomUUID().toString(), randomNonNegativeLong()); AsynchronousSearchActiveContext asActiveContext = new AsynchronousSearchActiveContext(asContextId, discoveryNode.getId(), - keepAlive, false, testThreadPool, testThreadPool::absoluteTimeInMillis, asProgressListener, user1); + keepAlive, false, testThreadPool, testThreadPool::absoluteTimeInMillis, asProgressListener, user1, () -> true); //bootstrap search AsynchronousSearchTask task = new AsynchronousSearchTask(randomNonNegativeLong(), "transport", SearchAction.NAME, TaskId.EMPTY_TASK_ID, @@ -654,7 +655,7 @@ static class MockAsynchronousSearchActiveContext extends AsynchronousSearchActiv boolean keepOnCompletion, ThreadPool threadPool, LongSupplier currentTimeSupplier, AsynchronousSearchProgressListener searchProgressActionListener, User user) { super(asContextId, nodeId, keepAlive, keepOnCompletion, threadPool, currentTimeSupplier, searchProgressActionListener, - user); + user, () -> true); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceTests.java index 533d288c..f3bc78c8 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceTests.java @@ -79,6 +79,7 @@ import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.CLOSED; import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.INIT; import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.RUNNING; +import static com.amazon.opendistroforelasticsearch.search.asynchronous.utils.TestUtils.createClusterService; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static org.elasticsearch.action.ActionListener.wrap; @@ -88,19 +89,22 @@ public class AsynchronousSearchServiceTests extends ESTestCase { private ClusterSettings clusterSettings; + private Settings settings; private ExecutorBuilder executorBuilder; static boolean blockPersistence; @Before public void createObjects() { - Settings settings = Settings.builder() + settings = Settings.builder() .put("node.name", "test") .put("cluster.name", "ClusterServiceTests") .put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 10) + .put(AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING.getKey(), true) .build(); final Set> settingsSet = Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of( AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING, AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING, + AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING, AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING, AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING)).collect(Collectors.toSet()); final int availableProcessors = EsExecutors.allocatedProcessors(settings); @@ -119,7 +123,7 @@ public void testFindContext() throws InterruptedException { try { testThreadPool = new TestThreadPool(AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME, executorBuilder); - ClusterService mockClusterService = ClusterServiceUtils.createClusterService(testThreadPool, discoveryNode, clusterSettings); + ClusterService mockClusterService = createClusterService(settings, testThreadPool, discoveryNode, clusterSettings); FakeClient fakeClient = new FakeClient(testThreadPool); AsynchronousSearchActiveStore asActiveStore = new AsynchronousSearchActiveStore(mockClusterService); AsynchronousSearchPersistenceService persistenceService = new AsynchronousSearchPersistenceService(fakeClient, diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceUpdateContextTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceUpdateContextTests.java index f74e742a..ee4b970d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceUpdateContextTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchServiceUpdateContextTests.java @@ -114,6 +114,7 @@ public void createObjects() { Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of( AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING, AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING, + AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING, AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING, AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING)).collect(Collectors.toSet()); final int availableProcessors = EsExecutors.allocatedProcessors(settings); @@ -414,7 +415,7 @@ public void testUpdateContextNoActiveContextFound() throws InterruptedException User user = TestClientUtils.randomUser(); AsynchronousSearchActiveContext context = new AsynchronousSearchActiveContext(asContextId, node, keepAlive, keepOnCompletion, testThreadPool, - testThreadPool::absoluteTimeInMillis, asProgressListener, user); + testThreadPool::absoluteTimeInMillis, asProgressListener, user, () -> true); CountDownLatch latch = new CountDownLatch(1); docNotFound = true; asService.updateKeepAliveAndGetContext(context.getAsynchronousSearchId(), keepAlive, context.getContextId(), @@ -603,7 +604,7 @@ static class MockAsynchronousSearchActiveContext extends AsynchronousSearchActiv boolean keepOnCompletion, ThreadPool threadPool, LongSupplier currentTimeSupplier, AsynchronousSearchProgressListener searchProgressActionListener, User user) { super(asContextId, nodeId, keepAlive, keepOnCompletion, threadPool, currentTimeSupplier, searchProgressActionListener, - user); + user, () -> true); } @Override diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchStateMachineTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchStateMachineTests.java index 1e1fa865..b3282d00 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchStateMachineTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/service/AsynchronousSearchStateMachineTests.java @@ -31,8 +31,6 @@ import com.amazon.opendistroforelasticsearch.search.asynchronous.listener.AsynchronousSearchContextEventListener; import com.amazon.opendistroforelasticsearch.search.asynchronous.listener.AsynchronousSearchProgressListener; import com.amazon.opendistroforelasticsearch.search.asynchronous.plugin.AsynchronousSearchPlugin; -import com.amazon.opendistroforelasticsearch.search.asynchronous.service.AsynchronousSearchPersistenceService; -import com.amazon.opendistroforelasticsearch.search.asynchronous.service.AsynchronousSearchService; import com.amazon.opendistroforelasticsearch.search.asynchronous.task.AsynchronousSearchTask; import org.apache.lucene.search.TotalHits; import org.elasticsearch.ElasticsearchException; @@ -61,7 +59,6 @@ import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ExecutorBuilder; @@ -89,6 +86,7 @@ import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.PERSISTING; import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.RUNNING; import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState.SUCCEEDED; +import static com.amazon.opendistroforelasticsearch.search.asynchronous.utils.TestUtils.createClusterService; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -96,18 +94,21 @@ public class AsynchronousSearchStateMachineTests extends AsynchronousSearchTestC private ClusterSettings clusterSettings; private ExecutorBuilder executorBuilder; + private Settings settings; @Before public void createObjects() { - Settings settings = Settings.builder() + settings = Settings.builder() .put("node.name", "test") .put("cluster.name", "ClusterServiceTests") .put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 10) + .put(AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING.getKey(), true) .build(); final Set> settingsSet = Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of( AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING, AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING, + AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING, AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING, AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING)).collect(Collectors.toSet()); final int availableProcessors = EsExecutors.allocatedProcessors(settings); @@ -124,7 +125,7 @@ public void testStateMachine() throws InterruptedException, BrokenBarrierExcepti TestThreadPool threadPool = null; try { threadPool = new TestThreadPool("test", executorBuilder); - ClusterService mockClusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings); + ClusterService mockClusterService = createClusterService(settings, threadPool, discoveryNode, clusterSettings); FakeClient fakeClient = new FakeClient(threadPool); AsynchronousSearchActiveStore asActiveStore = new AsynchronousSearchActiveStore(mockClusterService); AsynchronousSearchPersistenceService persistenceService = new AsynchronousSearchPersistenceService(fakeClient, @@ -139,7 +140,7 @@ public void testStateMachine() throws InterruptedException, BrokenBarrierExcepti TimeValue keepAlive = TimeValue.timeValueDays(randomInt(100)); AsynchronousSearchActiveContext context = new AsynchronousSearchActiveContext(asContextId, discoveryNode.getId(), keepAlive, true, threadPool, - threadPool::absoluteTimeInMillis, asProgressListener, null); + threadPool::absoluteTimeInMillis, asProgressListener, null, () -> true); assertNull(context.getTask()); assertEquals(context.getAsynchronousSearchState(), INIT); AsynchronousSearchStateMachine stateMachine = asService.getStateMachine(); @@ -150,19 +151,20 @@ public void testStateMachine() throws InterruptedException, BrokenBarrierExcepti randomNonNegativeLong(), "transport", SearchAction.NAME, TaskId.EMPTY_TASK_ID, emptyMap(), context, null, (a) -> {})), RUNNING, IllegalStateException.class, Optional.empty()); + boolean success = randomBoolean(); assertNotNull(context.getTask()); if (randomBoolean()) { //delete running context doConcurrentStateMachineTrigger(stateMachine, new SearchDeletedEvent(context), CLOSED, AsynchronousSearchStateMachineClosedException.class, Optional.empty()); } else { - if (randomBoolean()) {//success or failure - doConcurrentStateMachineTrigger(stateMachine, new SearchSuccessfulEvent(context, getMockSearchResponse()), SUCCEEDED, - IllegalStateException.class, Optional.empty()); - numCompleted.getAndIncrement(); - } else { + if (success) { doConcurrentStateMachineTrigger(stateMachine, new SearchFailureEvent(context, new RuntimeException("test")), FAILED, IllegalStateException.class, Optional.empty()); numFailure.getAndIncrement(); + } else {//success or failure + doConcurrentStateMachineTrigger(stateMachine, new SearchSuccessfulEvent(context, getMockSearchResponse()), SUCCEEDED, + IllegalStateException.class, Optional.empty()); + numCompleted.getAndIncrement(); } doConcurrentStateMachineTrigger(stateMachine, new BeginPersistEvent(context), PERSISTING, IllegalStateException.class, Optional.of(AsynchronousSearchStateMachineClosedException.class)); @@ -174,7 +176,7 @@ public void testStateMachine() throws InterruptedException, BrokenBarrierExcepti } assertEquals(numCompleted.get(), customContextListener.getCompletedCount()); assertEquals(numFailure.get(), customContextListener.getFailedCount()); - assertEquals(0, customContextListener.getRunningCount()); + assertEquals("success:" + success, 0, customContextListener.getRunningCount()); assertEquals(1, customContextListener.getDeletedCount()); } finally { ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/utils/TestUtils.java b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/utils/TestUtils.java new file mode 100644 index 00000000..7cf38879 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/search/asynchronous/utils/TestUtils.java @@ -0,0 +1,56 @@ +package com.amazon.opendistroforelasticsearch.search.asynchronous.utils; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterStatePublisher; +import static org.elasticsearch.test.ClusterServiceUtils.createNoOpNodeConnectionsService; + +public class TestUtils { + + public static ClusterService createClusterService(Settings settings, ThreadPool threadPool, DiscoveryNode localNode, + ClusterSettings clusterSettings) { + ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService()); + ClusterState initialClusterState = ClusterState.builder(new ClusterName(TestUtils.class.getSimpleName())) + .nodes(DiscoveryNodes.builder() + .add(localNode) + .localNodeId(localNode.getId()) + .masterNodeId(localNode.getId())) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build(); + clusterService.getClusterApplierService().setInitialState(initialClusterState); + clusterService.getMasterService().setClusterStatePublisher( + createClusterStatePublisher(clusterService.getClusterApplierService())); + clusterService.getMasterService().setClusterStateSupplier(clusterService.getClusterApplierService()::state); + clusterService.start(); + return clusterService; + } + + public static Map getResponseAsMap(SearchResponse searchResponse) throws IOException { + if (searchResponse != null) { + BytesReference response = XContentHelper.toXContent(searchResponse, Requests.INDEX_CONTENT_TYPE, true); + if (response == null) { + return emptyMap(); + } + return convertToMap(response, false, Requests.INDEX_CONTENT_TYPE).v2(); + } else { + return null; + } + } +}