Skip to content

Commit

Permalink
Handle Reader thread termination gracefully
Browse files Browse the repository at this point in the history
Signed-off-by: Khushboo Rajput <[email protected]>
  • Loading branch information
khushbr committed Aug 14, 2023
1 parent 572e7bc commit 7755177
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 21 deletions.
17 changes: 9 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ buildscript {

dependencies {
classpath "org.opensearch.gradle:build-tools:${opensearch_version}"
classpath group: 'com.google.guava', name: 'guava', version: '31.1-jre'
}
}

Expand Down Expand Up @@ -186,15 +185,15 @@ jacocoTestCoverageVerification {
}
}
}
}
}
else {
violationRules {
rule {
limit {
minimum = 0.6
}
}
}
}
}
}

Expand Down Expand Up @@ -373,16 +372,18 @@ dependencies {
strictly "2.23.0"
}
}
testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.0'
testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.0'
testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.0'
testImplementation group: 'org.javassist', name: 'javassist', version: '3.24.0-GA'
testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.0'
testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.2'
testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.2'
testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.2'
testImplementation group: 'org.javassist', name: 'javassist', version: '3.28.0-GA'
testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.2'
testImplementation group: 'net.bytebuddy', name: 'byte-buddy', version: '1.9.3'
testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.0.1'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.1'
testImplementation group: 'junit', name: 'junit', version: "${junitVersion}"
testImplementation group: 'org.xerial', name: 'sqlite-jdbc', version: '3.41.2.2'
testImplementation group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0'

// Required for Docker build
dockerBuild group: 'org.opensearch.plugin', name:'performance-analyzer', version: "${opensearch_build}"
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ services:

opensearch2:
container_name: opensearch2
environment:
- node.cluster_manager=false
image: opensearch/pa-rca:3.0
mem_limit: 4g
networks:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer;


import io.netty.handler.codec.http.HttpMethod;
import java.io.DataOutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ESLocalhostConnection {

private static final Logger LOG = LogManager.getLogger(ESLocalhostConnection.class);

private static final int TIMEOUT_MILLIS = 30000;

public static int makeHttpRequest(String path, HttpMethod httpMethod, String requestBody) {
HttpURLConnection connection = null;
try {
connection = createHTTPConnection(path, httpMethod);
DataOutputStream stream = new DataOutputStream(connection.getOutputStream());
stream.writeBytes(requestBody);
stream.flush();
stream.close();
return connection.getResponseCode();
} catch (Exception e) {
throw new RuntimeException("Request failed: " + e.getMessage(), e);
} finally {
if (connection != null) {
connection.disconnect();
}
}
}

public static HttpURLConnection createHTTPConnection(String path, HttpMethod httpMethod) {
try {
String endPoint = "https://localhost:9200" + path;
URL endpointUrl = new URL(endPoint);
HttpURLConnection connection = (HttpURLConnection) endpointUrl.openConnection();
connection.setRequestMethod(httpMethod.toString());

connection.setConnectTimeout(TIMEOUT_MILLIS);
connection.setReadTimeout(TIMEOUT_MILLIS);
connection.setUseCaches(false);
connection.setDoOutput(true);
return connection;
} catch (Exception e) {
throw new RuntimeException(
"Failed to create OpenSearch Connection: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.net.httpserver.HttpServer;
import io.netty.handler.codec.http.HttpMethod;
import java.net.HttpURLConnection;
import java.util.*;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -51,6 +53,7 @@ public class PerformanceAnalyzerApp {

private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerApp.class);

public static final int READER_RESTART_MAX_ATTEMPTS = 3;
private static final int EXCEPTION_QUEUE_LENGTH = 1;
private static final ScheduledMetricCollectorsExecutor METRIC_COLLECTOR_EXECUTOR =
new ScheduledMetricCollectorsExecutor(1, false);
Expand Down Expand Up @@ -234,13 +237,14 @@ public static Thread startGrpcServerThread(
return grpcServerThread;
}

private static void startReaderThread(
public static void startReaderThread(
final AppContext appContext, final ThreadProvider threadProvider) {
PluginSettings settings = PluginSettings.instance();
final Thread readerThread =
threadProvider.createThreadForRunnable(
() -> {
while (true) {
int retryAttemptLeft = READER_RESTART_MAX_ATTEMPTS;
while (retryAttemptLeft > 0) {
try {
ReaderMetricsProcessor mp =
new ReaderMetricsProcessor(
Expand All @@ -250,23 +254,71 @@ private static void startReaderThread(
ReaderMetricsProcessor.setCurrentInstance(mp);
mp.run();
} catch (Throwable e) {
if (TroubleshootingConfig.getEnableDevAssert()) {
break;
}
retryAttemptLeft--;
LOG.error(
"Error in ReaderMetricsProcessor...restarting, ExceptionCode: {}",
StatExceptionCode.READER_RESTART_PROCESSING.toString());
"Error in ReaderMetricsProcessor...restarting, retryCount left: {}."
+ "Exception: {}",
retryAttemptLeft,
e.getMessage());
StatsCollector.instance()
.logException(
StatExceptionCode.READER_RESTART_PROCESSING);

if (TroubleshootingConfig.getEnableDevAssert()) break;

// All retry attempts exhausted; handle thread failure
if (retryAttemptLeft <= 0) handleReaderThreadFailed();
}
}
},
PerformanceAnalyzerThreads.PA_READER);

readerThread.start();
}

private static void handleReaderThreadFailed() {
// Reader subcomponent is responsible for processing, cleaning metrics written by PA.
// Since Reader thread fails to start successfully, execute following:
//
// 1. Disable PA - Stop collecting OpenSearch metrics
// 2. Terminate RCA Process - Gracefully shutdown all
// existing resources/channels, including Reader.
try {
LOG.info(
"Exhausted {} attempts - unable to start Reader Thread successfully; disable PA",
READER_RESTART_MAX_ATTEMPTS);
disablePA();
LOG.info("Attempt to disable PA succeeded.");
StatsCollector.instance()
.logException(StatExceptionCode.READER_ERROR_PA_DISABLE_SUCCESS);
} catch (Throwable e) {
LOG.info("Attempt to disable PA failing: {}", e.getMessage());
StatsCollector.instance()
.logException(StatExceptionCode.READER_ERROR_PA_DISABLE_FAILED);
} finally {
cleanupAndExit();
}
}

private static void disablePA() {
String PA_CONFIG_PATH = Util.PA_BASE_URL + "/cluster/config";
String PA_DISABLE_PAYLOAD = "{\"enabled\": false}";

int resCode =
ESLocalhostConnection.makeHttpRequest(
PA_CONFIG_PATH, HttpMethod.POST, PA_DISABLE_PAYLOAD);
if (resCode != HttpURLConnection.HTTP_OK) {
throw new RuntimeException("Failed to disable PA");
}
}

private static void cleanupAndExit() {
LOG.info("Reader thread not coming up successfully - Shutting down RCA Runtime");
StatsCollector.instance().logException(StatExceptionCode.READER_ERROR_RCA_AGENT_STOPPED);

// Terminate Java Runtime, executes {@link #shutDownGracefully(ClientServers clientServers)}
System.exit(1);
}

/**
* Start all the servers and clients for request processing. We start two servers: - httpServer:
* To handle the curl requests sent to the endpoint. This is human readable and also used by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ public final class Version {
* transferred packets should be dropped. Every increment here should be accompanied with a line
* describing the version bump.
*
* Note: The RCA version is agnostic of OpenSearch version.
* <p>Note: The RCA version is agnostic of OpenSearch version.
*/
static final class Major {
// Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change
// Bumping this post the Commons
// Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service
// Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8)
// change
static final int RCA_MAJ_VERSION = 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,49 @@

package org.opensearch.performanceanalyzer;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.verifyPrivate;

import io.netty.handler.codec.http.HttpMethod;
import java.util.concurrent.ArrayBlockingQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
import org.junit.runner.RunWith;
import org.opensearch.performanceanalyzer.commons.config.ConfigStatus;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.rca.RcaTestHelper;
import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor;
import org.opensearch.performanceanalyzer.threads.ThreadProvider;
import org.opensearch.performanceanalyzer.threads.exceptions.PAThreadException;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest({ESLocalhostConnection.class, PerformanceAnalyzerApp.class})
@PowerMockIgnore({
"com.sun.org.apache.xerces.*",
"javax.xml.*",
"org.xml.*",
"javax.management.*",
"org.w3c.*"
})
public class PerformanceAnalyzerAppTest {
@Rule public final ExpectedSystemExit exit = ExpectedSystemExit.none();

@Before
public void setup() {
System.setProperty("performanceanalyzer.metrics.log.enabled", "False");
RcaTestHelper.cleanUpLogs();
}

@Test
Expand Down Expand Up @@ -49,4 +76,54 @@ public void testStartErrorHandlingThread() throws InterruptedException {
RcaTestHelper.verifyStatException(
StatExceptionCode.ERROR_HANDLER_THREAD_STOPPED.toString()));
}

@Test
public void testStartReaderThreadAllAttemptFail() throws Exception {
ThreadProvider threadProvider = new ThreadProvider();
AppContext appContext = new AppContext();

PowerMockito.mockStatic(ESLocalhostConnection.class);
ReaderMetricsProcessor readerMetricsProcessor = mock(ReaderMetricsProcessor.class);
doThrow(new RuntimeException("Force Crashing Reader Thread"))
.when(readerMetricsProcessor)
.run();
PowerMockito.whenNew(ReaderMetricsProcessor.class)
.withAnyArguments()
.thenReturn(readerMetricsProcessor);

PowerMockito.spy(PerformanceAnalyzerApp.class);
doNothing().when(PerformanceAnalyzerApp.class, "cleanupAndExit");

// PA Disable Success
PowerMockito.when(
ESLocalhostConnection.makeHttpRequest(
anyString(), eq(HttpMethod.POST), anyString()))
.thenReturn(200);
PerformanceAnalyzerApp.startReaderThread(appContext, threadProvider);
Assert.assertTrue(
"READER_RESTART_PROCESSING metric missing",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_RESTART_PROCESSING.toString()));
Assert.assertTrue(
"READER_ERROR_PA_DISABLE_SUCCESS metric missing",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_ERROR_PA_DISABLE_SUCCESS.toString()));
verifyPrivate(PerformanceAnalyzerApp.class, times(1)).invoke("cleanupAndExit");

// PA Disable Fail
PowerMockito.when(
ESLocalhostConnection.makeHttpRequest(
anyString(), eq(HttpMethod.POST), anyString()))
.thenReturn(500);
PerformanceAnalyzerApp.startReaderThread(appContext, threadProvider);
Assert.assertTrue(
"READER_RESTART_PROCESSING metric missing",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_RESTART_PROCESSING.toString()));
Assert.assertTrue(
"READER_ERROR_PA_DISABLE_FAILED metric missing",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_ERROR_PA_DISABLE_FAILED.toString()));
verifyPrivate(PerformanceAnalyzerApp.class, times(2)).invoke("cleanupAndExit");
}
}
1 change: 1 addition & 0 deletions src/test/resources/rca/batch_metrics_enabled.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
false

0 comments on commit 7755177

Please sign in to comment.