Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports additional query timing types for profiling plugin query components #17146

Open
wants to merge 4 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added new Setting property UnmodifiableOnRestore to prevent updating settings on restore snapshot ([#16957](https://github.com/opensearch-project/OpenSearch/pull/16957))
- Introduce Template query ([#16818](https://github.com/opensearch-project/OpenSearch/pull/16818))
- Propagate the sourceIncludes and excludes fields from fetchSourceContext to FieldsVisitor. ([#17080](https://github.com/opensearch-project/OpenSearch/pull/17080))
- Adds KNN related enums to support additional components for knn query during profiling ([#17146](https://github.com/opensearch-project/OpenSearch/pull/17146))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.core.xcontent.ToXContentObject;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;

/**
* Foundation class for all OpenSearch query builders
Expand Down Expand Up @@ -98,6 +100,10 @@ default QueryBuilder rewrite(QueryRewriteContext queryShardContext) throws IOExc
return this;
}

default Set<String> queryProfilerTimingTypes() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to remove this

return Collections.emptySet();
}

/**
* Recurse through the QueryBuilder tree, visiting any child QueryBuilder.
* @param visitor a query builder visitor to be called by each query builder in the tree.
Expand Down
10 changes: 7 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Constants;
import org.opensearch.Build;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -1386,7 +1387,8 @@ protected Node(
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool),
taskResourceTrackingService,
searchModule.getConcurrentSearchRequestDeciderFactories()
searchModule.getConcurrentSearchRequestDeciderFactories(),
searchModule.getQueryProfilerTimingTypes()
);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
Expand Down Expand Up @@ -2050,7 +2052,8 @@ protected SearchService newSearchService(
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
TaskResourceTrackingService taskResourceTrackingService,
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories,
Map<Class<? extends Query>, Set<String>> profilerTimingsPerQuery
) {
return new SearchService(
clusterService,
Expand All @@ -2064,7 +2067,8 @@ protected SearchService newSearchService(
circuitBreakerService,
indexSearcherExecutor,
taskResourceTrackingService,
concurrentSearchDeciderFactories
concurrentSearchDeciderFactories,
profilerTimingsPerQuery
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -227,6 +228,13 @@ default Optional<ExecutorServiceProvider> getIndexSearcherExecutorProvider() {
return Optional.empty();
}

/**
* Register any additional profiler timing types that is needed across all your queries
*/
default Map<Class<? extends Query>, Set<String>> registerProfilerTimingTypes() {
return emptyMap();
}

/**
* Executor service provider
*/
Expand Down
15 changes: 15 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.search;

import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.opensearch.common.NamedRegistry;
import org.opensearch.common.Nullable;
import org.opensearch.common.geo.GeoShapeType;
Expand Down Expand Up @@ -300,9 +301,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -335,6 +338,7 @@ public class SearchModule {
private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider;

private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
private final Map<Class<? extends Query>, Set<String>> profilerTimingsPerQuery = new HashMap<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced that using Lucene Query classes as the keys for the custom timing is a good idea. In particular, there are a number of Lucene queries whose type changes as a result of the rewrite operation. In some cases, the rewritten query is an anonymous subclass.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats a good point.

The problem is to getting the profile breakdown is completely dependent on Query. To make it context aware and not have unnecessary timing in response, I chose the key as Query.

Since the breakdown is obtained through IndexSearcher there aren't many options if it needs to be context aware to register timings.

any suggestions instead of Query to make it context aware?


/**
* Constructs a new SearchModule object
Expand All @@ -361,12 +365,19 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
registerSearchExts(plugins);
registerShapes();
registerIntervalsSourceProviders();
registerProfilerTimingTypes(plugins);
queryPhaseSearcher = registerQueryPhaseSearcher(plugins);
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
namedWriteables.addAll(SortValue.namedWriteables());
concurrentSearchDeciderFactories = registerConcurrentSearchDeciderFactories(plugins);
}

private void registerProfilerTimingTypes(List<SearchPlugin> plugins) {
for (SearchPlugin plugin : plugins) {
profilerTimingsPerQuery.putAll(plugin.registerProfilerTimingTypes());
}
}

private Collection<ConcurrentSearchRequestDecider.Factory> registerConcurrentSearchDeciderFactories(List<SearchPlugin> plugins) {
List<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories = new ArrayList<>();
for (SearchPlugin plugin : plugins) {
Expand All @@ -383,6 +394,10 @@ public Collection<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchReq
return concurrentSearchDeciderFactories;
}

public Map<Class<? extends Query>, Set<String>> getQueryProfilerTimingTypes() {
return profilerTimingsPerQuery;
}

public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return namedWriteables;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
Expand Down Expand Up @@ -401,6 +402,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final String sessionId = UUIDs.randomBase64UUID();
private final Executor indexSearcherExecutor;
private final TaskResourceTrackingService taskResourceTrackingService;
private final Map<Class<? extends Query>, Set<String>> profilerTimingsPerQuery;

public SearchService(
ClusterService clusterService,
Expand All @@ -414,7 +416,8 @@ public SearchService(
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
TaskResourceTrackingService taskResourceTrackingService,
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories,
Map<Class<? extends Query>, Set<String>> profilerTimingsPerQuery
) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
Expand Down Expand Up @@ -471,6 +474,7 @@ public SearchService(
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField);

this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
this.profilerTimingsPerQuery = profilerTimingsPerQuery;
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -1541,7 +1545,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
}
context.evaluateRequestShouldUseConcurrentSearch();
if (source.profile()) {
context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch()));
context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch(), this.profilerTimingsPerQuery));
}

if (this.indicesService.getCompositeIndexSettings() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ public void setProfiler(QueryProfiler profiler) {
this.profiler = profiler;
}

public QueryProfiler getProfiler() {
return profiler;
}

/**
* Add a {@link Runnable} that will be run on a regular basis while accessing documents in the
* DirectoryReader but also while collecting them and check for query cancellation or timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Base class for a profiling tree.
Expand All @@ -57,13 +59,16 @@ public abstract class AbstractInternalProfileTree<PB extends AbstractProfileBrea
/** A temporary stack used to record where we are in the dependency tree. */
protected Deque<Integer> stack;
private int currentToken = 0;
/** Register of additional custom timings per element **/
private Map<Class<? extends E>, Set<String>> profilerTimingsPerElement;

public AbstractInternalProfileTree() {
public AbstractInternalProfileTree(Map<Class<? extends E>, Set<String>> profilerTimingsPerElement) {
breakdowns = new ArrayList<>(10);
stack = new ArrayDeque<>(10);
tree = new ArrayList<>(10);
elements = new ArrayList<>(10);
roots = new ArrayList<>(10);
this.profilerTimingsPerElement = profilerTimingsPerElement == null ? Collections.emptyMap() : profilerTimingsPerElement;
}

/**
Expand Down Expand Up @@ -128,12 +133,17 @@ private PB addDependencyNode(E element, int token) {
// Save our query for lookup later
elements.add(element);

PB breakdown = createProfileBreakdown();
final Set<String> timings = profilerTimingsPerElement.get(element.getClass());
PB breakdown = createProfileBreakdown(timings == null ? Collections.emptySet() : timings);
breakdowns.add(token, breakdown);
return breakdown;
}

protected abstract PB createProfileBreakdown();
public Map<Class<? extends E>, Set<String>> getProfilerTimingsPerElement() {
return profilerTimingsPerElement;
}

protected abstract PB createProfileBreakdown(Set<String> profilerTimingsPerElement);

/**
* Removes the last (e.g. most recent) value on the stack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@

package org.opensearch.search.profile;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;

Expand All @@ -50,37 +57,41 @@ public abstract class AbstractProfileBreakdown<T extends Enum<T>> {
/**
* The accumulated timings for this query node
*/
protected final Timer[] timings;
protected final T[] timingTypes;
protected final Map<String, Timer> timings;
public static final String TIMING_TYPE_COUNT_SUFFIX = "_count";
public static final String TIMING_TYPE_START_TIME_SUFFIX = "_start_time";

/** Sole constructor. */
public AbstractProfileBreakdown(Class<T> clazz) {
this.timingTypes = clazz.getEnumConstants();
timings = new Timer[timingTypes.length];
for (int i = 0; i < timings.length; ++i) {
timings[i] = new Timer();
}
public AbstractProfileBreakdown(final Class<T> timingType, final Set<String> additionalProfilerTimings) {
Set<String> additionalTimings = additionalProfilerTimings == null ? Collections.emptySet() : additionalProfilerTimings;
timings = Stream.of(Arrays.stream(timingType.getEnumConstants()).map(Enum::name), additionalTimings.stream())
.flatMap(Function.identity())
.filter(Objects::nonNull)
.map(val -> val.toLowerCase(Locale.ROOT))
.collect(Collectors.toUnmodifiableMap(value -> value, value -> new Timer(), (a, b) -> a));
}

public Timer getTimer(T timing) {
return timings[timing.ordinal()];
return timings.get(timing.name().toLowerCase(Locale.ROOT));
}

public Timer getTimer(String timingName) {
return timings.get(timingName.toLowerCase(Locale.ROOT));
}

public void setTimer(T timing, Timer timer) {
timings[timing.ordinal()] = timer;
timings.put(timing.name().toLowerCase(Locale.ROOT), timer);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't timings initialized as an unmodifiableMap on line 71?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, didn't realize I made it unmodifiable. I will make the map modifiable

}

/**
* Build a timing count breakdown for current instance
*/
public Map<String, Long> toBreakdownMap() {
Map<String, Long> map = new HashMap<>(this.timings.length * 3);
for (T timingType : this.timingTypes) {
map.put(timingType.toString(), this.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, this.timings[timingType.ordinal()].getCount());
map.put(timingType + TIMING_TYPE_START_TIME_SUFFIX, this.timings[timingType.ordinal()].getEarliestTimerStartTime());
Map<String, Long> map = new TreeMap<>();
for (String timingType : this.timings.keySet()) {
map.put(timingType, this.timings.get(timingType).getApproximateTiming());
map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, this.timings.get(timingType).getCount());
map.put(timingType + TIMING_TYPE_START_TIME_SUFFIX, this.timings.get(timingType).getEarliestTimerStartTime());
Comment on lines +91 to +94
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be an iteration over timings.entrySet().

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change to entrySets instead

}
return Collections.unmodifiableMap(map);
}
Expand All @@ -94,8 +105,8 @@ public Map<String, Object> toDebugMap() {

public long toNodeTime() {
long total = 0;
for (T timingType : timingTypes) {
total += timings[timingType.ordinal()].getApproximateTiming();
for (String timingType : timings.keySet()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also be an iteration over timings.entrySet().

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change to entrySets instead

total += timings.get(timingType).getApproximateTiming();
}
return total;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Provide contextual profile breakdowns which are associated with freestyle context. Used when concurrent
Expand All @@ -21,8 +22,8 @@
* @opensearch.internal
*/
public abstract class ContextualProfileBreakdown<T extends Enum<T>> extends AbstractProfileBreakdown<T> {
public ContextualProfileBreakdown(Class<T> clazz) {
super(clazz);
public ContextualProfileBreakdown(Class<T> clazz, Set<String> additionalProfilerTimings) {
super(clazz, additionalProfilerTimings);
}

/**
Expand Down
Loading
Loading