Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sql by not flushing underlying stream... y tho #10

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -726,10 +726,7 @@ private Sequence<T> getSimpleServerResults(
Queries.withSpecificSegments(queryPlus.getQuery(), segmentsOfServer)
).withMaxQueuedBytes(maxQueuedBytesPerServer),
responseContext
).withBaggage(() -> {

System.out.println(responseContext.getQueryMetrics().toString());
});
);
}

private Sequence<T> getAndCacheServerResults(
Expand Down
47 changes: 26 additions & 21 deletions server/src/main/java/org/apache/druid/server/QueryResultPusher.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,7 @@ public Response push()
accumulator.flush();

counter.incrementSuccess();
if (queryResponse.getResponseContext().getQueryMetrics() instanceof QueryRuntimeAnalysis) {
response.setTrailers(() -> {
HttpFields fields = new HttpFields();
try {
final QueryRuntimeAnalysis analysis;

analysis = (QueryRuntimeAnalysis) queryResponse.getResponseContext().getQueryMetrics();
// build our own query/time for this guy, the real one happens after the stream is closed
final long queryTimeNs = System.nanoTime() - getStartNs();
analysis.addDiagnosticMeasurement("query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs));
String runtimeAnalysis = jsonMapper.writeValueAsString(analysis);
HttpField runtimeAnalysisField = new HttpField("X-Druid-Query-Runtime-Analysis", runtimeAnalysis);
fields.add(runtimeAnalysisField);
}
catch (JsonProcessingException e) {
log.warn(e, "Unable to serialize query runtime analysis for query [%s]", queryId);
}

return fields;
});
}
setTrailers(queryResponse);
accumulator.close();
resultsWriter.recordSuccess(accumulator.getNumBytesSent());
}
Expand Down Expand Up @@ -232,6 +212,31 @@ public Response push()
return null;
}

protected void setTrailers(QueryResponse<?> queryResponse)
{
if (queryResponse.getResponseContext().getQueryMetrics() instanceof QueryRuntimeAnalysis) {
response.setTrailers(() -> {
HttpFields fields = new HttpFields();
try {
final QueryRuntimeAnalysis analysis;

analysis = (QueryRuntimeAnalysis) queryResponse.getResponseContext().getQueryMetrics();
// build our own query/time for this guy, the real one happens after the stream is closed
final long queryTimeNs = System.nanoTime() - getStartNs();
analysis.addDiagnosticMeasurement("query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs));
String runtimeAnalysis = jsonMapper.writeValueAsString(analysis);
HttpField runtimeAnalysisField = new HttpField("X-Druid-Query-Runtime-Analysis", runtimeAnalysis);
fields.add(runtimeAnalysisField);
}
catch (JsonProcessingException e) {
log.warn(e, "Unable to serialize query runtime analysis for query [%s]", queryId);
}

return fields;
});
}
}

@Nullable
private Response handleQueryException(ResultsWriter resultsWriter, QueryException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public ArrayLinesWriter(final OutputStream outputStream, final ObjectMapper json
this.outputStream = outputStream;
this.jsonGenerator = jsonMapper.writer().getFactory().createGenerator(outputStream);
jsonGenerator.setRootValueSeparator(new SerializedString("\n"));
jsonGenerator.configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public ArrayWriter(final OutputStream outputStream, final ObjectMapper jsonMappe

// Disable automatic JSON termination, so clients can detect truncated responses.
jsonGenerator.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false);
jsonGenerator.configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public ObjectLinesWriter(final OutputStream outputStream, final ObjectMapper jso
this.outputStream = outputStream;
this.jsonGenerator = jsonMapper.writer().getFactory().createGenerator(outputStream);
jsonGenerator.setRootValueSeparator(new SerializedString("\n"));
jsonGenerator.configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public ObjectWriter(final OutputStream outputStream, final ObjectMapper jsonMapp

// Disable automatic JSON termination, so clients can detect truncated responses.
jsonGenerator.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false);
jsonGenerator.configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false);
}

@Override
Expand Down