diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index a636444b..6b7921cc 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -36,8 +36,8 @@ public class RTFPerformanceAnalyzerSearchListener private static final Logger LOG = LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class); - private static final String OPERATION_SHARD_FETCH = "shard_fetch"; - private static final String OPERATION_SHARD_QUERY = "shard_query"; + private static final String SHARD_FETCH_PHASE = "shard_fetch"; + private static final String SHARD_QUERY_PHASE = "shard_query"; public static final String QUERY_START_TIME = "query_start_time"; public static final String FETCH_START_TIME = "fetch_start_time"; public static final String QUERY_TASK_ID = "query_task_id"; @@ -63,7 +63,7 @@ private Histogram createCPUUtilizationHistogram(MetricsRegistry metricsRegistry) if (metricsRegistry != null) { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), - "CPU Utilization per shard for an operation", + "CPU Utilization per shard for a search phase", RTFMetrics.MetricUnits.RATE.toString()); } else { LOG.debug("MetricsRegistry is null"); @@ -75,7 +75,7 @@ private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) { if (metricsRegistry != null) { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(), - "Heap used per shard for an operation", + "Heap used per shard for a search phase", RTFMetrics.MetricUnits.BYTE.toString()); } else { LOG.debug("MetricsRegistry is null"); @@ -172,7 +172,7 @@ public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( - searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false); + searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false); } @Override @@ -180,7 +180,7 @@ public void failedQueryPhase(SearchContext searchContext) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( - searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true); + searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, true); } @Override @@ -193,7 +193,7 @@ public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( - searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, false); + searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false); } @Override @@ -201,60 +201,55 @@ public void failedFetchPhase(SearchContext searchContext) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( - searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, true); + searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, true); } private void addResourceTrackingCompletionListener( SearchContext searchContext, long startTime, long queryTime, - String operation, + String phase, boolean isFailed) { - addCompletionListener(searchContext, startTime, queryTime, operation, isFailed); + addCompletionListener(searchContext, startTime, queryTime, phase, isFailed); } private void addResourceTrackingCompletionListenerForFetchPhase( SearchContext searchContext, long fetchStartTime, long fetchTime, - String operation, + String phase, boolean isFailed) { - long overallStartTime = fetchStartTime; - long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l); + long startTime = fetchStartTime; + long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, -1l); /** * There are scenarios where both query and fetch phases run in the same task for an * optimization. Adding a special handling for that case to divide the CPU usage between * these 2 operations by their runTime. */ if (queryTaskId == searchContext.getTask().getId()) { - overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); + startTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); } - addCompletionListener(searchContext, overallStartTime, fetchTime, operation, isFailed); + addCompletionListener(searchContext, startTime, fetchTime, phase, isFailed); } private void addCompletionListener( SearchContext searchContext, - long overallStartTime, - long operationTime, - String operation, + long startTime, + long phaseTookTime, + String phase, boolean isFailed) { searchContext .getTask() .addResourceTrackingCompletionListener( - createListener( - searchContext, - overallStartTime, - operationTime, - operation, - isFailed)); + createListener(searchContext, startTime, phaseTookTime, phase, isFailed)); } @VisibleForTesting NotifyOnceListener createListener( SearchContext searchContext, - long overallStartTime, - long totalOperationTime, - String operation, + long startTime, + long phaseTookTime, + String phase, boolean isFailed) { return new NotifyOnceListener() { @Override @@ -262,24 +257,21 @@ protected void innerOnResponse(Task task) { LOG.debug("Updating the counter for task {}", task.getId()); /** * There are scenarios where cpuUsageTime consists of the total of CPU of multiple - * operations. In that case we are computing the cpuShareFactor by dividing the - * particular operationTime and the total time till this calculation happen from the + * phases. In that case we are computing the cpuShareFactor by dividing the + * particular phaseTime and the total time till this calculation happen from the * overall start time. */ - long totalTime = System.nanoTime() - overallStartTime; - double operationShareFactor = computeShareFactor(totalOperationTime, totalTime); + long totalTime = System.nanoTime() - startTime; + double shareFactor = computeShareFactor(phaseTookTime, totalTime); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, totalTime, task.getTotalResourceStats().getCpuTimeInNanos(), - operationShareFactor), + shareFactor), createTags()); heapUsedHistogram.record( - Math.max( - 0, - task.getTotalResourceStats().getMemoryInBytes() - * operationShareFactor), + Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor), createTags()); } @@ -294,7 +286,7 @@ private Tags createTags() { .addTag( RTFMetrics.CommonDimension.SHARD_ID.toString(), searchContext.request().shardId().getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); } @@ -306,7 +298,7 @@ protected void innerOnFailure(Exception e) { } @VisibleForTesting - static double computeShareFactor(long totalOperationTime, long totalTime) { - return Math.min(1, ((double) totalOperationTime) / Math.max(1.0, totalTime)); + static double computeShareFactor(long phaseTookTime, long totalTime) { + return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime)); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index 3cc4b353..6eb64185 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -77,20 +77,19 @@ public String getChannelType() { @Override public void sendResponse(TransportResponse response) throws IOException { - emitMetrics(null); + emitMetrics(false); original.sendResponse(response); } @Override public void sendResponse(Exception exception) throws IOException { - emitMetrics(exception); + emitMetrics(true); original.sendResponse(exception); } - private void emitMetrics(Exception exception) { + private void emitMetrics(boolean isFailed) { double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime); - recordCPUUtilizationMetric( - shardId, cpuUtilization, OPERATION_SHARD_BULK, exception != null); + recordCPUUtilizationMetric(shardId, cpuUtilization, OPERATION_SHARD_BULK, isFailed); } private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) {