diff --git a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 58e835f..5e91c42 100644 --- a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -79,7 +79,8 @@ public Collection createComponents( clusterService.getClusterSettings(), threadPool, client, - metricsRegistry + metricsRegistry, + xContentRegistry ); return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService, false)); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java new file mode 100644 index 0000000..b6db551 --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.reader; + +import java.util.ArrayList; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormatter; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.RangeQueryBuilder; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; + +/** + * Local index reader for reading query insights data from local OpenSearch indices. + */ +public final class LocalIndexReader implements QueryInsightsReader { + /** + * Logger of the local index reader + */ + private final Logger logger = LogManager.getLogger(); + private final Client client; + private DateTimeFormatter indexPattern; + private final NamedXContentRegistry namedXContentRegistry; + + /** + * Constructor of LocalIndexReader + * + * @param client OS client + * @param indexPattern the pattern of index to read from + * @param namedXContentRegistry for parsing purposes + */ + public LocalIndexReader(final Client client, final DateTimeFormatter indexPattern, final NamedXContentRegistry namedXContentRegistry) { + this.indexPattern = indexPattern; + this.client = client; + this.namedXContentRegistry = namedXContentRegistry; + } + + /** + * Getter of indexPattern + * + * @return indexPattern + */ + public DateTimeFormatter getIndexPattern() { + return indexPattern; + } + + /** + * Setter of indexPattern + * + * @param indexPattern index pattern + * @return the current LocalIndexReader + */ + public LocalIndexReader setIndexPattern(DateTimeFormatter indexPattern) { + this.indexPattern = indexPattern; + return this; + } + + /** + * Export a list of SearchQueryRecord from local index + * + * @param from start timestamp + * @param to end timestamp + * @return list of SearchQueryRecords whose timestamps fall between from and to + */ + @Override + public List read(final String from, final String to) { + List records = new ArrayList<>(); + if (from == null || to == null) { + return records; + } + final DateTime start = DateTime.parse(from); + DateTime end = DateTime.parse(to); + if (end.compareTo(DateTime.now(DateTimeZone.UTC)) > 0) { + end = DateTime.now(DateTimeZone.UTC); + } + DateTime curr = start; + while (curr.compareTo(end.plusDays(1).withTimeAtStartOfDay()) < 0) { + String index = getDateTimeFromFormat(curr); + SearchRequest searchRequest = new SearchRequest(index); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*"); + RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp").from(start.getMillis()).to(end.getMillis()); + QueryBuilder query = QueryBuilders.boolQuery().must(rangeQuery).mustNot(excludeQuery); + searchSourceBuilder.query(query); + searchRequest.source(searchSourceBuilder); + try { + SearchResponse searchResponse = client.search(searchRequest).actionGet(); + for (SearchHit hit : searchResponse.getHits()) { + SearchQueryRecord record = SearchQueryRecord.getRecord(hit, namedXContentRegistry); + records.add(record); + } + } catch (IndexNotFoundException ignored) {} catch (Exception e) { + logger.error("Unable to parse search hit: ", e); + } + curr = curr.plusDays(1); + + } + return records; + } + + /** + * Close the reader sink + */ + @Override + public void close() { + logger.debug("Closing the LocalIndexReader.."); + } + + private String getDateTimeFromFormat(DateTime current) { + return indexPattern.print(current); + } +} diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java new file mode 100644 index 0000000..0b98b18 --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.reader; + +import java.io.Closeable; +import java.util.List; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +/** + * Base interface for Query Insights readers + */ +public interface QueryInsightsReader extends Closeable { + /** + * Reader a list of SearchQueryRecord + * + * @param from string + * @param to string + * @return List of SearchQueryRecord + */ + List read(final String from, final String to); +} diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java new file mode 100644 index 0000000..32cbf3b --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.reader; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.joda.time.format.DateTimeFormat; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.NamedXContentRegistry; + +/** + * Factory class for validating and creating Readers based on provided settings + */ +public class QueryInsightsReaderFactory { + /** + * Logger of the query insights Reader factory + */ + private final Logger logger = LogManager.getLogger(); + final private Client client; + final private Set Readers; + + /** + * Constructor of QueryInsightsReaderFactory + * + * @param client OS client + */ + public QueryInsightsReaderFactory(final Client client) { + this.client = client; + this.Readers = new HashSet<>(); + } + + /** + * Validate Reader sink config + * + * @param settings Reader sink config {@link Settings} + * @throws IllegalArgumentException if provided Reader sink config settings are invalid + */ + public void validateReaderConfig(final Settings settings) throws IllegalArgumentException { + final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); + if (indexPattern.isEmpty()) { + throw new IllegalArgumentException("Empty index pattern configured for the Reader"); + } + try { + DateTimeFormat.forPattern(indexPattern); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the Reader", indexPattern) + ); + } + } + + /** + * Create a Reader based on provided parameters + * + * @param indexPattern the index pattern if creating an index Reader + * @param namedXContentRegistry for parsing purposes + * @return QueryInsightsReader the created Reader + */ + public QueryInsightsReader createReader(String indexPattern, NamedXContentRegistry namedXContentRegistry) { + QueryInsightsReader Reader = new LocalIndexReader(client, DateTimeFormat.forPattern(indexPattern), namedXContentRegistry); + this.Readers.add(Reader); + return Reader; + } + + /** + * Update a Reader based on provided parameters + * + * @param Reader The Reader to update + * @param indexPattern the index pattern if creating an index Reader + * @return QueryInsightsReader the updated Reader sink + */ + public QueryInsightsReader updateReader(QueryInsightsReader Reader, String indexPattern) { + if (Reader.getClass() == LocalIndexReader.class) { + ((LocalIndexReader) Reader).setIndexPattern(DateTimeFormat.forPattern(indexPattern)); + } + return Reader; + } + + /** + * Close a Reader + * + * @param Reader the Reader to close + */ + public void closeReader(QueryInsightsReader Reader) throws IOException { + if (Reader != null) { + Reader.close(); + this.Readers.remove(Reader); + } + } + + /** + * Close all Readers + * + */ + public void closeAllReaders() { + for (QueryInsightsReader Reader : Readers) { + try { + closeReader(Reader); + } catch (IOException e) { + logger.error("Fail to close query insights Reader, error: ", e); + } + } + } +} diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/package-info.java b/src/main/java/org/opensearch/plugin/insights/core/reader/package-info.java new file mode 100644 index 0000000..1efad9a --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Query Insights reader + */ +package org.opensearch.plugin.insights.core.reader; diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 2b41856..2055719 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -26,7 +26,9 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; +import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory; import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer; import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; @@ -75,6 +77,11 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ final QueryInsightsExporterFactory queryInsightsExporterFactory; + /** + * Query Insights reader factory + */ + final QueryInsightsReaderFactory queryInsightsReaderFactory; + /** * Flags for enabling insight data grouping for different metric types */ @@ -84,6 +91,8 @@ public class QueryInsightsService extends AbstractLifecycleComponent { private SearchQueryCategorizer searchQueryCategorizer; + private NamedXContentRegistry namedXContentRegistry; + /** * Constructor of the QueryInsightsService * @@ -91,29 +100,36 @@ public class QueryInsightsService extends AbstractLifecycleComponent { * @param threadPool The OpenSearch thread pool to run async tasks * @param client OS client * @param metricsRegistry Opentelemetry Metrics registry + * @param namedXContentRegistry NamedXContentRegistry for parsing purposes */ @Inject public QueryInsightsService( final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client, - final MetricsRegistry metricsRegistry + final MetricsRegistry metricsRegistry, + final NamedXContentRegistry namedXContentRegistry ) { enableCollect = new HashMap<>(); queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); this.threadPool = threadPool; this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client); + this.queryInsightsReaderFactory = new QueryInsightsReaderFactory(client); + this.namedXContentRegistry = namedXContentRegistry; // initialize top n queries services and configurations consumers topQueriesServices = new HashMap<>(); for (MetricType metricType : MetricType.allMetricTypes()) { enableCollect.put(metricType, false); - topQueriesServices.put(metricType, new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory)); + topQueriesServices.put( + metricType, + new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory) + ); } for (MetricType type : MetricType.allMetricTypes()) { clusterSettings.addSettingsUpdateConsumer( getExporterSettings(type), - (settings -> setExporter(type, settings)), - (settings -> validateExporterConfig(type, settings)) + (settings -> setExporterAndReader(type, settings)), + (settings -> validateExporterAndReaderConfig(type, settings)) ); } @@ -361,14 +377,16 @@ public void setTopNSize(final MetricType type, final int topNSize) { } /** - * Set the exporter config for a metricType + * Set the exporter and reader config for a metricType * * @param type {@link MetricType} - * @param settings exporter settings + * @param settings exporter and reader settings */ - public void setExporter(final MetricType type, final Settings settings) { + public void setExporterAndReader(final MetricType type, final Settings settings) { if (topQueriesServices.containsKey(type)) { - topQueriesServices.get(type).setExporter(settings); + TopQueriesService tqs = topQueriesServices.get(type); + tqs.setExporter(settings); + tqs.setReader(settings, namedXContentRegistry); } } @@ -381,14 +399,15 @@ public SearchQueryCategorizer getSearchQueryCategorizer() { } /** - * Validate the exporter config for a metricType + * Validate the exporter and reader config for a metricType * * @param type {@link MetricType} - * @param settings exporter settings + * @param settings exporter and reader settings */ - public void validateExporterConfig(final MetricType type, final Settings settings) { + public void validateExporterAndReaderConfig(final MetricType type, final Settings settings) { if (topQueriesServices.containsKey(type)) { - topQueriesServices.get(type).validateExporterConfig(settings); + TopQueriesService tqs = topQueriesServices.get(type); + tqs.validateExporterAndReaderConfig(settings); } } @@ -418,5 +437,6 @@ protected void doClose() throws IOException { } // close any unclosed resources queryInsightsExporterFactory.closeAllExporters(); + queryInsightsReaderFactory.closeAllReaders(); } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index 9bd2dd0..af3b6a8 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -21,23 +21,31 @@ import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.joda.time.DateTime; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.core.exporter.SinkType; +import org.opensearch.plugin.insights.core.reader.QueryInsightsReader; +import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory; import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper; import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper; import org.opensearch.plugin.insights.rules.model.AggregationType; +import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; @@ -87,6 +95,11 @@ public class TopQueriesService { */ private final QueryInsightsExporterFactory queryInsightsExporterFactory; + /** + * Factory for validating and creating readers + */ + private final QueryInsightsReaderFactory queryInsightsReaderFactory; + /** * The internal OpenSearch thread pool that execute async processing and exporting tasks */ @@ -96,22 +109,26 @@ public class TopQueriesService { * Exporter for exporting top queries data */ private QueryInsightsExporter exporter; + private QueryInsightsReader reader; private QueryGrouper queryGrouper; TopQueriesService( final MetricType metricType, final ThreadPool threadPool, - final QueryInsightsExporterFactory queryInsightsExporterFactory + final QueryInsightsExporterFactory queryInsightsExporterFactory, + QueryInsightsReaderFactory queryInsightsReaderFactory ) { this.enabled = false; this.metricType = metricType; this.threadPool = threadPool; this.queryInsightsExporterFactory = queryInsightsExporterFactory; + this.queryInsightsReaderFactory = queryInsightsReaderFactory; this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; this.windowStart = -1L; this.exporter = null; + this.reader = null; topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); @@ -266,14 +283,48 @@ public void setExporter(final Settings settings) { } /** - * Validate provided settings for top queries exporter + * Set up the top queries reader based on provided settings + * + * @param settings reader config {@link Settings} + * @param namedXContentRegistry NamedXContentRegistry for parsing purposes + */ + public void setReader(final Settings settings, final NamedXContentRegistry namedXContentRegistry) { + this.reader = queryInsightsReaderFactory.createReader( + settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN), + namedXContentRegistry + ); + queryInsightsReaderFactory.updateReader(reader, settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN)); + } + + /** + * Validate provided settings for top queries exporter and reader * - * @param settings settings exporter config {@link Settings} + * @param settings settings exporter/reader config {@link Settings} */ - public void validateExporterConfig(Settings settings) { + public void validateExporterAndReaderConfig(Settings settings) { queryInsightsExporterFactory.validateExporterConfig(settings); + queryInsightsReaderFactory.validateReaderConfig(settings); } + /** + * Lambda function to mark if a record is internal + */ + private final Predicate checkIfInternal = (record) -> { + Map attributes = record.getAttributes(); + Object indicesObject = attributes.get(Attribute.INDICES); + if (indicesObject instanceof Object[]) { + Object[] indices = (Object[]) indicesObject; + return Arrays.stream(indices).noneMatch(index -> { + if (index instanceof String) { + String indexString = (String) index; + return indexString.contains("top_queries"); + } + return false; + }); + } + return true; + }; + /** * Get all top queries records that are in the current top n queries store * Optionally include top N records from the last window. @@ -281,10 +332,13 @@ public void validateExporterConfig(Settings settings) { * By default, return the records in sorted order. * * @param includeLastWindow if the top N queries from the last window should be included + * @param from start timestamp + * @param to end timestamp * @return List of the records that are in the query insight store - * @throws IllegalArgumentException if query insight is disabled in the cluster + * @throws IllegalArgumentException if query insights is disabled in the cluster */ - public List getTopQueriesRecords(final boolean includeLastWindow) throws IllegalArgumentException { + public List getTopQueriesRecords(final boolean includeLastWindow, final String from, final String to) + throws IllegalArgumentException { if (!enabled) { throw new IllegalArgumentException( String.format(Locale.ROOT, "Cannot get top n queries for [%s] when it is not enabled.", metricType.toString()) @@ -295,6 +349,53 @@ public List getTopQueriesRecords(final boolean includeLastWin if (includeLastWindow) { queries.addAll(topQueriesHistorySnapshot.get()); } + List filterQueries = queries; + if (from != null && to != null) { + final DateTime start = DateTime.parse(from); + final DateTime end = DateTime.parse(to); + Predicate timeFilter = element -> start.getMillis() <= element.getTimestamp() + && element.getTimestamp() <= end.getMillis(); + filterQueries = queries.stream().filter(checkIfInternal.and(timeFilter)).collect(Collectors.toList()); + } + return Stream.of(filterQueries) + .flatMap(Collection::stream) + .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) + .collect(Collectors.toList()); + } + + /** + * Get all historical top queries records that are in local index + *

+ * By default, return the records in sorted order. + * + * @param from start timestamp + * @param to end timestamp + * @return List of the records that are in local index (if enabled) with timestamps between from and to + * @throws IllegalArgumentException if query insights is disabled in the cluster + */ + public List getTopQueriesRecordsFromIndex(final String from, final String to) throws IllegalArgumentException { + if (!enabled) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Cannot get top n queries for [%s] when it is not enabled.", metricType.toString()) + ); + } + + final List queries = new ArrayList<>(); + if (reader != null) { + try { + final DateTime start = DateTime.parse(from); + final DateTime end = DateTime.parse(to); + List records = reader.read(from, to); + Predicate timeFilter = element -> start.getMillis() <= element.getTimestamp() + && element.getTimestamp() <= end.getMillis(); + List filteredRecords = records.stream() + .filter(checkIfInternal.and(timeFilter)) + .collect(Collectors.toList()); + queries.addAll(filteredRecords); + } catch (Exception e) { + logger.error("Failed to read from index: ", e); + } + } return Stream.of(queries) .flatMap(Collection::stream) .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) @@ -404,6 +505,7 @@ public List getTopQueriesCurrentSnapshot() { */ public void close() throws IOException { queryInsightsExporterFactory.closeExporter(this.exporter); + queryInsightsReaderFactory.closeReader(this.reader); } /** diff --git a/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java b/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java index a18e7f2..28ea870 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java @@ -20,6 +20,8 @@ public class TopQueriesRequest extends BaseNodesRequest { final MetricType metricType; + final String from; + final String to; /** * Constructor for TopQueriesRequest @@ -30,6 +32,8 @@ public class TopQueriesRequest extends BaseNodesRequest { public TopQueriesRequest(final StreamInput in) throws IOException { super(in); this.metricType = MetricType.readFromStream(in); + this.from = null; + this.to = null; } /** @@ -37,11 +41,15 @@ public TopQueriesRequest(final StreamInput in) throws IOException { * If none are passed, cluster level top queries will be returned. * * @param metricType {@link MetricType} + * @param from start timestamp + * @param to end timestamp * @param nodesIds the nodeIds specified in the request */ - public TopQueriesRequest(final MetricType metricType, final String... nodesIds) { + public TopQueriesRequest(final MetricType metricType, final String from, final String to, final String... nodesIds) { super(nodesIds); this.metricType = metricType; + this.from = from; + this.to = to; } /** @@ -52,6 +60,22 @@ public MetricType getMetricType() { return metricType; } + /** + * Get from for timestamp request + * @return String of fromtimestamp + */ + public String getFrom() { + return from; + } + + /** + * Get to for timestamp request + * @return String of to timestamp + */ + public String getTo() { + return to; + } + @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index 42ee029..a81bbc8 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -9,29 +9,92 @@ package org.opensearch.plugin.insights.rules.model; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.Version; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.tasks.Task; /** * SearchQueryRecord represents a minimal atomic record stored in the Query Insight Framework, * which contains extensive information related to a search query. */ public class SearchQueryRecord implements ToXContentObject, Writeable { - public static final String MEASUREMENTS = "measurements"; + private static final Logger log = LogManager.getLogger(SearchQueryRecord.class); private final long timestamp; private final Map measurements; private final Map attributes; + /** + * Timestamp + */ + public static final String TIMESTAMP = "timestamp"; + /** + * Latency time + */ + public static final String LATENCY = "latency"; + /** + * CPU usage + */ + public static final String CPU = "cpu"; + /** + * Memory usage + */ + public static final String MEMORY = "memory"; + /** + * The search query type + */ + public static final String SEARCH_TYPE = "search_type"; + /** + * The search query source + */ + public static final String SOURCE = "source"; + /** + * Total shards queried + */ + public static final String TOTAL_SHARDS = "total_shards"; + /** + * The indices involved + */ + public static final String INDICES = "indices"; + /** + * The per phase level latency map for a search query + */ + public static final String PHASE_LATENCY_MAP = "phase_latency_map"; + /** + * The node id for this request + */ + public static final String NODE_ID = "node_id"; + /** + * Tasks level resource usages in this request + */ + public static final String TASK_RESOURCE_USAGES = "task_resource_usages"; + /** + * Custom search request labels + */ + public static final String LABELS = "labels"; + + public static final String MEASUREMENTS = "measurements"; private String groupingId; /** @@ -77,6 +140,123 @@ public SearchQueryRecord(final long timestamp, Map meas this.timestamp = timestamp; } + /** + * Returns a SearchQueryRecord from a SearchHit + * + * @param hit SearchHit to parse into SearchQueryRecord + * @param namedXContentRegistry NamedXContentRegistry for parsing purposes + * @return SearchQueryRecord + */ + public static SearchQueryRecord getRecord(SearchHit hit, NamedXContentRegistry namedXContentRegistry) throws IOException { + long timestamp = 0L; + Map measurements = new HashMap<>(); + Map attributes = new HashMap<>(); + XContentParser parser = XContentType.JSON.xContent() + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString()); + parser.nextToken(); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + try { + switch (fieldName) { + case TIMESTAMP: + timestamp = parser.longValue(); + break; + case LATENCY: + case CPU: + case MEMORY: + MetricType metric = MetricType.fromString(fieldName); + measurements.put(metric, new Measurement(metric.parseValue(parser.longValue()))); + break; + case SEARCH_TYPE: + attributes.put(Attribute.SEARCH_TYPE, parser.text()); + break; + case SOURCE: + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + attributes.put(Attribute.SOURCE, SearchSourceBuilder.fromXContent(parser, false)); + break; + case TOTAL_SHARDS: + attributes.put(Attribute.TOTAL_SHARDS, parser.intValue()); + break; + case INDICES: + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); + List indices = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + indices.add(parser.text()); + } + attributes.put(Attribute.INDICES, indices.toArray()); + break; + case PHASE_LATENCY_MAP: + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + Map phaseLatencyMap = new HashMap<>(); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String phase = parser.currentName(); + parser.nextToken(); + phaseLatencyMap.put(phase, parser.longValue()); + } + attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + break; + case NODE_ID: + attributes.put(Attribute.NODE_ID, parser.text()); + break; + case TASK_RESOURCE_USAGES: + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); + List tasksResourceUsages = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + String action = ""; + long taskId = 0L; + long parentTaskId = 0L; + String nodeId = ""; + TaskResourceUsage taskRU = new TaskResourceUsage(0L, 0L); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String usageFields = parser.currentName(); + parser.nextToken(); + switch (usageFields) { + case "action": + action = parser.text(); + break; + case "taskId": + taskId = parser.longValue(); + break; + case "parentTaskId": + parentTaskId = parser.longValue(); + break; + case "nodeId": + nodeId = parser.text(); + break; + case "taskResourceUsage": + taskRU = TaskResourceUsage.fromXContent(parser); + break; + default: + break; + } + } + TaskResourceInfo resourceInfo = new TaskResourceInfo(action, taskId, parentTaskId, nodeId, taskRU); + tasksResourceUsages.add(resourceInfo); + } + attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages); + break; + case LABELS: + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + Map labels = new HashMap<>(); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + parser.nextToken(); + labels.put(Task.X_OPAQUE_ID, parser.text()); + } + attributes.put(Attribute.LABELS, labels); + break; + default: + break; + } + } catch (Exception e) { + log.error("Error when parsing through search hit", e); + } + } + return new SearchQueryRecord(timestamp, measurements, attributes); + } + /** * Returns the observation time of the metric. * @@ -144,7 +324,7 @@ public void addAttribute(final Attribute attribute, final Object value) { } @Override - public XContentBuilder toXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException { + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); builder.field("timestamp", timestamp); for (Map.Entry entry : attributes.entrySet()) { diff --git a/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java b/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java index 883af7e..270dd5f 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java @@ -15,6 +15,7 @@ import java.util.Locale; import java.util.Set; import java.util.stream.Collectors; +import org.joda.time.DateTime; import org.opensearch.client.node.NodeClient; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.Strings; @@ -64,15 +65,54 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC return channel -> client.execute(TopQueriesAction.INSTANCE, topQueriesRequest, topQueriesResponse(channel)); } + private static boolean isNotISODate(final String dateTime) { + try { + DateTime.parse(dateTime); + return false; + } catch (Exception e) { + return true; + } + } + static TopQueriesRequest prepareRequest(final RestRequest request) { final String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); final String metricType = request.param("type", MetricType.LATENCY.toString()); + final String from = request.param("from", null); + final String to = request.param("to", null); if (!ALLOWED_METRICS.contains(metricType)) { throw new IllegalArgumentException( String.format(Locale.ROOT, "request [%s] contains invalid metric type [%s]", request.path(), metricType) ); } - return new TopQueriesRequest(MetricType.fromString(metricType), nodesIds); + if (from != null || to != null) { + if (from != null ^ to != null) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "request [%s] is missing one of the time parameters. Both must be provided", request.path()) + ); + } + if (isNotISODate(from)) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "request [%s] contains invalid 'from' date format. Expected ISO8601 format string (YYYY-MM-DD'T'HH:mm:ss.SSSZ): [%s]", + request.path(), + from + ) + ); + } + if (isNotISODate(to)) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "request [%s] contains invalid 'to' date format. Expected ISO8601 format string (YYYY-MM-DD'T'HH:mm:ss.SSSZ): [%s]", + request.path(), + to + ) + ); + } + } + + return new TopQueriesRequest(MetricType.fromString(metricType), from, to, nodesIds); } @Override diff --git a/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java b/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java index b46bdc0..f2ed2a7 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java @@ -86,6 +86,16 @@ protected TopQueriesResponse newResponse( default: size = clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); } + final String from = topQueriesRequest.getFrom(); + final String to = topQueriesRequest.getTo(); + if (from != null && to != null) { + responses.add( + new TopQueries( + clusterService.localNode(), + queryInsightsService.getTopQueriesService(topQueriesRequest.getMetricType()).getTopQueriesRecordsFromIndex(from, to) + ) + ); + } return new TopQueriesResponse(clusterService.getClusterName(), responses, failures, size, topQueriesRequest.getMetricType()); } @@ -102,9 +112,11 @@ protected TopQueries newNodeResponse(final StreamInput in) throws IOException { @Override protected TopQueries nodeOperation(final NodeRequest nodeRequest) { final TopQueriesRequest request = nodeRequest.request; + final String from = request.getFrom(); + final String to = request.getTo(); return new TopQueries( clusterService.localNode(), - queryInsightsService.getTopQueriesService(request.getMetricType()).getTopQueriesRecords(true) + queryInsightsService.getTopQueriesService(request.getMetricType()).getTopQueriesRecords(true, from, to) ); } diff --git a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 89290bc..d38f81d 100644 --- a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -206,7 +206,6 @@ public class QueryInsightsSettings { * Config key for export index */ public static final String EXPORT_INDEX = "config.index"; - /** * Settings and defaults for top queries exporters */ diff --git a/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java b/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java new file mode 100644 index 0000000..f6f2848 --- /dev/null +++ b/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.reader; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.search.TotalHits; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.junit.Before; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.document.DocumentField; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.test.OpenSearchTestCase; + +/** + * Granular tests for the {@link LocalIndexReaderTests} class. + */ +public class LocalIndexReaderTests extends OpenSearchTestCase { + private final DateTimeFormatter format = DateTimeFormat.forPattern("YYYY.MM.dd"); + private final Client client = mock(Client.class); + private final NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); + private LocalIndexReader localIndexReader; + + @Before + public void setup() { + localIndexReader = new LocalIndexReader(client, format, namedXContentRegistry); + } + + @SuppressWarnings("unchecked") + public void testReadRecords() { + ActionFuture responseActionFuture = mock(ActionFuture.class); + Map sourceMap = new HashMap<>(); + sourceMap.put("timestamp", DateTime.now(DateTimeZone.UTC).getMillis()); + sourceMap.put("indices", Collections.singletonList("my-index-0")); + sourceMap.put("source", Map.of()); + sourceMap.put("labels", Map.of()); + sourceMap.put("cpu", 10000); + sourceMap.put("memory", 20000); + sourceMap.put("latency", 3); + + BytesReference sourceRef; + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.map(sourceMap); + sourceRef = BytesReference.bytes(builder); + } catch (IOException e) { + throw new RuntimeException("Failed to build XContent", e); + } + + SearchHit hit = new SearchHit( + 1, + "id1", + Collections.singletonMap("_source", new DocumentField("_source", List.of(sourceMap))), + new HashMap<>() + ); + hit.sourceRef(sourceRef); + SearchHits searchHits = new SearchHits(new SearchHit[] { hit }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(searchHits); + when(responseActionFuture.actionGet()).thenReturn(searchResponse); + when(client.search(any(SearchRequest.class))).thenReturn(responseActionFuture); + String time = DateTime.now(DateTimeZone.UTC).toString(); + List records = List.of(); + try { + records = localIndexReader.read(time, time); + } catch (Exception e) { + fail("No exception should be thrown when reading query insights data"); + } + assertNotNull(records); + assertEquals(1, records.size()); + } + + public void testClose() { + try { + localIndexReader.close(); + } catch (Exception e) { + fail("No exception should be thrown when closing local index reader"); + } + } + + public void testGetAndSetIndexPattern() { + DateTimeFormatter newFormatter = mock(DateTimeFormatter.class); + localIndexReader.setIndexPattern(newFormatter); + assert (localIndexReader.getIndexPattern() == newFormatter); + } +} diff --git a/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java b/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java new file mode 100644 index 0000000..89e13f1 --- /dev/null +++ b/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.reader; + +import static org.mockito.Mockito.mock; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX; + +import org.joda.time.format.DateTimeFormat; +import org.junit.Before; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.test.OpenSearchTestCase; + +/** + * Granular tests for the {@link QueryInsightsReaderFactoryTests} class. + */ +public class QueryInsightsReaderFactoryTests extends OpenSearchTestCase { + private final String format = DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; + + private final Client client = mock(Client.class); + private final NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); + private QueryInsightsReaderFactory queryInsightsReaderFactory; + + @Before + public void setup() { + queryInsightsReaderFactory = new QueryInsightsReaderFactory(client); + } + + public void testValidateConfigWhenResetReader() { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + try { + queryInsightsReaderFactory.validateReaderConfig(settings); + } catch (Exception e) { + fail("No exception should be thrown when setting is null"); + } + } + + public void testInvalidReaderTypeConfig() { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.put(EXPORT_INDEX, "some_invalid_type").build(); + assertThrows(IllegalArgumentException.class, () -> { queryInsightsReaderFactory.validateReaderConfig(settings); }); + } + + public void testCreateAndCloseReader() { + QueryInsightsReader reader1 = queryInsightsReaderFactory.createReader(format, namedXContentRegistry); + assertTrue(reader1 instanceof LocalIndexReader); + try { + queryInsightsReaderFactory.closeReader(reader1); + queryInsightsReaderFactory.closeAllReaders(); + } catch (Exception e) { + fail("No exception should be thrown when closing reader"); + } + } + + public void testUpdateReader() { + LocalIndexReader reader = new LocalIndexReader(client, DateTimeFormat.forPattern(format), namedXContentRegistry); + queryInsightsReaderFactory.updateReader(reader, "yyyy-MM-dd-HH"); + assertEquals(DateTimeFormat.forPattern("yyyy-MM-dd-HH"), reader.getIndexPattern()); + } + +} diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index e0fb47d..cff48d6 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -16,6 +16,7 @@ import org.opensearch.client.Client; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; @@ -31,6 +32,7 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase { private final ThreadPool threadPool = mock(ThreadPool.class); private final Client client = mock(Client.class); + private final NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); private QueryInsightsService queryInsightsService; private QueryInsightsService queryInsightsServiceSpy; @@ -40,7 +42,13 @@ public void setup() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); - queryInsightsService = new QueryInsightsService(clusterSettings, threadPool, client, NoopMetricsRegistry.INSTANCE); + queryInsightsService = new QueryInsightsService( + clusterSettings, + threadPool, + client, + NoopMetricsRegistry.INSTANCE, + namedXContentRegistry + ); queryInsightsService.enableCollection(MetricType.LATENCY, true); queryInsightsService.enableCollection(MetricType.CPU, true); queryInsightsService.enableCollection(MetricType.MEMORY, true); @@ -57,7 +65,7 @@ public void testAddRecordToLimitAndDrain() { queryInsightsService.drainRecords(); assertEquals( QueryInsightsSettings.DEFAULT_TOP_N_SIZE, - queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size() + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false, null, null).size() ); } @@ -112,7 +120,7 @@ public void testAddRecordGroupBySimilarityWithDifferentGroups() { assertEquals( QueryInsightsSettings.DEFAULT_TOP_N_SIZE, - queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size() + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false, null, null).size() ); } @@ -135,7 +143,7 @@ public void testAddRecordGroupBySimilarityWithOneGroup() { assertTrue(queryInsightsService.addRecord(records.get(numberOfRecordsRequired - 1))); queryInsightsService.drainRecords(); - assertEquals(1, queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size()); + assertEquals(1, queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false, null, null).size()); } public void testAddRecordGroupBySimilarityWithTwoGroups() { @@ -154,6 +162,6 @@ public void testAddRecordGroupBySimilarityWithTwoGroups() { } queryInsightsService.drainRecords(); - assertEquals(2, queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size()); + assertEquals(2, queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false, null, null).size()); } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index d69fe53..58854f9 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -17,6 +17,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; +import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory; import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; @@ -31,10 +32,11 @@ public class TopQueriesServiceTests extends OpenSearchTestCase { private TopQueriesService topQueriesService; private final ThreadPool threadPool = mock(ThreadPool.class); private final QueryInsightsExporterFactory queryInsightsExporterFactory = mock(QueryInsightsExporterFactory.class); + private final QueryInsightsReaderFactory queryInsightsReaderFactory = mock(QueryInsightsReaderFactory.class); @Before public void setup() { - topQueriesService = new TopQueriesService(MetricType.LATENCY, threadPool, queryInsightsExporterFactory); + topQueriesService = new TopQueriesService(MetricType.LATENCY, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory); topQueriesService.setTopNSize(Integer.MAX_VALUE); topQueriesService.setWindowSize(new TimeValue(Long.MAX_VALUE)); topQueriesService.setEnabled(true); @@ -45,7 +47,7 @@ public void testIngestQueryDataWithLargeWindow() { topQueriesService.consumeRecords(records); assertTrue( QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder( - topQueriesService.getTopQueriesRecords(false), + topQueriesService.getTopQueriesRecords(false, null, null), records, MetricType.LATENCY ) @@ -58,20 +60,20 @@ public void testRollingWindows() { records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 5, System.currentTimeMillis() - 1000 * 60 * 10, 0); topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); topQueriesService.consumeRecords(records); - assertEquals(0, topQueriesService.getTopQueriesRecords(true).size()); + assertEquals(0, topQueriesService.getTopQueriesRecords(true, null, null).size()); // Create 10 records at now + 1 minute, to make sure they belong to the current window records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, System.currentTimeMillis() + 1000 * 60, 0); topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); topQueriesService.consumeRecords(records); - assertEquals(10, topQueriesService.getTopQueriesRecords(true).size()); + assertEquals(10, topQueriesService.getTopQueriesRecords(true, null, null).size()); } public void testSmallNSize() { final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); topQueriesService.setTopNSize(1); topQueriesService.consumeRecords(records); - assertEquals(1, topQueriesService.getTopQueriesRecords(false).size()); + assertEquals(1, topQueriesService.getTopQueriesRecords(false, null, null).size()); } public void testValidateTopNSize() { @@ -84,7 +86,7 @@ public void testValidateNegativeTopNSize() { public void testGetTopQueriesWhenNotEnabled() { topQueriesService.setEnabled(false); - assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false); }); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false, null, null); }); } public void testValidateWindowSize() { @@ -117,13 +119,13 @@ public void testRollingWindowsWithSameGroup() { records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 5, System.currentTimeMillis() - 1000 * 60 * 10, 0); topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); topQueriesService.consumeRecords(records); - assertEquals(0, topQueriesService.getTopQueriesRecords(true).size()); + assertEquals(0, topQueriesService.getTopQueriesRecords(true, null, null).size()); // Create 10 records at now + 1 minute, to make sure they belong to the current window records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, System.currentTimeMillis() + 1000 * 60, 0); topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); topQueriesService.consumeRecords(records); - assertEquals(10, topQueriesService.getTopQueriesRecords(true).size()); + assertEquals(10, topQueriesService.getTopQueriesRecords(true, null, null).size()); } public void testRollingWindowsWithDifferentGroup() { @@ -135,13 +137,13 @@ public void testRollingWindowsWithDifferentGroup() { topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); topQueriesService.consumeRecords(records); - assertEquals(0, topQueriesService.getTopQueriesRecords(true).size()); + assertEquals(0, topQueriesService.getTopQueriesRecords(true, null, null).size()); // Create 10 records at now + 1 minute, to make sure they belong to the current window records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, System.currentTimeMillis() + 1000 * 60, 0); QueryInsightsTestUtils.populateSameQueryHashcodes(records); topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); topQueriesService.consumeRecords(records); - assertEquals(1, topQueriesService.getTopQueriesRecords(true).size()); + assertEquals(1, topQueriesService.getTopQueriesRecords(true, null, null).size()); } } diff --git a/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java b/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java index 619fd4b..8769332 100644 --- a/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java +++ b/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java @@ -22,7 +22,7 @@ public class TopQueriesRequestTests extends OpenSearchTestCase { * Check that we can set the metric type */ public void testSetMetricType() throws Exception { - TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY, randomAlphaOfLength(5)); + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY, null, null, randomAlphaOfLength(5)); TopQueriesRequest deserializedRequest = roundTripRequest(request); assertEquals(request.getMetricType(), deserializedRequest.getMetricType()); } diff --git a/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java b/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java index b0055da..4028d4f 100644 --- a/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java +++ b/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java @@ -14,6 +14,8 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestRequest; @@ -25,6 +27,8 @@ public class RestTopQueriesActionTests extends OpenSearchTestCase { public void testEmptyNodeIdsValidType() { Map params = new HashMap<>(); params.put("type", randomFrom(ALLOWED_METRICS)); + params.put("from", DateTime.now(DateTimeZone.UTC).toString()); + params.put("to", DateTime.now(DateTimeZone.UTC).toString()); RestRequest restRequest = buildRestRequest(params); TopQueriesRequest actual = RestTopQueriesAction.prepareRequest(restRequest); assertEquals(0, actual.nodesIds().length); @@ -53,6 +57,57 @@ public void testInValidType() { ); } + public void testInValidFrom() { + Map params = new HashMap<>(); + params.put("from", "not valid timestamp"); + params.put("to", DateTime.now(DateTimeZone.UTC).toString()); + RestRequest restRequest = buildRestRequest(params); + Exception exception = assertThrows(IllegalArgumentException.class, () -> { RestTopQueriesAction.prepareRequest(restRequest); }); + assertEquals( + String.format( + Locale.ROOT, + "request [/_insights/top_queries] contains invalid 'from' date format. Expected ISO8601 format string (YYYY-MM-DD'T'HH:mm:ss.SSSZ): [%s]", + params.get("from") + ), + exception.getMessage() + ); + } + + public void testInValidTo() { + Map params = new HashMap<>(); + params.put("from", DateTime.now(DateTimeZone.UTC).toString()); + params.put("to", "not valid timestamp"); + RestRequest restRequest = buildRestRequest(params); + Exception exception = assertThrows(IllegalArgumentException.class, () -> { RestTopQueriesAction.prepareRequest(restRequest); }); + assertEquals( + String.format( + Locale.ROOT, + "request [/_insights/top_queries] contains invalid 'to' date format. Expected ISO8601 format string (YYYY-MM-DD'T'HH:mm:ss.SSSZ): [%s]", + params.get("to") + ), + exception.getMessage() + ); + } + + public void testMissingOneTimeParam() { + Map params = new HashMap<>(); + params.put("from", DateTime.now(DateTimeZone.UTC).toString()); + RestRequest restRequest = buildRestRequest(params); + Exception exception = assertThrows(IllegalArgumentException.class, () -> { RestTopQueriesAction.prepareRequest(restRequest); }); + assertEquals( + "request [/_insights/top_queries] is missing one of the time parameters. Both must be provided", + exception.getMessage() + ); + Map params2 = new HashMap<>(); + params2.put("to", DateTime.now(DateTimeZone.UTC).toString()); + RestRequest restRequest2 = buildRestRequest(params2); + Exception exception2 = assertThrows(IllegalArgumentException.class, () -> { RestTopQueriesAction.prepareRequest(restRequest2); }); + assertEquals( + "request [/_insights/top_queries] is missing one of the time parameters. Both must be provided", + exception2.getMessage() + ); + } + public void testGetRoutes() { RestTopQueriesAction action = new RestTopQueriesAction(); List routes = action.routes(); diff --git a/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java b/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java index 50b5b98..75a3c80 100644 --- a/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java +++ b/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java @@ -64,7 +64,7 @@ public DummyParentAction( } public TopQueriesResponse createNewResponse() { - TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY, null, null); return newResponse(request, List.of(), List.of()); } }