From 77551778bb1f5397b65a40bf15d07aa95ff17309 Mon Sep 17 00:00:00 2001 From: Khushboo Rajput Date: Mon, 14 Aug 2023 16:50:38 -0700 Subject: [PATCH] Handle Reader thread termination gracefully Signed-off-by: Khushboo Rajput --- build.gradle | 17 ++-- docker/docker-compose.yml | 2 - .../ESLocalhostConnection.java | 57 ++++++++++++++ .../PerformanceAnalyzerApp.java | 68 ++++++++++++++-- .../performanceanalyzer/rca/Version.java | 9 ++- .../PerformanceAnalyzerAppTest.java | 77 +++++++++++++++++++ .../resources/rca/batch_metrics_enabled.conf | 1 + 7 files changed, 210 insertions(+), 21 deletions(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/ESLocalhostConnection.java create mode 100644 src/test/resources/rca/batch_metrics_enabled.conf diff --git a/build.gradle b/build.gradle index 596403c8e..451df7f67 100644 --- a/build.gradle +++ b/build.gradle @@ -34,7 +34,6 @@ buildscript { dependencies { classpath "org.opensearch.gradle:build-tools:${opensearch_version}" - classpath group: 'com.google.guava', name: 'guava', version: '31.1-jre' } } @@ -186,7 +185,7 @@ jacocoTestCoverageVerification { } } } - } + } else { violationRules { rule { @@ -194,7 +193,7 @@ jacocoTestCoverageVerification { minimum = 0.6 } } - } + } } } @@ -373,16 +372,18 @@ dependencies { strictly "2.23.0" } } - testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.0' - testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.0' - testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.0' - testImplementation group: 'org.javassist', name: 'javassist', version: '3.24.0-GA' - testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.0' + testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.2' + testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.2' + testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.2' + testImplementation group: 'org.javassist', name: 'javassist', version: '3.28.0-GA' + testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.2' testImplementation group: 'net.bytebuddy', name: 'byte-buddy', version: '1.9.3' testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.0.1' testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.1' testImplementation group: 'junit', name: 'junit', version: "${junitVersion}" + testImplementation group: 'org.xerial', name: 'sqlite-jdbc', version: '3.41.2.2' + testImplementation group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0' // Required for Docker build dockerBuild group: 'org.opensearch.plugin', name:'performance-analyzer', version: "${opensearch_build}" diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 43f47f7f1..9e2e12380 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -32,8 +32,6 @@ services: opensearch2: container_name: opensearch2 - environment: - - node.cluster_manager=false image: opensearch/pa-rca:3.0 mem_limit: 4g networks: diff --git a/src/main/java/org/opensearch/performanceanalyzer/ESLocalhostConnection.java b/src/main/java/org/opensearch/performanceanalyzer/ESLocalhostConnection.java new file mode 100644 index 000000000..098c3a0b4 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/ESLocalhostConnection.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer; + + +import io.netty.handler.codec.http.HttpMethod; +import java.io.DataOutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ESLocalhostConnection { + + private static final Logger LOG = LogManager.getLogger(ESLocalhostConnection.class); + + private static final int TIMEOUT_MILLIS = 30000; + + public static int makeHttpRequest(String path, HttpMethod httpMethod, String requestBody) { + HttpURLConnection connection = null; + try { + connection = createHTTPConnection(path, httpMethod); + DataOutputStream stream = new DataOutputStream(connection.getOutputStream()); + stream.writeBytes(requestBody); + stream.flush(); + stream.close(); + return connection.getResponseCode(); + } catch (Exception e) { + throw new RuntimeException("Request failed: " + e.getMessage(), e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + public static HttpURLConnection createHTTPConnection(String path, HttpMethod httpMethod) { + try { + String endPoint = "https://localhost:9200" + path; + URL endpointUrl = new URL(endPoint); + HttpURLConnection connection = (HttpURLConnection) endpointUrl.openConnection(); + connection.setRequestMethod(httpMethod.toString()); + + connection.setConnectTimeout(TIMEOUT_MILLIS); + connection.setReadTimeout(TIMEOUT_MILLIS); + connection.setUseCaches(false); + connection.setDoOutput(true); + return connection; + } catch (Exception e) { + throw new RuntimeException( + "Failed to create OpenSearch Connection: " + e.getMessage(), e); + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerApp.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerApp.java index 785b0cdaa..de1590cbe 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerApp.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerApp.java @@ -9,6 +9,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.sun.net.httpserver.HttpServer; +import io.netty.handler.codec.http.HttpMethod; +import java.net.HttpURLConnection; import java.util.*; import java.util.concurrent.*; import org.apache.logging.log4j.LogManager; @@ -51,6 +53,7 @@ public class PerformanceAnalyzerApp { private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerApp.class); + public static final int READER_RESTART_MAX_ATTEMPTS = 3; private static final int EXCEPTION_QUEUE_LENGTH = 1; private static final ScheduledMetricCollectorsExecutor METRIC_COLLECTOR_EXECUTOR = new ScheduledMetricCollectorsExecutor(1, false); @@ -234,13 +237,14 @@ public static Thread startGrpcServerThread( return grpcServerThread; } - private static void startReaderThread( + public static void startReaderThread( final AppContext appContext, final ThreadProvider threadProvider) { PluginSettings settings = PluginSettings.instance(); final Thread readerThread = threadProvider.createThreadForRunnable( () -> { - while (true) { + int retryAttemptLeft = READER_RESTART_MAX_ATTEMPTS; + while (retryAttemptLeft > 0) { try { ReaderMetricsProcessor mp = new ReaderMetricsProcessor( @@ -250,23 +254,71 @@ private static void startReaderThread( ReaderMetricsProcessor.setCurrentInstance(mp); mp.run(); } catch (Throwable e) { - if (TroubleshootingConfig.getEnableDevAssert()) { - break; - } + retryAttemptLeft--; LOG.error( - "Error in ReaderMetricsProcessor...restarting, ExceptionCode: {}", - StatExceptionCode.READER_RESTART_PROCESSING.toString()); + "Error in ReaderMetricsProcessor...restarting, retryCount left: {}." + + "Exception: {}", + retryAttemptLeft, + e.getMessage()); StatsCollector.instance() .logException( StatExceptionCode.READER_RESTART_PROCESSING); + + if (TroubleshootingConfig.getEnableDevAssert()) break; + + // All retry attempts exhausted; handle thread failure + if (retryAttemptLeft <= 0) handleReaderThreadFailed(); } } }, PerformanceAnalyzerThreads.PA_READER); - readerThread.start(); } + private static void handleReaderThreadFailed() { + // Reader subcomponent is responsible for processing, cleaning metrics written by PA. + // Since Reader thread fails to start successfully, execute following: + // + // 1. Disable PA - Stop collecting OpenSearch metrics + // 2. Terminate RCA Process - Gracefully shutdown all + // existing resources/channels, including Reader. + try { + LOG.info( + "Exhausted {} attempts - unable to start Reader Thread successfully; disable PA", + READER_RESTART_MAX_ATTEMPTS); + disablePA(); + LOG.info("Attempt to disable PA succeeded."); + StatsCollector.instance() + .logException(StatExceptionCode.READER_ERROR_PA_DISABLE_SUCCESS); + } catch (Throwable e) { + LOG.info("Attempt to disable PA failing: {}", e.getMessage()); + StatsCollector.instance() + .logException(StatExceptionCode.READER_ERROR_PA_DISABLE_FAILED); + } finally { + cleanupAndExit(); + } + } + + private static void disablePA() { + String PA_CONFIG_PATH = Util.PA_BASE_URL + "/cluster/config"; + String PA_DISABLE_PAYLOAD = "{\"enabled\": false}"; + + int resCode = + ESLocalhostConnection.makeHttpRequest( + PA_CONFIG_PATH, HttpMethod.POST, PA_DISABLE_PAYLOAD); + if (resCode != HttpURLConnection.HTTP_OK) { + throw new RuntimeException("Failed to disable PA"); + } + } + + private static void cleanupAndExit() { + LOG.info("Reader thread not coming up successfully - Shutting down RCA Runtime"); + StatsCollector.instance().logException(StatExceptionCode.READER_ERROR_RCA_AGENT_STOPPED); + + // Terminate Java Runtime, executes {@link #shutDownGracefully(ClientServers clientServers)} + System.exit(1); + } + /** * Start all the servers and clients for request processing. We start two servers: - httpServer: * To handle the curl requests sent to the endpoint. This is human readable and also used by the diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java b/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java index bfc85fcd3..402013cf7 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java @@ -19,11 +19,14 @@ public final class Version { * transferred packets should be dropped. Every increment here should be accompanied with a line * describing the version bump. * - * Note: The RCA version is agnostic of OpenSearch version. + *

Note: The RCA version is agnostic of OpenSearch version. */ static final class Major { - // Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2) - // and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change + // Bumping this post the Commons + // Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2) + // and Service + // Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) + // change static final int RCA_MAJ_VERSION = 1; } diff --git a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerAppTest.java b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerAppTest.java index 3d838b1c7..dc347dcef 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerAppTest.java +++ b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerAppTest.java @@ -5,22 +5,49 @@ package org.opensearch.performanceanalyzer; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.powermock.api.mockito.PowerMockito.doNothing; +import static org.powermock.api.mockito.PowerMockito.doThrow; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.verifyPrivate; +import io.netty.handler.codec.http.HttpMethod; import java.util.concurrent.ArrayBlockingQueue; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.contrib.java.lang.system.ExpectedSystemExit; +import org.junit.runner.RunWith; import org.opensearch.performanceanalyzer.commons.config.ConfigStatus; import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; import org.opensearch.performanceanalyzer.rca.RcaTestHelper; +import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor; import org.opensearch.performanceanalyzer.threads.ThreadProvider; import org.opensearch.performanceanalyzer.threads.exceptions.PAThreadException; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +@RunWith(PowerMockRunner.class) +@PrepareForTest({ESLocalhostConnection.class, PerformanceAnalyzerApp.class}) +@PowerMockIgnore({ + "com.sun.org.apache.xerces.*", + "javax.xml.*", + "org.xml.*", + "javax.management.*", + "org.w3c.*" +}) public class PerformanceAnalyzerAppTest { + @Rule public final ExpectedSystemExit exit = ExpectedSystemExit.none(); @Before public void setup() { System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); + RcaTestHelper.cleanUpLogs(); } @Test @@ -49,4 +76,54 @@ public void testStartErrorHandlingThread() throws InterruptedException { RcaTestHelper.verifyStatException( StatExceptionCode.ERROR_HANDLER_THREAD_STOPPED.toString())); } + + @Test + public void testStartReaderThreadAllAttemptFail() throws Exception { + ThreadProvider threadProvider = new ThreadProvider(); + AppContext appContext = new AppContext(); + + PowerMockito.mockStatic(ESLocalhostConnection.class); + ReaderMetricsProcessor readerMetricsProcessor = mock(ReaderMetricsProcessor.class); + doThrow(new RuntimeException("Force Crashing Reader Thread")) + .when(readerMetricsProcessor) + .run(); + PowerMockito.whenNew(ReaderMetricsProcessor.class) + .withAnyArguments() + .thenReturn(readerMetricsProcessor); + + PowerMockito.spy(PerformanceAnalyzerApp.class); + doNothing().when(PerformanceAnalyzerApp.class, "cleanupAndExit"); + + // PA Disable Success + PowerMockito.when( + ESLocalhostConnection.makeHttpRequest( + anyString(), eq(HttpMethod.POST), anyString())) + .thenReturn(200); + PerformanceAnalyzerApp.startReaderThread(appContext, threadProvider); + Assert.assertTrue( + "READER_RESTART_PROCESSING metric missing", + RcaTestHelper.verifyStatException( + StatExceptionCode.READER_RESTART_PROCESSING.toString())); + Assert.assertTrue( + "READER_ERROR_PA_DISABLE_SUCCESS metric missing", + RcaTestHelper.verifyStatException( + StatExceptionCode.READER_ERROR_PA_DISABLE_SUCCESS.toString())); + verifyPrivate(PerformanceAnalyzerApp.class, times(1)).invoke("cleanupAndExit"); + + // PA Disable Fail + PowerMockito.when( + ESLocalhostConnection.makeHttpRequest( + anyString(), eq(HttpMethod.POST), anyString())) + .thenReturn(500); + PerformanceAnalyzerApp.startReaderThread(appContext, threadProvider); + Assert.assertTrue( + "READER_RESTART_PROCESSING metric missing", + RcaTestHelper.verifyStatException( + StatExceptionCode.READER_RESTART_PROCESSING.toString())); + Assert.assertTrue( + "READER_ERROR_PA_DISABLE_FAILED metric missing", + RcaTestHelper.verifyStatException( + StatExceptionCode.READER_ERROR_PA_DISABLE_FAILED.toString())); + verifyPrivate(PerformanceAnalyzerApp.class, times(2)).invoke("cleanupAndExit"); + } } diff --git a/src/test/resources/rca/batch_metrics_enabled.conf b/src/test/resources/rca/batch_metrics_enabled.conf new file mode 100644 index 000000000..c508d5366 --- /dev/null +++ b/src/test/resources/rca/batch_metrics_enabled.conf @@ -0,0 +1 @@ +false