Skip to content

Commit

Permalink
refine logs
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Sep 24, 2024
1 parent 504558c commit a4fe4c6
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@

import io.grpc.Status;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicReference;

/**
* rpc client to send request to pegasus engine service
*/
public class RpcExecutionClient extends ExecutionClient<RpcChannel> {
Logger logger = LoggerFactory.getLogger(RpcExecutionClient.class);
private final Configs graphConfig;
private final AtomicReference<RpcClient> rpcClientRef;

Expand Down Expand Up @@ -93,6 +97,7 @@ public void process(PegasusClient.JobResponse jobResponse) {
@Override
public void finish() {
listener.onCompleted();
logger.info("[compile]: received results from engine");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.proto.frontend.Code;
import com.alibaba.pegasus.common.StreamIterator;
import com.google.common.collect.ImmutableList;

import org.neo4j.fabric.stream.summary.EmptySummary;
import org.neo4j.fabric.stream.summary.Summary;
Expand Down Expand Up @@ -137,7 +136,7 @@ public void onNext(IrResult.Record record) {
public void onCompleted() {
try {
this.recordIterator.finish();
this.statusCallback.onSuccessEnd(ImmutableList.of());
this.statusCallback.onSuccessEnd();
} catch (InterruptedException e) {
onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.alibaba.graphscope.gremlin.plugin;

import com.alibaba.graphscope.groot.common.constant.LogConstant;
import com.alibaba.graphscope.groot.common.util.JSON;
import com.google.gson.JsonObject;

import io.opentelemetry.api.common.Attributes;
Expand All @@ -26,8 +25,6 @@

import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.List;

public class QueryStatusCallback {
private final MetricsCollector metricsCollector;
private final QueryLogger queryLogger;
Expand Down Expand Up @@ -65,7 +62,7 @@ public void onErrorEnd(Throwable t, String msg) {
errorMsg = t.getMessage();
}
JsonObject logJson = buildSimpleLog(false, metricsCollector.getElapsedMillis());
fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis(), null);
fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis());
queryLogger.print(logJson.toString(), false, t);

Attributes attrs =
Expand All @@ -81,10 +78,11 @@ public void onErrorEnd(Throwable t, String msg) {
queryLogger.metricsInfo(false, metricsCollector.getElapsedMillis());
}

public void onSuccessEnd(List<Object> results) {
public void onSuccessEnd() {
this.metricsCollector.stop();
queryLogger.info("total execution time is {} ms", metricsCollector.getElapsedMillis());
JsonObject logJson = buildSimpleLog(true, metricsCollector.getElapsedMillis());
fillLogDetail(logJson, null, results);
fillLogDetail(logJson, null);
queryLogger.print(logJson.toString(), true, null);

Attributes attrs =
Expand Down Expand Up @@ -113,24 +111,24 @@ private JsonObject buildSimpleLog(boolean isSucceed, long elapsedMillis) {
return simpleJson;
}

private void fillLogDetail(JsonObject logJson, String errorMsg, List<Object> results) {
private void fillLogDetail(JsonObject logJson, String errorMsg) {
try {
if (this.metricsCollector.getElapsedMillis() > this.printThreshold) {
// todo(siyuan): the invocation of the function can cause Exception when serializing
// a gremlin vertex to json format
fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis(), results);
fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis());
}
} catch (Throwable t) {
queryLogger.warn("fill log detail error", t);
}
}

private void fillLogDetail(
JsonObject logJson, String errorMessage, long startMillis, List<Object> results) {
private void fillLogDetail(JsonObject logJson, String errorMessage, long startMillis) {
logJson.addProperty(LogConstant.QUERY, queryLogger.getQuery());
if (results != null) {
logJson.addProperty(LogConstant.RESULT, JSON.toJson(results));
}
// do not serialize result.
// if (results != null) {
// logJson.addProperty(LogConstant.RESULT, JSON.toJson(results));
// }
if (errorMessage != null) {
logJson.addProperty(LogConstant.ERROR_MESSAGE, errorMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ protected void evalOpInternal(
new MetricsCollector.Gremlin(evalOpTimer),
queryHistogram,
configs);
statusCallback.getQueryLogger().info("[compile]: query received");
QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout());
GremlinExecutor.LifeCycle lifeCycle;
switch (language) {
Expand Down Expand Up @@ -360,6 +361,7 @@ protected GremlinExecutor.LifeCycle createLifeCycle(
if (o != null && o instanceof Traversal) {
applyStrategies((Traversal) o);
}
statusCallback.getQueryLogger().info("[compile]: traversal compiled");
return o;
})
.withResult(
Expand Down Expand Up @@ -404,9 +406,8 @@ protected void processTraversal(
return opCollection;
},
Code.LOGICAL_PLAN_BUILD_FAILED);

queryLogger.info("[compile]: logical IR compiled");
StringBuilder irPlanStr = new StringBuilder();

PegasusClient.JobRequest physicalRequest =
ClassUtils.callException(
() -> {
Expand Down Expand Up @@ -450,7 +451,7 @@ protected void processTraversal(
return request;
},
Code.PHYSICAL_PLAN_BUILD_FAILED);

queryLogger.info("[compile]: physical IR compiled");
Span outgoing;
// if exist up trace, useUpTraceId as current traceId
if (TraceId.isValid(queryLogger.getUpstreamId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void request() {
responseProcessor.process(responseStreamIterator.next());
}
responseProcessor.finish();
statusCallback.getQueryLogger().info("[compile]: process results success");
} catch (Throwable t) {
// if the exception is caused by InterruptedException, it means a timeout exception has
// been thrown by gremlin executor
Expand Down Expand Up @@ -164,7 +165,7 @@ public void process(PegasusClient.JobResponse response) {
}

public void finish() {
statusCallback.onSuccessEnd(resultCollectors);
statusCallback.onSuccessEnd();
aggregateResults();
writeResult.writeAndFlush(
ResponseMessage.build(writeResult.getRequestMessage())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.proto.frontend.Code;
import com.alibaba.pegasus.common.StreamIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

Expand Down Expand Up @@ -149,7 +148,7 @@ protected void processRecord(IrResult.Record record) {
}

protected void finishRecord() {
statusCallback.onSuccessEnd(ImmutableList.of());
statusCallback.onSuccessEnd();
List<Object> results = Lists.newArrayList();
if (resultSchema.isGroupBy) {
results.add(reducer);
Expand Down

0 comments on commit a4fe4c6

Please sign in to comment.