Skip to content

Commit

Permalink
Addresses the review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Jul 23, 2024
1 parent 34ebcc4 commit 590ba5d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class RTFPerformanceAnalyzerSearchListener

private static final Logger LOG =
LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class);
private static final String OPERATION_SHARD_FETCH = "shard_fetch";
private static final String OPERATION_SHARD_QUERY = "shard_query";
private static final String SHARD_FETCH_PHASE = "shard_fetch";
private static final String SHARD_QUERY_PHASE = "shard_query";
public static final String QUERY_START_TIME = "query_start_time";
public static final String FETCH_START_TIME = "fetch_start_time";
public static final String QUERY_TASK_ID = "query_task_id";
Expand All @@ -63,7 +63,7 @@ private Histogram createCPUUtilizationHistogram(MetricsRegistry metricsRegistry)
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(),
"CPU Utilization per shard for an operation",
"CPU Utilization per shard for a search phase",
RTFMetrics.MetricUnits.RATE.toString());
} else {
LOG.debug("MetricsRegistry is null");
Expand All @@ -75,7 +75,7 @@ private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(),
"Heap used per shard for an operation",
"Heap used per shard for a search phase",
RTFMetrics.MetricUnits.BYTE.toString());
} else {
LOG.debug("MetricsRegistry is null");
Expand Down Expand Up @@ -172,15 +172,15 @@ public void queryPhase(SearchContext searchContext, long tookInNanos) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
long queryTime = (System.nanoTime() - queryStartTime);
addResourceTrackingCompletionListener(
searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false);
searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false);
}

@Override
public void failedQueryPhase(SearchContext searchContext) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
long queryTime = (System.nanoTime() - queryStartTime);
addResourceTrackingCompletionListener(
searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true);
searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, true);
}

@Override
Expand All @@ -193,93 +193,85 @@ public void fetchPhase(SearchContext searchContext, long tookInNanos) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime());
long fetchTime = (System.nanoTime() - fetchStartTime);
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, false);
searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false);
}

@Override
public void failedFetchPhase(SearchContext searchContext) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime());
long fetchTime = (System.nanoTime() - fetchStartTime);
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, true);
searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, true);
}

private void addResourceTrackingCompletionListener(
SearchContext searchContext,
long startTime,
long queryTime,
String operation,
String phase,
boolean isFailed) {
addCompletionListener(searchContext, startTime, queryTime, operation, isFailed);
addCompletionListener(searchContext, startTime, queryTime, phase, isFailed);
}

private void addResourceTrackingCompletionListenerForFetchPhase(
SearchContext searchContext,
long fetchStartTime,
long fetchTime,
String operation,
String phase,
boolean isFailed) {
long overallStartTime = fetchStartTime;
long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l);
long startTime = fetchStartTime;
long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, -1l);
/**
* There are scenarios where both the query and fetch phases run in the same task as an
* optimization. That case is handled specially so that the CPU usage is divided between
* these two phases in proportion to their run time.
*/
if (queryTaskId == searchContext.getTask().getId()) {
overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
startTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
}
addCompletionListener(searchContext, overallStartTime, fetchTime, operation, isFailed);
addCompletionListener(searchContext, startTime, fetchTime, phase, isFailed);
}

private void addCompletionListener(
SearchContext searchContext,
long overallStartTime,
long operationTime,
String operation,
long startTime,
long phaseTookTime,
String phase,
boolean isFailed) {
searchContext
.getTask()
.addResourceTrackingCompletionListener(
createListener(
searchContext,
overallStartTime,
operationTime,
operation,
isFailed));
createListener(searchContext, startTime, phaseTookTime, phase, isFailed));
}

@VisibleForTesting
NotifyOnceListener<Task> createListener(
SearchContext searchContext,
long overallStartTime,
long totalOperationTime,
String operation,
long startTime,
long phaseTookTime,
String phase,
boolean isFailed) {
return new NotifyOnceListener<Task>() {
@Override
protected void innerOnResponse(Task task) {
LOG.debug("Updating the counter for task {}", task.getId());
/**
* There are scenarios where cpuUsageTime consists of the total of CPU of multiple
* operations. In that case we are computing the cpuShareFactor by dividing the
* particular operationTime and the total time till this calculation happen from the
* phases. In that case we compute the cpuShareFactor by dividing the
* particular phase's time by the total time elapsed from the overall start time until
* this calculation happens.
*/
long totalTime = System.nanoTime() - overallStartTime;
double operationShareFactor = computeShareFactor(totalOperationTime, totalTime);
long totalTime = System.nanoTime() - startTime;
double shareFactor = computeShareFactor(phaseTookTime, totalTime);
cpuUtilizationHistogram.record(
Utils.calculateCPUUtilization(
numProcessors,
totalTime,
task.getTotalResourceStats().getCpuTimeInNanos(),
operationShareFactor),
shareFactor),
createTags());
heapUsedHistogram.record(
Math.max(
0,
task.getTotalResourceStats().getMemoryInBytes()
* operationShareFactor),
Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor),
createTags());
}

Expand All @@ -294,7 +286,7 @@ private Tags createTags() {
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.request().shardId().getId())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase)
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed);
}

Expand All @@ -306,7 +298,7 @@ protected void innerOnFailure(Exception e) {
}

@VisibleForTesting
static double computeShareFactor(long totalOperationTime, long totalTime) {
return Math.min(1, ((double) totalOperationTime) / Math.max(1.0, totalTime));
static double computeShareFactor(long phaseTookTime, long totalTime) {
return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,19 @@ public String getChannelType() {

@Override
public void sendResponse(TransportResponse response) throws IOException {
emitMetrics(null);
emitMetrics(false);
original.sendResponse(response);
}

@Override
public void sendResponse(Exception exception) throws IOException {
emitMetrics(exception);
emitMetrics(true);
original.sendResponse(exception);
}

private void emitMetrics(Exception exception) {
private void emitMetrics(boolean isFailed) {
double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime);
recordCPUUtilizationMetric(
shardId, cpuUtilization, OPERATION_SHARD_BULK, exception != null);
recordCPUUtilizationMetric(shardId, cpuUtilization, OPERATION_SHARD_BULK, isFailed);
}

private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) {
Expand Down

0 comments on commit 590ba5d

Please sign in to comment.