Refactor parsing logic for Measurement (opensearch-project#112)
* Refactor parsing logic for Measurement

Signed-off-by: Chenyang Ji <[email protected]>

* fix failed tests

Signed-off-by: Chenyang Ji <[email protected]>

---------

Signed-off-by: Chenyang Ji <[email protected]>
ansjcy authored Sep 10, 2024
1 parent 839e6f7 commit b869de9
Showing 6 changed files with 94 additions and 17 deletions.
@@ -18,7 +18,10 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
@@ -104,7 +107,9 @@ public List<SearchQueryRecord> read(final String from, final String to) {
try {
SearchResponse searchResponse = client.search(searchRequest).actionGet();
for (SearchHit hit : searchResponse.getHits()) {
SearchQueryRecord record = SearchQueryRecord.getRecord(hit, namedXContentRegistry);
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString());
SearchQueryRecord record = SearchQueryRecord.fromXContent(parser);
records.add(record);
}
} catch (IndexNotFoundException ignored) {} catch (Exception e) {
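
The hunk above moves per-hit parsing out of SearchQueryRecord.getRecord(SearchHit, NamedXContentRegistry): the reader now builds a JSON XContentParser from each hit's source and hands it to the new SearchQueryRecord.fromXContent. Below is a minimal sketch of the same loop with the per-hit parser closed via try-with-resources; it is an illustration, not the committed code, and the surrounding names (client, searchRequest, namedXContentRegistry, records) are taken from the context lines above.

```java
// Sketch only: same parsing flow as the hunk above, with the parser closed explicitly.
SearchResponse searchResponse = client.search(searchRequest).actionGet();
for (SearchHit hit : searchResponse.getHits()) {
    try (
        XContentParser parser = XContentType.JSON.xContent()
            .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString())
    ) {
        // fromXContent advances the parser itself, so no nextToken() call is needed here.
        records.add(SearchQueryRecord.fromXContent(parser));
    }
}
```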
@@ -10,18 +10,25 @@

import java.io.IOException;
import java.util.Objects;
import org.opensearch.core.common.ParsingException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

/**
* Measurement that is stored in the SearchQueryRecord. Measurement can be of a specific AggregationType
*/
public class Measurement implements ToXContentObject, Writeable {
private static int DEFAULT_COUNT = 1;

private static final String NUMBER = "number";
private static final String COUNT = "count";
private static final String AGGREGATION_TYPE = "aggregationType";

private AggregationType aggregationType;
private Number number;
private int count;
@@ -55,6 +62,21 @@ public Measurement(Number number) {
this(number, DEFAULT_COUNT, AggregationType.DEFAULT_AGGREGATION_TYPE);
}

private Measurement() {}

/**
* Construct a measurement from {@link XContentParser}
*
* @param parser {@link XContentParser}
* @return {@link Measurement}
* @throws IOException IOException
*/
public static Measurement fromXContent(XContentParser parser) throws IOException {
Measurement builder = new Measurement();
builder.parseXContent(parser);
return builder;
}

/**
* Add measurement number to the current number based on the aggregationType.
* If aggregateType is NONE, replace the number since we are not aggregating in this case.
@@ -150,13 +172,45 @@ public void setAggregationType(AggregationType aggregationType) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field("number", number);
builder.field("count", count);
builder.field("aggregationType", aggregationType.toString());
builder.field(NUMBER, number);
builder.field(COUNT, count);
builder.field(AGGREGATION_TYPE, aggregationType.toString());
builder.endObject();
return builder;
}

/**
* Parse a measurement from {@link XContentParser}
*
* @param parser {@link XContentParser}
* @throws IOException IOException
*/
private void parseXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new ParsingException(
parser.getTokenLocation(),
"Expected [" + XContentParser.Token.START_OBJECT + "] but found [" + token + "]",
parser.getTokenLocation()
);
} else {
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (NUMBER.equals(currentFieldName)) {
this.number = parser.numberValue();
} else if (COUNT.equals(currentFieldName)) {
this.count = parser.intValue();
} else if (AGGREGATION_TYPE.equals(currentFieldName)) {
this.aggregationType = AggregationType.valueOf(parser.text());
}
}
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeNumber(out, number);
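
Unlike SearchQueryRecord.fromXContent, which calls parser.nextToken() itself, Measurement.fromXContent expects the parser to already be positioned on the opening START_OBJECT token (parseXContent throws a ParsingException otherwise). The following round-trip sketch of the new toXContent/fromXContent pair is a hypothetical helper, not part of the commit; it relies only on standard OpenSearch XContent utilities plus the methods shown in this file.

```java
import java.io.IOException;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

// Hypothetical helper: serializes a Measurement to JSON and parses it back.
static Measurement roundTrip(Measurement original) throws IOException {
    XContentBuilder builder = JsonXContent.contentBuilder();
    original.toXContent(builder, ToXContent.EMPTY_PARAMS);
    String json = BytesReference.bytes(builder).utf8ToString();
    // e.g. {"number":123,"count":1,"aggregationType":"NONE"} for new Measurement(123L)

    try (
        XContentParser parser = JsonXContent.jsonXContent
            .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json)
    ) {
        parser.nextToken(); // position the parser on START_OBJECT before fromXContent
        return Measurement.fromXContent(parser);
    }
}
```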
@@ -18,21 +18,17 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;

@@ -141,18 +137,17 @@ public SearchQueryRecord(final long timestamp, Map<MetricType, Measurement> meas
}

/**
* Returns a SearchQueryRecord from a SearchHit
* Construct a SearchQueryRecord from {@link XContentParser}
*
* @param hit SearchHit to parse into SearchQueryRecord
* @param namedXContentRegistry NamedXContentRegistry for parsing purposes
* @return SearchQueryRecord
* @param parser {@link XContentParser}
* @return {@link SearchQueryRecord}
* @throws IOException IOException
*/
public static SearchQueryRecord getRecord(SearchHit hit, NamedXContentRegistry namedXContentRegistry) throws IOException {
public static SearchQueryRecord fromXContent(XContentParser parser) throws IOException {
long timestamp = 0L;
Map<MetricType, Measurement> measurements = new HashMap<>();
Map<Attribute, Object> attributes = new HashMap<>();
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString());

parser.nextToken();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -167,7 +162,7 @@ public static SearchQueryRecord getRecord(SearchHit hit, NamedXContentRegistry n
case CPU:
case MEMORY:
MetricType metric = MetricType.fromString(fieldName);
measurements.put(metric, new Measurement(metric.parseValue(parser.longValue())));
measurements.put(metric, Measurement.fromXContent(parser));
break;
case SEARCH_TYPE:
attributes.put(Attribute.SEARCH_TYPE, parser.text());
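
With this change, a record's metric fields are no longer bare numbers read with parser.longValue(); each metric is now the object written by Measurement.toXContent and is handed to Measurement.fromXContent instead. Roughly, for a single metric field (values made up for illustration; the full surrounding document layout is not shown in this diff):

```java
// Before this commit, a metric field in a stored record was a bare number:
//   "latency": 3                // consumed with parser.longValue()
//
// After this commit, a metric field is a nested Measurement object:
//   "latency": {"number": 3, "count": 1, "aggregationType": "NONE"}
//                               // consumed with Measurement.fromXContent(parser)
```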
@@ -240,8 +240,19 @@ public static SearchQueryRecord createFixedSearchQueryRecord() {
Map<MetricType, Measurement> measurements = Map.of(MetricType.LATENCY, new Measurement(1L));

Map<String, Long> phaseLatencyMap = new HashMap<>();
phaseLatencyMap.put("expand", 1L);
phaseLatencyMap.put("query", 10L);
phaseLatencyMap.put("fetch", 1L);
Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap);
attributes.put(
Attribute.TASK_RESOURCE_USAGES,
List.of(
new TaskResourceInfo("action", 2L, 1L, "id", new TaskResourceUsage(1000L, 2000L)),
new TaskResourceInfo("action2", 3L, 1L, "id2", new TaskResourceUsage(2000L, 1000L))
)
);

return new SearchQueryRecord(timestamp, measurements, attributes);
}
@@ -41,7 +41,7 @@ public void testSerialize() throws Exception {

public void testToXContent() throws IOException {
char[] expectedXcontent =
"{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"search_type\":\"query_then_fetch\",\"measurements\":{\"latency\":{\"number\":1,\"count\":1,\"aggregationType\":\"NONE\"}}}]}"
"{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"phase_latency_map\":{\"expand\":1,\"query\":10,\"fetch\":1},\"task_resource_usages\":[{\"action\":\"action\",\"taskId\":2,\"parentTaskId\":1,\"nodeId\":\"id\",\"taskResourceUsage\":{\"cpu_time_in_nanos\":1000,\"memory_in_bytes\":2000}},{\"action\":\"action2\",\"taskId\":3,\"parentTaskId\":1,\"nodeId\":\"id2\",\"taskResourceUsage\":{\"cpu_time_in_nanos\":2000,\"memory_in_bytes\":1000}}],\"search_type\":\"query_then_fetch\",\"measurements\":{\"latency\":{\"number\":1,\"count\":1,\"aggregationType\":\"NONE\"}}}]}"
.toCharArray();
TopQueries topQueries = QueryInsightsTestUtils.createFixedTopQueries();
ClusterName clusterName = new ClusterName("test-cluster");
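
For readability, the expected body in the assertion above is reproduced below pretty-printed as a Java text block; the whitespace is added for illustration only, and the content is identical to the escaped one-line literal in the test.

```java
// Readability-only rendering of the expected JSON used in testToXContent (hypothetical variable).
String expectedPrettyPrinted = """
    {
      "top_queries": [
        {
          "timestamp": 1706574180000,
          "node_id": "node_for_top_queries_test",
          "phase_latency_map": { "expand": 1, "query": 10, "fetch": 1 },
          "task_resource_usages": [
            {
              "action": "action",
              "taskId": 2,
              "parentTaskId": 1,
              "nodeId": "id",
              "taskResourceUsage": { "cpu_time_in_nanos": 1000, "memory_in_bytes": 2000 }
            },
            {
              "action": "action2",
              "taskId": 3,
              "parentTaskId": 1,
              "nodeId": "id2",
              "taskResourceUsage": { "cpu_time_in_nanos": 2000, "memory_in_bytes": 1000 }
            }
          ],
          "search_type": "query_then_fetch",
          "measurements": {
            "latency": { "number": 1, "count": 1, "aggregationType": "NONE" }
          }
        }
      ]
    }
    """;
```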
@@ -14,7 +14,9 @@
import java.util.List;
import java.util.Set;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
import org.opensearch.test.OpenSearchTestCase;

@@ -54,6 +56,16 @@ public void testEqual() {
assertEquals(record1, record2);
}

public void testFromXContent() {
SearchQueryRecord record = QueryInsightsTestUtils.createFixedSearchQueryRecord();
try (XContentParser recordParser = createParser(JsonXContent.jsonXContent, record.toString())) {
SearchQueryRecord parsedRecord = SearchQueryRecord.fromXContent(recordParser);
QueryInsightsTestUtils.checkRecordsEquals(List.of(record), List.of(parsedRecord));
} catch (Exception e) {
fail("Test should not throw exceptions when parsing search query record");
}
}

/**
* Serialize and deserialize a SearchQueryRecord.
* @param record A SearchQueryRecord to serialize.
