From 050b164c1c81900315f971404fa77a4aa3258d7e Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Thu, 21 Sep 2023 00:18:24 +0530 Subject: [PATCH] Refactor --- google-cloud-spanner/pom.xml | 52 +++--- .../cloud/spanner/AbstractReadContext.java | 52 +++--- .../cloud/spanner/AbstractResultSet.java | 76 ++------ .../spanner/AsyncTransactionManagerImpl.java | 20 +- .../google/cloud/spanner/BatchClientImpl.java | 17 +- .../cloud/spanner/DatabaseClientImpl.java | 173 +++++++----------- .../com/google/cloud/spanner/ErrorCode.java | 3 +- .../cloud/spanner/OpenTelemetryTraceUtil.java | 95 ---------- .../spanner/PartitionedDmlTransaction.java | 13 +- .../google/cloud/spanner/SessionClient.java | 74 ++------ .../com/google/cloud/spanner/SessionImpl.java | 84 +++------ .../com/google/cloud/spanner/SessionPool.java | 94 ++++------ .../com/google/cloud/spanner/SpannerImpl.java | 4 +- .../com/google/cloud/spanner/TraceUtil.java | 82 --------- .../cloud/spanner/TransactionManagerImpl.java | 37 ++-- .../cloud/spanner/TransactionRunnerImpl.java | 148 ++++----------- .../cloud/spanner/tracing/DualScope.java | 40 ++++ .../cloud/spanner/tracing/DualSpan.java | 134 ++++++++++++++ .../google/cloud/spanner/tracing/IScope.java | 22 +++ .../google/cloud/spanner/tracing/ISpan.java | 36 ++++ .../cloud/spanner/tracing/TraceWrapper.java | 80 ++++++++ .../AsyncTransactionManagerImplTest.java | 4 +- .../cloud/spanner/OpenTelemetrySpanTest.java | 41 ++--- .../spanner/ResumableStreamIteratorTest.java | 15 +- .../google/cloud/spanner/SessionImplTest.java | 5 +- .../google/cloud/spanner/SessionPoolTest.java | 9 +- .../spanner/TransactionManagerImplTest.java | 4 +- .../spanner/TransactionRunnerImplTest.java | 16 +- 28 files changed, 651 insertions(+), 779 deletions(-) delete mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryTraceUtil.java delete mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/DualScope.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/DualSpan.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/IScope.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/ISpan.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/TraceWrapper.java diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index 718b67d965a..7480f589568 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -16,7 +16,7 @@ google-cloud-spanner 0.31.1 - 1.28.0 + 1.30.1 22.3.3 com.google.cloud.spanner.GceTestEnvConfig projects/gcloud-devel/instances/spanner-testing-east1 @@ -249,30 +249,6 @@ opentelemetry-context ${opentelemetry.version} - - io.opentelemetry - opentelemetry-sdk - ${opentelemetry.version} - test - - - io.opentelemetry - opentelemetry-sdk-metrics - ${opentelemetry.version} - test - - - io.opentelemetry - opentelemetry-sdk-trace - ${opentelemetry.version} - test - - - io.opentelemetry - opentelemetry-sdk-testing - ${opentelemetry.version} - test - com.google.auth google-auth-library-oauth2-http @@ -434,6 +410,32 @@ test + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-sdk-metrics + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-sdk-trace + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-sdk-testing + ${opentelemetry.version} + test + + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index b2887b91495..0627c7daa98 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -37,7 +37,10 @@ import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.DualSpan; +import com.google.cloud.spanner.tracing.ISpan; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.spanner.v1.BeginTransactionRequest; @@ -69,9 +72,10 @@ abstract class AbstractReadContext abstract static class Builder, T extends AbstractReadContext> { private SessionImpl session; private SpannerRpc rpc; - private io.opentelemetry.api.trace.Span openTelemetrySpan = - io.opentelemetry.api.trace.Span.fromContext(Context.current()); - private Span span = Tracing.getTracer().getCurrentSpan(); + private ISpan span = + new DualSpan( + Tracing.getTracer().getCurrentSpan(), + io.opentelemetry.api.trace.Span.fromContext(Context.current())); private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS; private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS; private ExecutorProvider executorProvider; @@ -93,16 +97,11 @@ B setRpc(SpannerRpc rpc) { return self(); } - B setSpan(Span span) { + B setSpan(ISpan span) { this.span = span; return self(); } - B setOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) { - this.openTelemetrySpan = span; - return self(); - } - B setDefaultPrefetchChunks(int defaultPrefetchChunks) { this.defaultPrefetchChunks = defaultPrefetchChunks; return self(); @@ -356,7 +355,6 @@ void initTransaction() { if (transactionId != null) { return; } - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Creating Transaction"); span.addAnnotation("Creating Transaction"); try { TransactionOptions.Builder options = TransactionOptions.newBuilder(); @@ -383,16 +381,16 @@ void initTransaction() { ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e); } transactionId = transaction.getId(); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Transaction Creation Done", - OpenTelemetryTraceUtil.getTransactionAnnotations(transaction)); span.addAnnotation( - "Transaction Creation Done", TraceUtil.getTransactionAnnotations(transaction)); + "Transaction Creation Done", + ImmutableMap.of( + "id", + transaction.getId().toStringUtf8(), + "Timestamp", + Timestamp.fromProto(transaction.getReadTimestamp()).toString())); + } catch (SpannerException e) { - OpenTelemetryTraceUtil.addEventWithException( - openTelemetrySpan, "Transaction Creation Failed", e); - span.addAnnotation("Transaction Creation Failed", TraceUtil.getExceptionAnnotations(e)); + span.addAnnotation("Transaction Creation Failed", e); throw e; } } @@ -403,8 +401,7 @@ void initTransaction() { final SessionImpl session; final SpannerRpc rpc; final ExecutorProvider executorProvider; - Span span; - io.opentelemetry.api.trace.Span openTelemetrySpan; + ISpan span; private final int defaultPrefetchChunks; private final QueryOptions defaultQueryOptions; @@ -431,19 +428,19 @@ void initTransaction() { this.defaultPrefetchChunks = builder.defaultPrefetchChunks; this.defaultQueryOptions = builder.defaultQueryOptions; this.span = builder.span; - this.openTelemetrySpan = builder.openTelemetrySpan; this.executorProvider = builder.executorProvider; } @Override - public void setSpan(Span span) { + public void setSpan(ISpan span) { this.span = span; } + /** + * No-op method needed to implement SessionTransaction interface. + */ @Override - public void setOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) { - this.openTelemetrySpan = span; - } + public void setSpan(Span span) {} long getSeqNo() { return seqNo.incrementAndGet(); @@ -690,7 +687,6 @@ ResultSet executeQueryInternalWithOptions( MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span, - openTelemetrySpan, rpc.getExecuteQueryRetrySettings(), rpc.getExecuteQueryRetryableCodes()) { @Override @@ -750,8 +746,7 @@ public final void invalidate() { @Override public void close() { - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); synchronized (lock) { isClosed = true; } @@ -831,7 +826,6 @@ ResultSet readInternalWithOptions( MAX_BUFFERED_CHUNKS, SpannerImpl.READ, span, - openTelemetrySpan, rpc.getReadRetrySettings(), rpc.getReadRetryableCodes()) { @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index b00ae672ad9..212191bce8c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -33,11 +33,13 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Type.StructField; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.IScope; +import com.google.cloud.spanner.tracing.ISpan; +import com.google.cloud.spanner.tracing.TraceWrapper; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; @@ -50,12 +52,7 @@ import com.google.spanner.v1.Transaction; import com.google.spanner.v1.TypeCode; import io.grpc.Context; -import io.opencensus.common.Scope; -import io.opencensus.trace.AttributeValue; -import io.opencensus.trace.Span; -import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; -import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.io.Serializable; import java.math.BigDecimal; @@ -83,7 +80,7 @@ /** Implementation of {@link ResultSet}. */ abstract class AbstractResultSet extends AbstractStructReader implements ResultSet { - private static final Tracer tracer = Tracing.getTracer(); + private static final TraceWrapper tracer = new TraceWrapper(Tracing.getTracer()); private static final com.google.protobuf.Value NULL_VALUE = com.google.protobuf.Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build(); @@ -1094,8 +1091,7 @@ abstract static class ResumableStreamIterator extends AbstractIterator buffer = new LinkedList<>(); private final int maxBufferSize; - private final Span span; - private final io.opentelemetry.api.trace.Span openTelemetrySpan; + private final ISpan span; private CloseableIterator stream; private ByteString resumeToken; private boolean finished; @@ -1109,16 +1105,12 @@ abstract static class ResumableStreamIterator extends AbstractIterator retryableCodes) { checkArgument(maxBufferSize >= 0); this.maxBufferSize = maxBufferSize; - this.openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilderWithExplicitParent( - SpannerOptions.getTracer(), streamName, openTelemetryParent); - this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan(); + this.span = tracer.spanBuilderWithExplicitParent(streamName, parent); this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings); this.retryableCodes = Preconditions.checkNotNull(retryableCodes); this.backOff = newBackOff(); @@ -1174,15 +1166,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException { } private void backoffSleep(Context context, long backoffMillis) throws SpannerException { - tracer - .getCurrentSpan() - .addAnnotation( - "Backing off", - ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis))); - OpenTelemetryTraceUtil.addEvent( - io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()), - "Backing off", - Attributes.builder().put("Delay", backoffMillis).build()); + tracer.getCurrentSpan().addAnnotation("Backing off", "Delay", backoffMillis); final CountDownLatch latch = new CountDownLatch(1); final Context.CancellationListener listener = ignored -> { @@ -1222,8 +1206,7 @@ public void execute(Runnable command) { public void close(@Nullable String message) { if (stream != null) { stream.close(message); - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); stream = null; } } @@ -1239,20 +1222,12 @@ protected PartialResultSet computeNext() { while (true) { // Eagerly start stream before consuming any buffered items. if (stream == null) { - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Starting/Resuming stream", - Attributes.builder() - .put("ResumeToken", resumeToken == null ? "null" : resumeToken.toStringUtf8()) - .build()); span.addAnnotation( "Starting/Resuming stream", - ImmutableMap.of( - "ResumeToken", - AttributeValue.stringAttributeValue( - resumeToken == null ? "null" : resumeToken.toStringUtf8()))); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + "ResumeToken", + resumeToken == null ? "null" : resumeToken.toStringUtf8()); + + try (IScope sss = tracer.withSpan(span)) { // When start a new stream set the Span as current to make the gRPC Span a child of // this Span. stream = checkNotNull(startStream(resumeToken)); @@ -1292,13 +1267,8 @@ protected PartialResultSet computeNext() { } } catch (SpannerException spannerException) { if (safeToRetry && isRetryable(spannerException)) { - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Stream broken. Safe to retry", - OpenTelemetryTraceUtil.getExceptionAnnotations(spannerException)); - span.addAnnotation( - "Stream broken. Safe to retry", - TraceUtil.getExceptionAnnotations(spannerException)); + + span.addAnnotation("Stream broken. Safe to retry", spannerException); logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException); // Truncate any items in the buffer before the last retry token. while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) { @@ -1306,8 +1276,7 @@ protected PartialResultSet computeNext() { } assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken); stream = null; - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + try (IScope s = tracer.withSpan(span)) { long delay = spannerException.getRetryDelayInMillis(); if (delay != -1) { backoffSleep(context, delay); @@ -1318,17 +1287,12 @@ protected PartialResultSet computeNext() { continue; } - span.addAnnotation("Stream broken. Not safe to retry"); - TraceUtil.setWithFailure(span, spannerException); - OpenTelemetryTraceUtil.addEventWithExceptionAndSetFailure( - openTelemetrySpan, "Stream broken. Safe to retry", spannerException); + span.addAnnotation("Stream broken. Not safe to retry", spannerException); + span.setStatus(spannerException); throw spannerException; } catch (RuntimeException e) { - span.addAnnotation("Stream broken. Not safe to retry"); - TraceUtil.setWithFailure(span, e); - OpenTelemetryTraceUtil.addEventWithExceptionAndSetFailure( - openTelemetrySpan, "Stream broken. Safe to retry", e); - + span.addAnnotation("Stream broken. Not safe to retry", e); + span.setStatus(e); throw e; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index f07f9c0858d..83194c91c73 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -25,6 +25,7 @@ import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager; import com.google.cloud.spanner.TransactionManager.TransactionState; +import com.google.cloud.spanner.tracing.ISpan; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; @@ -35,34 +36,29 @@ final class AsyncTransactionManagerImpl implements CommittableAsyncTransactionManager, SessionTransaction { private final SessionImpl session; - private Span span; - private io.opentelemetry.api.trace.Span openTelemetrySpan; + private ISpan span; private final Options options; private TransactionRunnerImpl.TransactionContextImpl txn; private TransactionState txnState; private final SettableApiFuture commitResponse = SettableApiFuture.create(); - AsyncTransactionManagerImpl( - SessionImpl session, - Span span, - io.opentelemetry.api.trace.Span openTelemetrySpan, - TransactionOption... options) { + AsyncTransactionManagerImpl(SessionImpl session, ISpan span, TransactionOption... options) { this.session = session; this.span = span; - this.openTelemetrySpan = openTelemetrySpan; this.options = Options.fromTransactionOptions(options); } @Override - public void setSpan(Span span) { + public void setSpan(ISpan span) { this.span = span; } + /** + * No-op method needed to implement SessionTransaction interface. + */ @Override - public void setOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) { - this.openTelemetrySpan = span; - } + public void setSpan(Span span) {} @Override public void close() { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index 2a2a554d1c0..b09e0ccb13c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -22,6 +22,7 @@ import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.DualSpan; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.protobuf.Struct; @@ -91,9 +92,11 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa super(builder.setTimestampBound(bound)); this.sessionName = session.getName(); this.options = session.getOptions(); - setSpan(Tracing.getTracer().getCurrentSpan()); - setOpenTelemetrySpan( - io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current())); + setSpan( + new DualSpan( + Tracing.getTracer().getCurrentSpan(), + io.opentelemetry.api.trace.Span.fromContext( + io.opentelemetry.context.Context.current()))); initTransaction(); } @@ -102,9 +105,11 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa super(builder.setTransactionId(batchTransactionId.getTransactionId())); this.sessionName = session.getName(); this.options = session.getOptions(); - setSpan(Tracing.getTracer().getCurrentSpan()); - setOpenTelemetrySpan( - io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current())); + setSpan( + new DualSpan( + Tracing.getTracer().getCurrentSpan(), + io.opentelemetry.api.trace.Span.fromContext( + io.opentelemetry.context.Context.current()))); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index bf0e71443b7..d81b498979a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -21,12 +21,12 @@ import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SpannerImpl.ClosedException; +import com.google.cloud.spanner.tracing.IScope; +import com.google.cloud.spanner.tracing.ISpan; +import com.google.cloud.spanner.tracing.TraceWrapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.util.concurrent.ListenableFuture; -import io.opencensus.common.Scope; -import io.opencensus.trace.Span; -import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; import javax.annotation.Nullable; @@ -34,9 +34,7 @@ class DatabaseClientImpl implements DatabaseClient { private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction"; private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction"; private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction"; - private static final Tracer tracer = Tracing.getTracer(); - - private final io.opentelemetry.api.trace.Tracer openTelemetryTracer; + private static final TraceWrapper tracer = new TraceWrapper(Tracing.getTracer()); @VisibleForTesting final String clientId; @VisibleForTesting final SessionPool pool; @@ -48,7 +46,6 @@ class DatabaseClientImpl implements DatabaseClient { DatabaseClientImpl(String clientId, SessionPool pool) { this.clientId = clientId; this.pool = pool; - this.openTelemetryTracer = SpannerOptions.getTracer(); } @VisibleForTesting @@ -76,19 +73,14 @@ public Timestamp write(final Iterable mutations) throws SpannerExcepti public CommitResponse writeWithOptions( final Iterable mutations, final TransactionOption... options) throws SpannerException { - Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_WRITE_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry(session -> session.writeWithOptions(mutations, options)); } catch (RuntimeException e) { - TraceUtil.setWithFailure(span, e); - OpenTelemetryTraceUtil.setWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); } } @@ -101,188 +93,159 @@ public Timestamp writeAtLeastOnce(final Iterable mutations) throws Spa public CommitResponse writeAtLeastOnceWithOptions( final Iterable mutations, final TransactionOption... options) throws SpannerException { - Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_WRITE_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry( session -> session.writeAtLeastOnceWithOptions(mutations, options)); } catch (RuntimeException e) { - TraceUtil.setWithFailure(span, e); - OpenTelemetryTraceUtil.setWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); } } @Override public ReadContext singleUse() { - Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_ONLY_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return getSession().singleUse(); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; + } finally { + span.end(); } } @Override public ReadContext singleUse(TimestampBound bound) { - Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_ONLY_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return getSession().singleUse(bound); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; + } finally { + span.end(); } } @Override public ReadOnlyTransaction singleUseReadOnlyTransaction() { - Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_ONLY_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return getSession().singleUseReadOnlyTransaction(); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; + } finally { + span.end(); } } @Override public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { - Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_ONLY_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return getSession().singleUseReadOnlyTransaction(bound); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; + } finally { + span.end(); } } @Override public ReadOnlyTransaction readOnlyTransaction() { - Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_ONLY_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return getSession().readOnlyTransaction(); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; + } finally { + span.end(); } } @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { - Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_ONLY_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return getSession().readOnlyTransaction(bound); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); - throw e; + span.setStatus(e); + throw e; + } finally { + span.end(); } } @Override public TransactionRunner readWriteTransaction(TransactionOption... options) { - Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_WRITE_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return getSession().readWriteTransaction(options); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); } } @Override public TransactionManager transactionManager(TransactionOption... options) { - Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_WRITE_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return getSession().transactionManager(options); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; + } finally { + span.end(); } } @Override public AsyncRunner runAsync(TransactionOption... options) { - Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_WRITE_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION); + ; + try (IScope s = tracer.withSpan(span)) { return getSession().runAsync(options); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; + } finally { + span.end(); } } @Override public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { - Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, READ_WRITE_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return getSession().transactionManagerAsync(options); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; + } finally { + span.end(); } } @Override public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { - Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, PARTITION_DML_TRANSACTION); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); + try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; + } finally { + span.end(); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java index 9896cc8aec9..b024989a892 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java @@ -65,7 +65,8 @@ int getCode() { return this.code.value(); } - Status getGrpcStatus() { + /** @return the corresponding gRPC status of this {@link Status}. */ + public Status getGrpcStatus() { return this.code.toStatus(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryTraceUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryTraceUtil.java deleted file mode 100644 index fdcda46cb15..00000000000 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryTraceUtil.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2023 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner; - -import com.google.cloud.Timestamp; -import com.google.spanner.v1.Transaction; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Context; - -/** Utility methods for tracing. */ -class OpenTelemetryTraceUtil { - - static Attributes getTransactionAnnotations(Transaction t) { - return Attributes.builder() - .put("Id", t.getId().toStringUtf8()) - .put("Timestamp", Timestamp.fromProto(t.getReadTimestamp()).toString()) - .build(); - } - - static Attributes getExceptionAnnotations(Throwable e) { - AttributesBuilder attributesBuilder = Attributes.builder(); - if (e instanceof SpannerException) { - attributesBuilder.put("Status", ((SpannerException) e).getErrorCode().toString()); - } - return attributesBuilder.build(); - } - - static Attributes getExceptionAnnotations(SpannerException e) { - return Attributes.builder() - .put("Status", ((SpannerException) e).getErrorCode().toString()) - .build(); - } - - static void addEvent(Span span, String eventName) { - span.addEvent(eventName); - } - - static void addEvent(Span span, String eventName, Attributes attributes) { - span.addEvent(eventName, attributes); - } - - static void addEventWithException(Span span, String eventName, Throwable e) { - span.addEvent(eventName, getExceptionAnnotations(e)); - span.recordException(e); - } - - static void addEventWithExceptionAndSetFailure(Span span, String eventName, Throwable e) { - span.addEvent(eventName, getExceptionAnnotations(e)); - span.setStatus(StatusCode.ERROR); - span.recordException(e); - } - - static void setWithFailure(Span span, Throwable e) { - span.setStatus(StatusCode.ERROR); - span.recordException(e); - } - - static void endSpanWithFailure(Span span, Throwable e) { - span.setStatus(StatusCode.ERROR); - span.recordException(e); - span.end(); - } - - static Span spanBuilder(Tracer tracer, String spanName) { - return tracer.spanBuilder(spanName).startSpan(); - } - - static Span spanBuilderWithExplicitParent(Tracer tracer, String spanName, Span parentSpan) { - if (parentSpan != null) - return tracer.spanBuilder(spanName).setParent(Context.root().with(parentSpan)).startSpan(); - else return tracer.spanBuilder(spanName).startSpan(); - } - - static void endSpan(Span span) { - span.end(); - } -} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index 099eff003aa..241b78f9015 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -26,6 +26,7 @@ import com.google.api.gax.rpc.UnavailableException; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.ISpan; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; @@ -137,13 +138,17 @@ public void invalidate() { isValid = false; } - // No-op method needed to implement SessionTransaction interface. + /** + * No-op method needed to implement SessionTransaction interface. + */ @Override - public void setSpan(Span span) {} + public void setSpan(ISpan span) {} - // No-op method needed to implement SessionTransaction interface. + /** + * No-op method needed to implement SessionTransaction interface. + */ @Override - public void setOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) {} + public void setSpan(Span span) {} private Duration tryUpdateTimeout(final Duration timeout, final Stopwatch stopwatch) { final Duration remainingTimeout = diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index aeef176ec3a..fa459c31d8d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -22,11 +22,11 @@ import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.IScope; +import com.google.cloud.spanner.tracing.ISpan; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import io.opencensus.common.Scope; -import io.opencensus.trace.Span; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -113,7 +113,6 @@ private final class BatchCreateSessionsRunnable implements Runnable { private final long channelHint; private final int sessionCount; private final SessionConsumer consumer; - private final io.opentelemetry.api.trace.Tracer openTelemetryTracer; private BatchCreateSessionsRunnable( int sessionCount, long channelHint, SessionConsumer consumer) { @@ -122,34 +121,22 @@ private BatchCreateSessionsRunnable( this.channelHint = channelHint; this.sessionCount = sessionCount; this.consumer = consumer; - this.openTelemetryTracer = SpannerOptions.getTracer(); } @Override public void run() { List sessions; int remainingSessionsToCreate = sessionCount; - Span span = SpannerImpl.tracer.spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder( - this.openTelemetryTracer, SpannerImpl.BATCH_CREATE_SESSIONS); - try (Scope s = SpannerImpl.tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = SpannerImpl.tracer.spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS); + try (IScope s = SpannerImpl.tracer.withSpan(span)) { SpannerImpl.tracer .getCurrentSpan() .addAnnotation(String.format("Creating %d sessions", sessionCount)); - OpenTelemetryTraceUtil.addEvent( - io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()), - String.format("Creating %d sessions", sessionCount)); while (remainingSessionsToCreate > 0) { try { sessions = internalBatchCreateSessions(remainingSessionsToCreate, channelHint); } catch (Throwable t) { - TraceUtil.setWithFailure(SpannerImpl.tracer.getCurrentSpan(), t); - OpenTelemetryTraceUtil.setWithFailure( - io.opentelemetry.api.trace.Span.fromContext( - io.opentelemetry.context.Context.current()), - t); + SpannerImpl.tracer.getCurrentSpan().setStatus(t); consumer.onSessionCreateFailure(t, remainingSessionsToCreate); break; } @@ -159,8 +146,7 @@ public void run() { remainingSessionsToCreate -= sessions.size(); } } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); } } } @@ -185,7 +171,6 @@ interface SessionConsumer { private final ExecutorFactory executorFactory; private final ScheduledExecutorService executor; private final DatabaseId db; - private final io.opentelemetry.api.trace.Tracer openTelemetryTracer; @GuardedBy("this") private volatile long sessionChannelCounter; @@ -198,7 +183,6 @@ interface SessionConsumer { this.db = db; this.executorFactory = executorFactory; this.executor = executorFactory.get(); - this.openTelemetryTracer = SpannerOptions.getTracer(); } @Override @@ -222,11 +206,8 @@ SessionImpl createSession() { synchronized (this) { options = optionMap(SessionOption.channelHint(sessionChannelCounter++)); } - Span span = SpannerImpl.tracer.spanBuilder(SpannerImpl.CREATE_SESSION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, SpannerImpl.CREATE_SESSION); - try (Scope s = SpannerImpl.tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = SpannerImpl.tracer.spanBuilder(SpannerImpl.CREATE_SESSION); + try (IScope s = SpannerImpl.tracer.withSpan(span)) { com.google.spanner.v1.Session session = spanner .getRpc() @@ -237,12 +218,10 @@ SessionImpl createSession() { options); return new SessionImpl(spanner, session.getName(), options); } catch (RuntimeException e) { - TraceUtil.setWithFailure(span, e); - OpenTelemetryTraceUtil.setWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); } } @@ -311,23 +290,12 @@ void asyncBatchCreateSessions( private List internalBatchCreateSessions( final int sessionCount, final long channelHint) throws SpannerException { final Map options = optionMap(SessionOption.channelHint(channelHint)); - io.opentelemetry.api.trace.Span openTelemetryParentSpan = - io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilderWithExplicitParent( - this.openTelemetryTracer, - SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, - openTelemetryParentSpan); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, String.format("Requesting %d sessions", sessionCount)); - Span parent = SpannerImpl.tracer.getCurrentSpan(); - Span span = - SpannerImpl.tracer - .spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent) - .startSpan(); + ISpan parent = SpannerImpl.tracer.getCurrentSpan(); + ISpan span = + SpannerImpl.tracer.spanBuilderWithExplicitParent( + SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent); span.addAnnotation(String.format("Requesting %d sessions", sessionCount)); - try (Scope s = SpannerImpl.tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + try (IScope s = SpannerImpl.tracer.withSpan(span)) { List sessions = spanner .getRpc() @@ -340,21 +308,15 @@ private List internalBatchCreateSessions( span.addAnnotation( String.format( "Request for %d sessions returned %d sessions", sessionCount, sessions.size())); - span.end(TraceUtil.END_SPAN_OPTIONS); - - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - String.format( - "Request for %d sessions returned %d sessions", sessionCount, sessions.size())); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); List res = new ArrayList<>(sessionCount); for (com.google.spanner.v1.Session session : sessions) { res.add(new SessionImpl(spanner, session.getName(), options)); } return res; } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); + span.end(); throw e; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index d8ecd8bb428..97f860d1664 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -30,6 +30,9 @@ import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.IScope; +import com.google.cloud.spanner.tracing.ISpan; +import com.google.cloud.spanner.tracing.TraceWrapper; import com.google.common.base.Ticker; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; @@ -40,9 +43,7 @@ import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.Transaction; import com.google.spanner.v1.TransactionOptions; -import io.opencensus.common.Scope; import io.opencensus.trace.Span; -import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; import java.util.ArrayList; import java.util.Collection; @@ -56,7 +57,7 @@ * users need not be aware of the actual session management, pooling and handling. */ class SessionImpl implements Session { - private static final Tracer tracer = Tracing.getTracer(); + private static final TraceWrapper tracer = new TraceWrapper(Tracing.getTracer()); /** Keep track of running transactions on this session per thread. */ static final ThreadLocal hasPendingTransaction = ThreadLocal.withInitial(() -> false); @@ -86,8 +87,8 @@ interface SessionTransaction { void invalidate(); /** Registers the current span on the transaction. */ void setSpan(Span span); - /** Registers the current opentelemetry span on the transaction. */ - void setOpenTelemetrySpan(io.opentelemetry.api.trace.Span span); + /** Registers the current span on the transaction. */ + void setSpan(ISpan span); } private final SpannerImpl spanner; @@ -96,16 +97,13 @@ interface SessionTransaction { private SessionTransaction activeTransaction; ByteString readyTransactionId; private final Map options; - private final io.opentelemetry.api.trace.Tracer openTelemetryTracer; - private Span currentSpan; - private io.opentelemetry.api.trace.Span openTelemetryCurrentSpan; + private ISpan currentSpan; SessionImpl(SpannerImpl spanner, String name, Map options) { this.spanner = spanner; this.options = options; this.name = checkNotNull(name); this.databaseId = SessionId.of(name).getDatabaseId(); - this.openTelemetryTracer = SpannerOptions.getTracer(); } @Override @@ -117,22 +115,14 @@ public String getName() { return options; } - void setCurrentSpan(Span span) { + void setCurrentSpan(ISpan span) { currentSpan = span; } - Span getCurrentSpan() { + ISpan getCurrentSpan() { return currentSpan; } - void setCurrentOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) { - openTelemetryCurrentSpan = span; - } - - io.opentelemetry.api.trace.Span getCurrentOpenTelemetrySpan() { - return openTelemetryCurrentSpan; - } - @Override public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { setActive(null); @@ -195,21 +185,16 @@ public CommitResponse writeAtLeastOnceWithOptions( } requestBuilder.setRequestOptions(requestOptionsBuilder.build()); } - Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, SpannerImpl.COMMIT); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT); + try (IScope s = tracer.withSpan(span)) { com.google.spanner.v1.CommitResponse response = spanner.getRpc().commit(requestBuilder.build(), this.options); return new CommitResponse(response); } catch (RuntimeException e) { - TraceUtil.setWithFailure(span, e); - OpenTelemetryTraceUtil.setWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); } } @@ -228,7 +213,6 @@ public ReadContext singleUse(TimestampBound bound) { .setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId)) .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) .setSpan(currentSpan) - .setOpenTelemetrySpan(openTelemetryCurrentSpan) .setExecutorProvider(spanner.getAsyncExecutorProvider()) .build()); } @@ -248,7 +232,6 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { .setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId)) .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) .setSpan(currentSpan) - .setOpenTelemetrySpan(openTelemetryCurrentSpan) .setExecutorProvider(spanner.getAsyncExecutorProvider()) .buildSingleUseReadOnlyTransaction()); } @@ -268,7 +251,6 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { .setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId)) .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) .setSpan(currentSpan) - .setOpenTelemetrySpan(openTelemetryCurrentSpan) .setExecutorProvider(spanner.getAsyncExecutorProvider()) .build()); } @@ -285,12 +267,12 @@ public AsyncRunner runAsync(TransactionOption... options) { @Override public TransactionManager transactionManager(TransactionOption... options) { - return new TransactionManagerImpl(this, currentSpan, openTelemetryCurrentSpan, options); + return new TransactionManagerImpl(this, currentSpan, options); } @Override public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) { - return new AsyncTransactionManagerImpl(this, currentSpan, openTelemetryCurrentSpan, options); + return new AsyncTransactionManagerImpl(this, currentSpan, options); } @Override @@ -306,20 +288,14 @@ public ApiFuture asyncClose() { @Override public void close() { - Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, SpannerImpl.DELETE_SESSION); - - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION); + try (IScope s = tracer.withSpan(span)) { spanner.getRpc().deleteSession(name, options); } catch (RuntimeException e) { - TraceUtil.setWithFailure(span, e); - OpenTelemetryTraceUtil.setWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); } } @@ -339,9 +315,7 @@ ApiFuture beginTransactionAsync(boolean routeToLeader) { ApiFuture beginTransactionAsync(Options transactionOptions, boolean routeToLeader) { final SettableApiFuture res = SettableApiFuture.create(); - final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan(); - final io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(this.openTelemetryTracer, SpannerImpl.BEGIN_TRANSACTION); + final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION); final BeginTransactionRequest request = BeginTransactionRequest.newBuilder() .setSession(name) @@ -351,30 +325,26 @@ ApiFuture beginTransactionAsync(Options transactionOptions, boolean spanner.getRpc().beginTransactionAsync(request, options, routeToLeader); requestFuture.addListener( () -> { - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + try (IScope s = tracer.withSpan(span)) { Transaction txn = requestFuture.get(); if (txn.getId().isEmpty()) { throw newSpannerException( ErrorCode.INTERNAL, "Missing id in transaction\n" + getName()); } - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); res.set(txn.getId()); } catch (ExecutionException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); res.setException( SpannerExceptionFactory.newSpannerException( e.getCause() == null ? e : e.getCause())); } catch (InterruptedException e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); res.setException(SpannerExceptionFactory.propagateInterrupt(e)); } catch (Exception e) { - TraceUtil.endSpanWithFailure(span, e); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetrySpan, e); + span.setStatus(e); res.setException(e); + } finally { + span.end(); } }, MoreExecutors.directExecutor()); @@ -392,7 +362,6 @@ TransactionContextImpl newTransaction(Options options) { .setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId)) .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) .setSpan(currentSpan) - .setOpenTelemetrySpan(openTelemetryCurrentSpan) .setExecutorProvider(spanner.getAsyncExecutorProvider()) .build(); } @@ -407,7 +376,6 @@ T setActive(@Nullable T ctx) { readyTransactionId = null; if (activeTransaction != null) { activeTransaction.setSpan(currentSpan); - activeTransaction.setOpenTelemetrySpan(openTelemetryCurrentSpan); } return ctx; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index d2825315a6e..68bf17be938 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -64,6 +64,10 @@ import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.cloud.spanner.SpannerImpl.ClosedException; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.DualSpan; +import com.google.cloud.spanner.tracing.IScope; +import com.google.cloud.spanner.tracing.ISpan; +import com.google.cloud.spanner.tracing.TraceWrapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.MoreObjects; @@ -79,7 +83,6 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Empty; import com.google.spanner.v1.ResultSetStats; -import io.opencensus.common.Scope; import io.opencensus.metrics.DerivedLongCumulative; import io.opencensus.metrics.DerivedLongGauge; import io.opencensus.metrics.LabelValue; @@ -89,15 +92,11 @@ import io.opencensus.trace.Annotation; import io.opencensus.trace.AttributeValue; import io.opencensus.trace.BlankSpan; -import io.opencensus.trace.Span; -import io.opencensus.trace.Status; -import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.api.trace.StatusCode; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -130,7 +129,7 @@ class SessionPool { private static final Logger logger = Logger.getLogger(SessionPool.class.getName()); - private static final Tracer tracer = Tracing.getTracer(); + private static final TraceWrapper tracer = new TraceWrapper(Tracing.getTracer()); static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession"; static final ImmutableSet SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES = ImmutableSet.of( @@ -1128,10 +1127,8 @@ private enum SessionState { } private PooledSessionFuture createPooledSessionFuture( - ListenableFuture future, - Span span, - io.opentelemetry.api.trace.Span openTelemetrySpan) { - return new PooledSessionFuture(future, span, openTelemetrySpan); + ListenableFuture future, ISpan span) { + return new PooledSessionFuture(future, span); } class PooledSessionFuture extends SimpleForwardingListenableFuture @@ -1139,17 +1136,12 @@ class PooledSessionFuture extends SimpleForwardingListenableFuture delegate, - Span span, - io.opentelemetry.api.trace.Span openTelemetrySpan) { + PooledSessionFuture(ListenableFuture delegate, ISpan span) { super(delegate); this.span = span; - this.openTelemetrySpan = openTelemetrySpan; } @VisibleForTesting @@ -1367,12 +1359,8 @@ PooledSession get(final boolean eligibleForLongRunning) { // ignore the exception as it will be handled by the call to super.get() below. } if (res != null) { - res.markBusy(span, openTelemetrySpan); - span.addAnnotation(sessionAnnotation(res)); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Using Session", - Attributes.builder().put("sessionId", res.getName()).build()); + res.markBusy(span); + span.addAnnotation("Using Session", "sessionId", res.getName()); synchronized (lock) { incrementNumSessionsInUse(); checkedOutSessions.add(this); @@ -1593,11 +1581,9 @@ public void prepareReadWriteTransaction() { private void keepAlive() { markUsed(); - final Span previousSpan = delegate.getCurrentSpan(); - final io.opentelemetry.api.trace.Span openTelemetrySpan = - delegate.getCurrentOpenTelemetrySpan(); - delegate.setCurrentSpan(BlankSpan.INSTANCE); - delegate.setCurrentOpenTelemetrySpan(io.opentelemetry.api.trace.Span.getInvalid()); + final ISpan previousSpan = delegate.getCurrentSpan(); + delegate.setCurrentSpan( + new DualSpan(BlankSpan.INSTANCE, io.opentelemetry.api.trace.Span.getInvalid())); try (ResultSet resultSet = delegate .singleUse(TimestampBound.ofMaxStaleness(60, TimeUnit.SECONDS)) @@ -1605,7 +1591,6 @@ private void keepAlive() { resultSet.next(); } finally { delegate.setCurrentSpan(previousSpan); - delegate.setCurrentOpenTelemetrySpan(openTelemetrySpan); } } @@ -1637,9 +1622,8 @@ private Dialect determineDialect() { } } - private void markBusy(Span span, io.opentelemetry.api.trace.Span openTelemetrySpan) { + private void markBusy(ISpan span) { this.delegate.setCurrentSpan(span); - this.delegate.setCurrentOpenTelemetrySpan(openTelemetrySpan); this.state = SessionState.BUSY; } @@ -1678,29 +1662,23 @@ private void put(SpannerException e) { public PooledSession get() { long currentTimeout = options.getInitialWaitForSessionTimeoutMillis(); while (true) { - Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - OpenTelemetryTraceUtil.spanBuilder(SpannerOptions.getTracer(), WAIT_FOR_SESSION); - try (Scope waitScope = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + ISpan span = tracer.spanBuilder(WAIT_FOR_SESSION); + try (IScope waitScope = tracer.withSpan(span)) { PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout); if (s == null) { // Set the status to DEADLINE_EXCEEDED and retry. numWaiterTimeouts.incrementAndGet(); - tracer.getCurrentSpan().setStatus(Status.DEADLINE_EXCEEDED); - io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()) - .setStatus(StatusCode.ERROR, "DEADLINE_EXCEEDED"); + // todo set status "DEADLINE_EXCEEDED" + // tracer.getCurrentSpan().setStatus("deadline"); currentTimeout = Math.min(currentTimeout * 2, MAX_SESSION_WAIT_TIMEOUT); } else { return s; } } catch (Exception e) { - TraceUtil.setWithFailure(span, e); - OpenTelemetryTraceUtil.setWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); } } } @@ -2379,22 +2357,21 @@ boolean isValid() { * */ PooledSessionFuture getSession() throws SpannerException { - Span span = Tracing.getTracer().getCurrentSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()); + ISpan span = + new DualSpan( + Tracing.getTracer().getCurrentSpan(), + io.opentelemetry.api.trace.Span.fromContext( + io.opentelemetry.context.Context.current())); span.addAnnotation("Acquiring session"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Acquiring session"); WaiterFuture waiter = null; PooledSession sess = null; synchronized (lock) { if (closureFuture != null) { span.addAnnotation("Pool has been closed"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Pool has been closed"); throw new IllegalStateException("Pool has been closed", closedException); } if (resourceNotFoundException != null) { span.addAnnotation("Database has been deleted"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Database has been deleted"); throw SpannerExceptionFactory.newSpannerException( ErrorCode.NOT_FOUND, String.format( @@ -2405,37 +2382,31 @@ PooledSessionFuture getSession() throws SpannerException { sess = sessions.poll(); if (sess == null) { span.addAnnotation("No session available"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "No session available"); maybeCreateSession(); waiter = new WaiterFuture(); waiters.add(waiter); } else { span.addAnnotation("Acquired session"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Acquired session"); } - return checkoutSession(span, openTelemetrySpan, sess, waiter); + return checkoutSession(span, sess, waiter); } } private PooledSessionFuture checkoutSession( - final Span span, - final io.opentelemetry.api.trace.Span openTelemetrySpan, - final PooledSession readySession, - WaiterFuture waiter) { + final ISpan span, final PooledSession readySession, WaiterFuture waiter) { ListenableFuture sessionFuture; if (waiter != null) { logger.log( Level.FINE, "No session available in the pool. Blocking for one to become available/created"); span.addAnnotation("Waiting for a session to come available"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Waiting for a session to come available"); sessionFuture = waiter; } else { SettableFuture fut = SettableFuture.create(); fut.set(readySession); sessionFuture = fut; } - PooledSessionFuture res = createPooledSessionFuture(sessionFuture, span, openTelemetrySpan); + PooledSessionFuture res = createPooledSessionFuture(sessionFuture, span); res.markCheckedOut(); return res; } @@ -2471,18 +2442,15 @@ private void incrementNumSessionsInUse() { } private void maybeCreateSession() { - Span span = Tracing.getTracer().getCurrentSpan(); - io.opentelemetry.api.trace.Span openTelemetrySpan = - io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()); + TraceWrapper tracer = new TraceWrapper(Tracing.getTracer()); + ISpan span = tracer.getCurrentSpan(); synchronized (lock) { if (numWaiters() >= numSessionsBeingCreated) { if (canCreateSession()) { span.addAnnotation("Creating sessions"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Creating sessions"); createSessions(getAllowedCreateSessions(options.getIncStep()), false); } else if (options.isFailIfPoolExhausted()) { span.addAnnotation("Pool exhausted. Failing"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Pool exhausted. Failing"); // throw specific exception throw newSpannerException( ErrorCode.RESOURCE_EXHAUSTED, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 75b2655ead3..f3b9b85e40e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -28,6 +28,7 @@ import com.google.cloud.spanner.spi.v1.GapicSpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.Paginated; +import com.google.cloud.spanner.tracing.TraceWrapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -37,7 +38,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; import io.opencensus.metrics.LabelValue; -import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; @@ -57,7 +57,7 @@ /** Default implementation of the Cloud Spanner interface. */ class SpannerImpl extends BaseService implements Spanner { private static final Logger logger = Logger.getLogger(SpannerImpl.class.getName()); - static final Tracer tracer = Tracing.getTracer(); + static final TraceWrapper tracer = new TraceWrapper(Tracing.getTracer()); static final String CREATE_SESSION = "CloudSpannerOperation.CreateSession"; static final String BATCH_CREATE_SESSIONS = "CloudSpannerOperation.BatchCreateSessions"; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java deleted file mode 100644 index 0d429661ad2..00000000000 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2017 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner; - -import com.google.cloud.Timestamp; -import com.google.common.collect.ImmutableMap; -import com.google.spanner.v1.Transaction; -import io.opencensus.contrib.grpc.util.StatusConverter; -import io.opencensus.trace.AttributeValue; -import io.opencensus.trace.EndSpanOptions; -import io.opencensus.trace.Span; -import io.opencensus.trace.Status; -import java.util.Map; - -/** Utility methods for tracing. */ -class TraceUtil { - - static final EndSpanOptions END_SPAN_OPTIONS = - EndSpanOptions.builder().setSampleToLocalSpanStore(true).build(); - - static Map getTransactionAnnotations(Transaction t) { - return ImmutableMap.of( - "Id", - AttributeValue.stringAttributeValue(t.getId().toStringUtf8()), - "Timestamp", - AttributeValue.stringAttributeValue(Timestamp.fromProto(t.getReadTimestamp()).toString())); - } - - static ImmutableMap getExceptionAnnotations(Throwable e) { - if (e instanceof SpannerException) { - return ImmutableMap.of( - "Status", - AttributeValue.stringAttributeValue(((SpannerException) e).getErrorCode().toString())); - } - return ImmutableMap.of(); - } - - static ImmutableMap getExceptionAnnotations(SpannerException e) { - return ImmutableMap.of( - "Status", AttributeValue.stringAttributeValue(e.getErrorCode().toString())); - } - - static void setWithFailure(Span span, Throwable e) { - if (e instanceof SpannerException) { - span.setStatus( - StatusConverter.fromGrpcStatus(((SpannerException) e).getErrorCode().getGrpcStatus()) - .withDescription(e.getMessage())); - } else { - span.setStatus(Status.INTERNAL.withDescription(e.getMessage())); - } - } - - static void endSpanWithFailure(Span span, Throwable e) { - if (e instanceof SpannerException) { - endSpanWithFailure(span, (SpannerException) e); - } else { - span.setStatus(Status.INTERNAL.withDescription(e.getMessage())); - span.end(END_SPAN_OPTIONS); - } - } - - static void endSpanWithFailure(Span span, SpannerException e) { - span.setStatus( - StatusConverter.fromGrpcStatus(e.getErrorCode().getGrpcStatus()) - .withDescription(e.getMessage())); - span.end(END_SPAN_OPTIONS); - } -} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index d9b419681a6..9ec96b25c38 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -19,54 +19,49 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; +import com.google.cloud.spanner.tracing.IScope; +import com.google.cloud.spanner.tracing.ISpan; +import com.google.cloud.spanner.tracing.TraceWrapper; import com.google.common.base.Preconditions; -import io.opencensus.common.Scope; import io.opencensus.trace.Span; -import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; /** Implementation of {@link TransactionManager}. */ final class TransactionManagerImpl implements TransactionManager, SessionTransaction { - private static final Tracer tracer = Tracing.getTracer(); + private static final TraceWrapper tracer = new TraceWrapper(Tracing.getTracer()); private final SessionImpl session; - private Span span; - private io.opentelemetry.api.trace.Span openTelemetrySpan; + private ISpan span; private final Options options; private TransactionRunnerImpl.TransactionContextImpl txn; private TransactionState txnState; - TransactionManagerImpl( - SessionImpl session, - Span span, - io.opentelemetry.api.trace.Span openTelemetrySpan, - TransactionOption... options) { + TransactionManagerImpl(SessionImpl session, ISpan span, TransactionOption... options) { this.session = session; this.span = span; - this.openTelemetrySpan = openTelemetrySpan; this.options = Options.fromTransactionOptions(options); } - Span getSpan() { + ISpan getSpan() { return span; } @Override - public void setSpan(Span span) { + public void setSpan(ISpan span) { this.span = span; } + /** + * No-op method needed to implement SessionTransaction interface. + */ @Override - public void setOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) { - this.openTelemetrySpan = span; - } + public void setSpan(Span span) {} @Override public TransactionContext begin() { Preconditions.checkState(txn == null, "begin can only be called once"); - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + try (IScope s = tracer.withSpan(span)) { txn = session.newTransaction(options); session.setActive(this); txnState = TransactionState.STARTED; @@ -114,8 +109,7 @@ public TransactionContext resetForRetry() { throw new IllegalStateException( "resetForRetry can only be called if the previous attempt" + " aborted"); } - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + try (IScope s = tracer.withSpan(span)) { boolean useInlinedBegin = txn.transactionId != null; txn = session.newTransaction(options); if (!useInlinedBegin) { @@ -150,8 +144,7 @@ public void close() { txnState = TransactionState.ROLLED_BACK; } } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - OpenTelemetryTraceUtil.endSpan(openTelemetrySpan); + span.end(); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 067479013d3..9ffa83647f6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -30,6 +30,9 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; +import com.google.cloud.spanner.tracing.IScope; +import com.google.cloud.spanner.tracing.ISpan; +import com.google.cloud.spanner.tracing.TraceWrapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -49,12 +52,8 @@ import com.google.spanner.v1.Transaction; import com.google.spanner.v1.TransactionOptions; import com.google.spanner.v1.TransactionSelector; -import io.opencensus.common.Scope; -import io.opencensus.trace.AttributeValue; import io.opencensus.trace.Span; -import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; -import io.opentelemetry.api.common.Attributes; import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -72,7 +71,7 @@ /** Default implementation of {@link TransactionRunner}. */ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { - private static final Tracer tracer = Tracing.getTracer(); + private static final TraceWrapper tracer = new TraceWrapper(Tracing.getTracer()); private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName()); /** * (Part of) the error message that is returned by Cloud Spanner if a transaction is cancelled @@ -246,14 +245,7 @@ ApiFuture ensureTxnAsync() { if (transactionId == null || isAborted()) { createTxnAsync(res); } else { - span.addAnnotation( - "Transaction Initialized", - ImmutableMap.of( - "Id", AttributeValue.stringAttributeValue(transactionId.toStringUtf8()))); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Transaction Initialized", - Attributes.builder().put("Id", transactionId.toStringUtf8()).build()); + span.addAnnotation("Transaction Initialized", "Id", transactionId.toStringUtf8()); txnLogger.log( Level.FINER, "Using prepared transaction {0}", @@ -265,20 +257,12 @@ ApiFuture ensureTxnAsync() { private void createTxnAsync(final SettableApiFuture res) { span.addAnnotation("Creating Transaction"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Creating Transaction"); final ApiFuture fut = session.beginTransactionAsync(options, isRouteToLeader()); fut.addListener( () -> { try { transactionId = fut.get(); - span.addAnnotation( - "Transaction Creation Done", - ImmutableMap.of( - "Id", AttributeValue.stringAttributeValue(transactionId.toStringUtf8()))); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Transaction Creating Done", - Attributes.builder().put("Id", transactionId.toStringUtf8()).build()); + span.addAnnotation("Transaction Creation Done", "Id", transactionId.toStringUtf8()); txnLogger.log( Level.FINER, "Started transaction {0}", @@ -286,13 +270,7 @@ private void createTxnAsync(final SettableApiFuture res) { res.set(null); } catch (ExecutionException e) { span.addAnnotation( - "Transaction Creation Failed", - TraceUtil.getExceptionAnnotations(e.getCause() == null ? e : e.getCause())); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Transaction Creating Failed", - OpenTelemetryTraceUtil.getExceptionAnnotations(e)); - + "Transaction Creation Failed", e.getCause() == null ? e : e.getCause()); res.setException(e.getCause() == null ? e : e.getCause()); } catch (InterruptedException e) { res.setException(SpannerExceptionFactory.propagateInterrupt(e)); @@ -400,27 +378,19 @@ public void run() { } final CommitRequest commitRequest = requestBuilder.build(); span.addAnnotation("Starting Commit"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Starting Commit"); - final Span opSpan = - tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan(); - final io.opentelemetry.api.trace.Span openTelemetryOpSpan = - OpenTelemetryTraceUtil.spanBuilderWithExplicitParent( - SpannerOptions.getTracer(), SpannerImpl.COMMIT, openTelemetrySpan); + final ISpan opSpan = tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span); final ApiFuture commitFuture = rpc.commitAsync(commitRequest, session.getOptions()); commitFuture.addListener( () -> { - try (Scope s = tracer.withSpan(opSpan); - io.opentelemetry.context.Scope ss = openTelemetryOpSpan.makeCurrent()) { + try (IScope s = tracer.withSpan(opSpan)) { com.google.spanner.v1.CommitResponse proto = commitFuture.get(); if (!proto.hasCommitTimestamp()) { throw newSpannerException( ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName()); } span.addAnnotation("Commit Done"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Commit Done"); - opSpan.end(TraceUtil.END_SPAN_OPTIONS); - openTelemetryOpSpan.end(); + opSpan.end(); res.set(new CommitResponse(proto)); } catch (Throwable e) { if (e instanceof ExecutionException) { @@ -432,15 +402,11 @@ public void run() { } else { e = SpannerExceptionFactory.newSpannerException(e); } - span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e)); - TraceUtil.endSpanWithFailure(opSpan, e); - - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Commit Failed", - OpenTelemetryTraceUtil.getExceptionAnnotations(e)); - OpenTelemetryTraceUtil.endSpanWithFailure(openTelemetryOpSpan, e); + span.addAnnotation("Commit Failed", e); + opSpan.setStatus(e); res.setException(onError((SpannerException) e, false)); + } finally { + opSpan.end(); } }, MoreExecutors.directExecutor()); @@ -471,11 +437,7 @@ void rollback() { rollbackAsync().get(); } catch (ExecutionException e) { txnLogger.log(Level.FINE, "Exception during rollback", e); - span.addAnnotation("Rollback Failed", TraceUtil.getExceptionAnnotations(e)); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Rollback Failed", - OpenTelemetryTraceUtil.getExceptionAnnotations(e)); + span.addAnnotation("Rollback Failed", e); } catch (InterruptedException e) { throw SpannerExceptionFactory.propagateInterrupt(e); } @@ -493,7 +455,6 @@ ApiFuture rollbackAsync() { // is still in flight. That transaction will then automatically be terminated by the server. if (transactionId != null) { span.addAnnotation("Starting Rollback"); - OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Starting Rollback"); return rpc.rollbackAsync( RollbackRequest.newBuilder() .setSession(session.getName()) @@ -963,8 +924,7 @@ public ListenableAsyncResultSet executeQueryAsync( private boolean blockNestedTxn = true; private final SessionImpl session; private final Options options; - private Span span; - private io.opentelemetry.api.trace.Span openTelemetrySpan; + private ISpan span; private TransactionContextImpl txn; private volatile boolean isValid = true; @@ -981,27 +941,26 @@ public TransactionRunner allowNestedTransaction() { } @Override - public void setSpan(Span span) { + public void setSpan(ISpan span) { this.span = span; } + /** + * No-op method needed to implement SessionTransaction interface. + */ @Override - public void setOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) { - this.openTelemetrySpan = span; - } + public void setSpan(Span span) {} @Nullable @Override public T run(TransactionCallable callable) { - try (Scope s = tracer.withSpan(span); - io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) { + try (IScope s = tracer.withSpan(span)) { if (blockNestedTxn) { SessionImpl.hasPendingTransaction.set(Boolean.TRUE); } return runInternal(callable); } catch (RuntimeException e) { - TraceUtil.setWithFailure(span, e); - OpenTelemetryTraceUtil.setWithFailure(openTelemetrySpan, e); + span.setStatus(e); throw e; } finally { // Remove threadLocal rather than set to FALSE to avoid a possible memory leak. @@ -1025,13 +984,7 @@ private T runInternal(final TransactionCallable txCallable) { checkState( isValid, "TransactionRunner has been invalidated by a new operation on the session"); attempt.incrementAndGet(); - span.addAnnotation( - "Starting Transaction Attempt", - ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt.longValue()))); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Starting Transaction Attempt", - Attributes.builder().put("Attempt", attempt.longValue()).build()); + span.addAnnotation("Starting Transaction Attempt", "Attempt", attempt.longValue()); // Only ensure that there is a transaction if we should not inline the beginTransaction // with the first statement. if (!useInlinedBegin) { @@ -1048,12 +1001,9 @@ private T runInternal(final TransactionCallable txCallable) { if (txn.isAborted() || (e instanceof AbortedException)) { span.addAnnotation( "Transaction Attempt Aborted in user operation. Retrying", - ImmutableMap.of( - "Attempt", AttributeValue.longAttributeValue(attempt.longValue()))); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Transaction Attempt Aborted in user operation. Retrying", - Attributes.builder().put("Attempt", attempt.longValue()).build()); + "Attempt", + attempt.longValue()); + shouldRollback = false; if (e instanceof AbortedException) { throw e; @@ -1069,17 +1019,8 @@ private T runInternal(final TransactionCallable txCallable) { } span.addAnnotation( "Transaction Attempt Failed in user operation", - ImmutableMap.builder() - .putAll(TraceUtil.getExceptionAnnotations(toThrow)) - .put("Attempt", AttributeValue.longAttributeValue(attempt.longValue())) - .build()); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Transaction Attempt Failed in user operation", - Attributes.builder() - .put("Status", toThrow.getErrorCode().toString()) - .put("Attempt", attempt.longValue()) - .build()); + ImmutableMap.of( + "Attempt", attempt.longValue(), "Status", toThrow.getErrorCode().toString())); throw toThrow; } finally { if (shouldRollback) { @@ -1089,38 +1030,21 @@ private T runInternal(final TransactionCallable txCallable) { try { txn.commit(); - span.addAnnotation( - "Transaction Attempt Succeeded", - ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt.longValue()))); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Transaction Attempt Succeeded", - Attributes.builder().put("Attempt", attempt.longValue()).build()); + span.addAnnotation("Transaction Attempt Succeeded", "Attempt", attempt.longValue()); return result; } catch (AbortedException e) { txnLogger.log(Level.FINE, "Commit aborted", e); span.addAnnotation( - "Transaction Attempt Aborted in Commit. Retrying", - ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt.longValue()))); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Transaction Attempt Aborted in Commit. Retrying", - Attributes.builder().put("Attempt", attempt.longValue()).build()); + "Transaction Attempt Aborted in Commit. Retrying", "Attempt", attempt.longValue()); throw e; } catch (SpannerException e) { span.addAnnotation( "Transaction Attempt Failed in Commit", - ImmutableMap.builder() - .putAll(TraceUtil.getExceptionAnnotations(e)) - .put("Attempt", AttributeValue.longAttributeValue(attempt.longValue())) - .build()); - OpenTelemetryTraceUtil.addEvent( - openTelemetrySpan, - "Transaction Attempt Failed in Commit", - Attributes.builder() - .put("Status", ((SpannerException) e).getErrorCode().toString()) - .put("Attempt", attempt.longValue()) - .build()); + ImmutableMap.of( + "Attempt", + attempt.longValue(), + "Status", + ((SpannerException) e).getErrorCode().toString())); throw e; } }; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/DualScope.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/DualScope.java new file mode 100644 index 00000000000..ff6d8f03be4 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/DualScope.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.tracing; + +import io.opencensus.common.Scope; + +public class DualScope implements IScope { + + private Scope openCensusScope; + private io.opentelemetry.context.Scope openTelemetryScope; + + public DualScope(Scope openCensusScope, io.opentelemetry.context.Scope openTelemetryScope) { + this.openCensusScope = openCensusScope; + this.openTelemetryScope = openTelemetryScope; + } + + @Override + public void close() { + if (openCensusScope != null) { + openCensusScope.close(); + } + if (openTelemetryScope != null) { + openTelemetryScope.close(); + } + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/DualSpan.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/DualSpan.java new file mode 100644 index 00000000000..82f968c27a8 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/DualSpan.java @@ -0,0 +1,134 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.tracing; + +import com.google.cloud.spanner.SpannerException; +import com.google.common.collect.ImmutableMap; +import io.opencensus.contrib.grpc.util.StatusConverter; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.EndSpanOptions; +import io.opencensus.trace.Span; +import io.opencensus.trace.Status; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.StatusCode; +import java.util.HashMap; +import java.util.Map; + +public class DualSpan implements ISpan { + + private static EndSpanOptions END_SPAN_OPTIONS = + EndSpanOptions.builder().setSampleToLocalSpanStore(true).build(); + private Span openCensusSpan; + private io.opentelemetry.api.trace.Span openTelemetrySpan; + + public DualSpan(Span openCensusSpan, io.opentelemetry.api.trace.Span openTelemetrySpan) { + this.openCensusSpan = openCensusSpan; + this.openTelemetrySpan = openTelemetrySpan; + } + + public Span getOpenCensusSpan() { + return openCensusSpan; + } + + public io.opentelemetry.api.trace.Span getOpenTelemetrySpan() { + return openTelemetrySpan; + } + + @Override + public void addAnnotation(String message, Map attributes) { + AttributesBuilder otAttributesBuilder = Attributes.builder(); + Map ocAttributeValues = new HashMap<>(); + for (Map.Entry entry : attributes.entrySet()) { + String key = entry.getKey(); + Object val = entry.getValue(); + if (val == null || val instanceof String) { + String strVal = (String) val; + otAttributesBuilder.put(key, strVal); + ocAttributeValues.put(key, AttributeValue.stringAttributeValue(strVal)); + } else if (val instanceof Long) { + long longVal = (Long) val; + otAttributesBuilder.put(key, longVal); + ocAttributeValues.put(key, AttributeValue.longAttributeValue(longVal)); + } + } + openTelemetrySpan.addEvent(message, otAttributesBuilder.build()); + openCensusSpan.addAnnotation(message, ocAttributeValues); + } + + @Override + public void addAnnotation(String message) { + openTelemetrySpan.addEvent(message); + openCensusSpan.addAnnotation(message); + } + + @Override + public void addAnnotation(String message, String key, String value) { + openTelemetrySpan.addEvent(message, Attributes.builder().put(key, value).build()); + openCensusSpan.addAnnotation( + message, ImmutableMap.of(key, AttributeValue.stringAttributeValue(value))); + } + + @Override + public void addAnnotation(String message, String key, long value) { + openTelemetrySpan.addEvent(message, Attributes.builder().put(key, value).build()); + openCensusSpan.addAnnotation( + message, ImmutableMap.of(key, AttributeValue.longAttributeValue(value))); + } + + @Override + public void addAnnotation(String message, Throwable e) { + openCensusSpan.addAnnotation(message, this.getOpenCensusExceptionAnnotations(e)); + openTelemetrySpan.addEvent(message, this.getOpenTelemetryExceptionAnnotations(e)); + } + + @Override + public void setStatus(Throwable e) { + openTelemetrySpan.setStatus(StatusCode.ERROR); + openTelemetrySpan.recordException(e); + if (e instanceof SpannerException) { + openCensusSpan.setStatus( + StatusConverter.fromGrpcStatus(((SpannerException) e).getErrorCode().getGrpcStatus()) + .withDescription(e.getMessage())); + } else { + openCensusSpan.setStatus(Status.INTERNAL.withDescription(e.getMessage())); + } + } + + @Override + public void end() { + openCensusSpan.end(END_SPAN_OPTIONS); + openTelemetrySpan.end(); + } + + private ImmutableMap getOpenCensusExceptionAnnotations(Throwable e) { + if (e instanceof SpannerException) { + return ImmutableMap.of( + "Status", + AttributeValue.stringAttributeValue(((SpannerException) e).getErrorCode().toString())); + } + return ImmutableMap.of(); + } + + private Attributes getOpenTelemetryExceptionAnnotations(Throwable e) { + AttributesBuilder attributesBuilder = Attributes.builder(); + if (e instanceof SpannerException) { + attributesBuilder.put("Status", ((SpannerException) e).getErrorCode().toString()); + } + return attributesBuilder.build(); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/IScope.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/IScope.java new file mode 100644 index 00000000000..099e2227e37 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/IScope.java @@ -0,0 +1,22 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.tracing; + +public interface IScope extends AutoCloseable { + @Override + void close(); +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/ISpan.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/ISpan.java new file mode 100644 index 00000000000..5b59536b402 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/ISpan.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.tracing; + +import java.util.Map; + +public interface ISpan { + + void addAnnotation(String message, Map attributes); + + void addAnnotation(String message); + + void addAnnotation(String message, String key, String value); + + void addAnnotation(String message, String key, long value); + + void addAnnotation(String message, Throwable e); + + void setStatus(Throwable e); + + void end(); +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/TraceWrapper.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/TraceWrapper.java new file mode 100644 index 00000000000..4cb32fd1f74 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/tracing/TraceWrapper.java @@ -0,0 +1,80 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.tracing; + +import com.google.cloud.spanner.SpannerOptions; +import io.opencensus.trace.Span; +import io.opencensus.trace.Tracer; +import io.opentelemetry.context.Context; + +public class TraceWrapper { + + private Tracer openCensusTracer; + + public TraceWrapper(Tracer openCensusTracer) { + this.openCensusTracer = openCensusTracer; + } + + public ISpan spanBuilder(String spanName) { + return new DualSpan( + openCensusTracer.spanBuilder(spanName).startSpan(), + SpannerOptions.getTracer().spanBuilder(spanName).startSpan()); + } + + public ISpan spanBuilderWithExplicitParent(String spanName, ISpan parentSpan) { + DualSpan dualParentSpan; + if (!(parentSpan instanceof DualSpan)) { + dualParentSpan = new DualSpan(null, null); + } else { + dualParentSpan = (DualSpan) parentSpan; + } + Span ocSpan = + openCensusTracer + .spanBuilderWithExplicitParent(spanName, dualParentSpan.getOpenCensusSpan()) + .startSpan(); + + io.opentelemetry.api.trace.Span otSpan; + + if (dualParentSpan.getOpenTelemetrySpan() != null) + otSpan = + SpannerOptions.getTracer() + .spanBuilder(spanName) + .setParent(Context.current().with(dualParentSpan.getOpenTelemetrySpan())) + .startSpan(); + else otSpan = SpannerOptions.getTracer().spanBuilder(spanName).startSpan(); + + return new DualSpan(ocSpan, otSpan); + } + + public ISpan getCurrentSpan() { + return new DualSpan( + openCensusTracer.getCurrentSpan(), + io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current())); + } + + public IScope withSpan(ISpan span) { + DualSpan dualSpan; + if (!(span instanceof DualSpan)) { + dualSpan = new DualSpan(null, null); + } else { + dualSpan = (DualSpan) span; + } + return new DualScope( + openCensusTracer.withSpan(dualSpan.getOpenCensusSpan()), + dualSpan.getOpenTelemetrySpan().makeCurrent()); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java index 4a2345b3cef..5723a2f150b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.tracing.DualSpan; import io.opencensus.trace.Span; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,8 +40,7 @@ public void testCommitReturnsCommitStats() { try (AsyncTransactionManagerImpl manager = new AsyncTransactionManagerImpl( session, - mock(Span.class), - mock(io.opentelemetry.api.trace.Span.class), + new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class)), Options.commitStats())) { when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats()))) .thenReturn(transaction); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java index 39b31c27486..047353adad0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java @@ -41,7 +41,6 @@ import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import java.io.IOException; import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.stream.Collectors; import org.junit.After; import org.junit.AfterClass; @@ -103,11 +102,7 @@ public static void startStaticServer() throws IOException { INVALID_UPDATE_STATEMENT, Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); String uniqueName = InProcessServerBuilder.generateName(); - server = - InProcessServerBuilder.forName(uniqueName) - .addService(mockSpanner) - .build() - .start(); + server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start(); channelProvider = LocalChannelProvider.create(uniqueName); } @@ -162,24 +157,24 @@ public void singleUse() { while (rs.next()) { // Just consume the result set. } - List spans = - spanExporter.getFinishedSpanItems().stream() - .map(SpanData::getName) - .distinct() - .collect(Collectors.toList()); - - boolean hasSpanData = - spans.stream() - .allMatch( - name -> - name.equals("CloudSpannerOperation.BatchCreateSessionsRequest") - || name.equals("CloudSpannerOperation.ExecuteStreamingQuery") - || name.equals("CloudSpannerOperation.BatchCreateSessions") - || name.equals("CloudSpanner.ReadOnlyTransaction") - || name.equals("SessionPool.WaitForSession")); - assertTrue(hasSpanData); - assertEquals(5, spans.size()); } + List spans = + spanExporter.getFinishedSpanItems().stream() + .map(SpanData::getName) + .distinct() + .collect(Collectors.toList()); + + boolean hasSpanData = + spans.stream() + .allMatch( + name -> + name.equals("CloudSpannerOperation.BatchCreateSessionsRequest") + || name.equals("CloudSpannerOperation.ExecuteStreamingQuery") + || name.equals("CloudSpannerOperation.BatchCreateSessions") + || name.equals("CloudSpanner.ReadOnlyTransaction") + || name.equals("SessionPool.WaitForSession")); + assertTrue(hasSpanData); + assertEquals(5, spans.size()); } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java index 23439a3698e..331443d2f2c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java @@ -24,6 +24,8 @@ import com.google.api.client.util.BackOff; import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator; +import com.google.cloud.spanner.tracing.DualSpan; +import com.google.cloud.spanner.tracing.ISpan; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Lists; @@ -36,7 +38,6 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.protobuf.ProtoUtils; -import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.Span; import java.io.IOException; import java.lang.reflect.Field; @@ -141,7 +142,6 @@ private void initWithLimit(int maxBufferSize) { maxBufferSize, "", null, - null, SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(), SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) { @Override @@ -169,14 +169,8 @@ public void closedSpan() { "This test is only supported on JDK11 and lower", JavaVersionUtil.getJavaMajorVersion() < 12); - Span span = mock(Span.class); - io.opentelemetry.api.trace.Span openTelemetrySpan = mock(io.opentelemetry.api.trace.Span.class); + ISpan span = new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class)); setInternalState(ResumableStreamIterator.class, this.resumableStreamIterator, "span", span); - setInternalState( - ResumableStreamIterator.class, - this.resumableStreamIterator, - "openTelemetrySpan", - openTelemetrySpan); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); @@ -187,8 +181,7 @@ public void closedSpan() { assertThat(consume(resumableStreamIterator)).containsExactly("a", "b").inOrder(); resumableStreamIterator.close("closed"); - verify(span).end(EndSpanOptions.builder().setSampleToLocalSpanStore(true).build()); - verify(openTelemetrySpan).end(); + verify(span).end(); } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index c09d4a8f613..2dc3225a92c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -35,6 +35,7 @@ import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.DualSpan; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; @@ -128,9 +129,9 @@ public void setUp() { .thenReturn( SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()); session = spanner.getSessionClient(db).createSession(); - ((SessionImpl) session).setCurrentSpan(mock(Span.class)); ((SessionImpl) session) - .setCurrentOpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)); + .setCurrentSpan( + new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class))); // We expect the same options, "options", on all calls on "session". options = optionsCaptor.getValue(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 94ba4665216..21f38c3fa39 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -70,6 +70,7 @@ import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.ResultStreamConsumer; +import com.google.cloud.spanner.tracing.DualSpan; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; @@ -1215,8 +1216,8 @@ public void testSessionNotFoundReadWriteTransaction() { .thenReturn(closedTransactionContext); when(closedSession.beginTransactionAsync(any(), eq(true))).thenThrow(sessionNotFound); TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession); - closedTransactionRunner.setSpan(mock(Span.class)); - closedTransactionRunner.setOpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)); + closedTransactionRunner.setSpan( + new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class))); when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner); final SessionImpl openSession = mock(SessionImpl.class); @@ -1230,8 +1231,8 @@ public void testSessionNotFoundReadWriteTransaction() { when(openSession.beginTransactionAsync(any(), eq(true))) .thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn"))); TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession); - openTransactionRunner.setSpan(mock(Span.class)); - openTransactionRunner.setOpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)); + openTransactionRunner.setSpan( + new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class))); when(openSession.readWriteTransaction()).thenReturn(openTransactionRunner); ResultSet openResultSet = mock(ResultSet.class); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 03b3f4ce1c6..f36e8f1a880 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -34,6 +35,7 @@ import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.TransactionManager.TransactionState; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.DualSpan; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.spanner.v1.BeginTransactionRequest; @@ -82,7 +84,7 @@ public void setUp() { initMocks(this); manager = new TransactionManagerImpl( - session, mock(Span.class), mock(io.opentelemetry.api.trace.Span.class)); + session, new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class))); } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 18aa087e54f..f92762c0dcc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -34,6 +35,7 @@ import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.tracing.DualSpan; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; @@ -128,8 +130,8 @@ public void setUp() { .build())); when(rpc.rollbackAsync(Mockito.any(RollbackRequest.class), Mockito.anyMap())) .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); - transactionRunner.setSpan(mock(Span.class)); - transactionRunner.setOpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)); + transactionRunner.setSpan( + new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class))); } @SuppressWarnings("unchecked") @@ -288,11 +290,10 @@ public void prepareReadWriteTransaction() { throw new IllegalStateException(); } }; - session.setCurrentSpan(mock(Span.class)); - session.setCurrentOpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)); + session.setCurrentSpan( + new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class))); TransactionRunnerImpl runner = new TransactionRunnerImpl(session); - runner.setSpan(mock(Span.class)); - runner.setOpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)); + runner.setSpan(new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class))); assertThat(usedInlinedBegin).isFalse(); runner.run( transaction -> { @@ -323,8 +324,7 @@ private long[] batchDmlException(int status) { ApiFutures.immediateFuture(ByteString.copyFromUtf8(UUID.randomUUID().toString()))); when(session.getName()).thenReturn(SessionId.of("p", "i", "d", "test").getName()); TransactionRunnerImpl runner = new TransactionRunnerImpl(session); - runner.setSpan(mock(Span.class)); - runner.setOpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)); + runner.setSpan(new DualSpan(mock(Span.class), mock(io.opentelemetry.api.trace.Span.class))); ExecuteBatchDmlResponse response1 = ExecuteBatchDmlResponse.newBuilder() .addResultSets(