Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Sep 11, 2023
1 parent dfc2a40 commit 854b673
Show file tree
Hide file tree
Showing 17 changed files with 172 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private SpannerRpc rpc;
private io.opentelemetry.api.trace.Span openTelemetrySpan =
io.opentelemetry.api.trace.Span.fromContext(Context.current());

private Span span = Tracing.getTracer().getCurrentSpan();
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
Expand Down Expand Up @@ -357,8 +356,7 @@ void initTransaction() {
if (transactionId != null) {
return;
}

OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Creating Transaction", null);
OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Creating Transaction");
span.addAnnotation("Creating Transaction");
try {
TransactionOptions.Builder options = TransactionOptions.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
abstract class AbstractResultSet<R> extends AbstractStructReader implements ResultSet {
private static final io.opentelemetry.api.trace.Tracer openTelemetryTracer =
SpannerOptions.getTracer();

private static final Tracer tracer = Tracing.getTracer();
private static final com.google.protobuf.Value NULL_VALUE =
com.google.protobuf.Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();
Expand Down Expand Up @@ -1248,7 +1247,6 @@ protected PartialResultSet computeNext() {
Attributes.builder()
.put("ResumeToken", resumeToken == null ? "null" : resumeToken.toStringUtf8())
.build());

span.addAnnotation(
"Starting/Resuming stream",
ImmutableMap.of(
Expand Down Expand Up @@ -1300,7 +1298,6 @@ protected PartialResultSet computeNext() {
openTelemetrySpan,
"Stream broken. Safe to retry",
OpenTelemetryTraceUtil.getExceptionAnnotations(spannerException));

span.addAnnotation(
"Stream broken. Safe to retry",
TraceUtil.getExceptionAnnotations(spannerException));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public CommitResponse writeWithOptions(
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
openTelemetrySpan.end();
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
}
}

Expand All @@ -114,7 +114,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
openTelemetrySpan.end();
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
}
}

Expand Down Expand Up @@ -153,7 +153,6 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, READ_ONLY_TRANSACTION);

try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
return getSession().singleUseReadOnlyTransaction();
Expand All @@ -169,7 +168,6 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, READ_ONLY_TRANSACTION);

try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
return getSession().singleUseReadOnlyTransaction(bound);
Expand All @@ -185,7 +183,6 @@ public ReadOnlyTransaction readOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, READ_ONLY_TRANSACTION);

try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
return getSession().readOnlyTransaction();
Expand All @@ -201,7 +198,6 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, READ_ONLY_TRANSACTION);

try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
return getSession().readOnlyTransaction(bound);
Expand All @@ -217,7 +213,6 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, READ_WRITE_TRANSACTION);

try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
return getSession().readWriteTransaction(options);
Expand All @@ -227,7 +222,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
openTelemetrySpan.end();
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
}
}

Expand All @@ -236,7 +231,6 @@ public TransactionManager transactionManager(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, READ_WRITE_TRANSACTION);

try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
return getSession().transactionManager(options);
Expand All @@ -252,7 +246,6 @@ public AsyncRunner runAsync(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, READ_WRITE_TRANSACTION);

try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
return getSession().runAsync(options);
Expand All @@ -268,7 +261,6 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, READ_WRITE_TRANSACTION);

try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
return getSession().transactionManagerAsync(options);
Expand All @@ -284,7 +276,6 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, PARTITION_DML_TRANSACTION);

try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,12 @@ class MetricRegistryConstants {
"The number of sessions released by the user and pool maintainer.";
static final String NUM_SESSIONS_IN_POOL_DESCRIPTION = "The number of sessions in the pool.";

public static final String SPANNER_GFE_LATENCY_NAME = "gfe_latency";

public static final String SPANNER_GFE_LATENCY_DESCRIPTION =
static final String Scope = "cloud.google.com/java/spanner";
static final String SPANNER_GFE_LATENCY_NAME = "gfe_latency";
static final String SPANNER_GFE_LATENCY_DESCRIPTION =
"Latency between Google's network receiving an RPC and reading back the first byte of the response";

public static final String SPANNER_GFE_HEADER_MISSING_COUNT_NAME = "gfe_header_missing_count";

public static final String SPANNER_GFE_HEADER_MISSING_COUNT_DESCRIPTION =
static final String SPANNER_GFE_HEADER_MISSING_COUNT_NAME = "gfe_header_missing_count";
static final String SPANNER_GFE_HEADER_MISSING_COUNT_DESCRIPTION =
"Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network";

public static final String MILLISECOND = "ms";
static final String Scope = "cloud.google.com/java/spanner";
static final String MILLISECOND = "ms";
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
/** Utility methods for tracing. */
class OpenTelemetryTraceUtil {

// static final EndSpanOptions END_SPAN_OPTIONS =
// EndSpanOptions.builder().setSampleToLocalSpanStore(true).build();

static Attributes getTransactionAnnotations(Transaction t) {
return Attributes.builder()
.put("Id", t.getId().toStringUtf8())
Expand All @@ -52,6 +49,10 @@ static Attributes getExceptionAnnotations(SpannerException e) {
.build();
}

static void addEvent(Span span, String eventName) {
span.addEvent(eventName);
}

static void addEvent(Span span, String eventName, Attributes attributes) {
span.addEvent(eventName, attributes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,9 @@ public void run() {
SpannerImpl.tracer
.getCurrentSpan()
.addAnnotation(String.format("Creating %d sessions", sessionCount));

OpenTelemetryTraceUtil.addEvent(
io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()),
String.format("Creating %d sessions", sessionCount),
null);

String.format("Creating %d sessions", sessionCount));
while (remainingSessionsToCreate > 0) {
try {
sessions = internalBatchCreateSessions(remainingSessionsToCreate, channelHint);
Expand All @@ -161,7 +158,7 @@ public void run() {
}
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
openTelemetrySpan.end();
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
}
}
}
Expand Down Expand Up @@ -225,7 +222,6 @@ SessionImpl createSession() {
io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(
SpannerImpl.openTelemetryTracer, SpannerImpl.CREATE_SESSION);

try (Scope s = SpannerImpl.tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
com.google.spanner.v1.Session session =
Expand All @@ -240,11 +236,10 @@ SessionImpl createSession() {
} catch (RuntimeException e) {
TraceUtil.setWithFailure(span, e);
OpenTelemetryTraceUtil.setWithFailure(openTelemetrySpan, e);

throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
openTelemetrySpan.end();
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
}
}

Expand Down Expand Up @@ -313,7 +308,6 @@ void asyncBatchCreateSessions(
private List<SessionImpl> internalBatchCreateSessions(
final int sessionCount, final long channelHint) throws SpannerException {
final Map<SpannerRpc.Option, ?> options = optionMap(SessionOption.channelHint(channelHint));

io.opentelemetry.api.trace.Span openTelemetryParentSpan =
io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current());
io.opentelemetry.api.trace.Span openTelemetrySpan =
Expand All @@ -322,8 +316,7 @@ private List<SessionImpl> internalBatchCreateSessions(
SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST,
openTelemetryParentSpan);
OpenTelemetryTraceUtil.addEvent(
openTelemetrySpan, String.format("Requesting %d sessions", sessionCount), null);

openTelemetrySpan, String.format("Requesting %d sessions", sessionCount));
Span parent = SpannerImpl.tracer.getCurrentSpan();
Span span =
SpannerImpl.tracer
Expand All @@ -349,10 +342,8 @@ private List<SessionImpl> internalBatchCreateSessions(
OpenTelemetryTraceUtil.addEvent(
openTelemetrySpan,
String.format(
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()),
null);
openTelemetrySpan.end();

"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
List<SessionImpl> res = new ArrayList<>(sessionCount);
for (com.google.spanner.v1.Session session : sessions) {
res.add(new SessionImpl(spanner, session.getName(), options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ Span getCurrentSpan() {
return currentSpan;
}

void setopenTelemetryCurrentSpan(io.opentelemetry.api.trace.Span span) {
void setCurrentOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) {
openTelemetryCurrentSpan = span;
}

io.opentelemetry.api.trace.Span getopenTelemetryCurrentSpan() {
io.opentelemetry.api.trace.Span getCurrentOpenTelemetrySpan() {
return openTelemetryCurrentSpan;
}

Expand Down Expand Up @@ -209,7 +209,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
openTelemetrySpan.end();
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
}
}

Expand Down Expand Up @@ -319,7 +319,7 @@ public void close() {
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
openTelemetrySpan.end();
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
}
}

Expand All @@ -342,7 +342,6 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean
final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
final io.opentelemetry.api.trace.Span openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilder(openTelemetryTracer, SpannerImpl.BEGIN_TRANSACTION);

final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(name)
Expand All @@ -360,7 +359,7 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
}
span.end(TraceUtil.END_SPAN_OPTIONS);
openTelemetrySpan.end();
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
res.set(txn.getId());
} catch (ExecutionException e) {
TraceUtil.endSpanWithFailure(span, e);
Expand Down
Loading

0 comments on commit 854b673

Please sign in to comment.