Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Sep 20, 2023
1 parent fe6e4a5 commit 050b164
Show file tree
Hide file tree
Showing 28 changed files with 651 additions and 779 deletions.
52 changes: 27 additions & 25 deletions google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<properties>
<site.installationModule>google-cloud-spanner</site.installationModule>
<opencensus.version>0.31.1</opencensus.version>
<opentelemetry.version>1.28.0</opentelemetry.version>
<opentelemetry.version>1.30.1</opentelemetry.version>
<graalvm.version>22.3.3</graalvm.version>
<spanner.testenv.config.class>com.google.cloud.spanner.GceTestEnvConfig</spanner.testenv.config.class>
<spanner.testenv.instance>projects/gcloud-devel/instances/spanner-testing-east1</spanner.testenv.instance>
Expand Down Expand Up @@ -249,30 +249,6 @@
<artifactId>opentelemetry-context</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-metrics</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
Expand Down Expand Up @@ -434,6 +410,32 @@
<scope>test</scope>
</dependency>

<!-- OpenTelemetry test dependencies -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-metrics</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.tracing.DualSpan;
import com.google.cloud.spanner.tracing.ISpan;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
Expand Down Expand Up @@ -69,9 +72,10 @@ abstract class AbstractReadContext
abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadContext> {
private SessionImpl session;
private SpannerRpc rpc;
private io.opentelemetry.api.trace.Span openTelemetrySpan =
io.opentelemetry.api.trace.Span.fromContext(Context.current());
private Span span = Tracing.getTracer().getCurrentSpan();
private ISpan span =
new DualSpan(
Tracing.getTracer().getCurrentSpan(),
io.opentelemetry.api.trace.Span.fromContext(Context.current()));
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private ExecutorProvider executorProvider;
Expand All @@ -93,16 +97,11 @@ B setRpc(SpannerRpc rpc) {
return self();
}

B setSpan(Span span) {
B setSpan(ISpan span) {
this.span = span;
return self();
}

B setOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) {
this.openTelemetrySpan = span;
return self();
}

B setDefaultPrefetchChunks(int defaultPrefetchChunks) {
this.defaultPrefetchChunks = defaultPrefetchChunks;
return self();
Expand Down Expand Up @@ -356,7 +355,6 @@ void initTransaction() {
if (transactionId != null) {
return;
}
OpenTelemetryTraceUtil.addEvent(openTelemetrySpan, "Creating Transaction");
span.addAnnotation("Creating Transaction");
try {
TransactionOptions.Builder options = TransactionOptions.newBuilder();
Expand All @@ -383,16 +381,16 @@ void initTransaction() {
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
}
transactionId = transaction.getId();
OpenTelemetryTraceUtil.addEvent(
openTelemetrySpan,
"Transaction Creation Done",
OpenTelemetryTraceUtil.getTransactionAnnotations(transaction));
span.addAnnotation(
"Transaction Creation Done", TraceUtil.getTransactionAnnotations(transaction));
"Transaction Creation Done",
ImmutableMap.of(
"id",
transaction.getId().toStringUtf8(),
"Timestamp",
Timestamp.fromProto(transaction.getReadTimestamp()).toString()));

} catch (SpannerException e) {
OpenTelemetryTraceUtil.addEventWithException(
openTelemetrySpan, "Transaction Creation Failed", e);
span.addAnnotation("Transaction Creation Failed", TraceUtil.getExceptionAnnotations(e));
span.addAnnotation("Transaction Creation Failed", e);
throw e;
}
}
Expand All @@ -403,8 +401,7 @@ void initTransaction() {
final SessionImpl session;
final SpannerRpc rpc;
final ExecutorProvider executorProvider;
Span span;
io.opentelemetry.api.trace.Span openTelemetrySpan;
ISpan span;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

Expand All @@ -431,19 +428,19 @@ void initTransaction() {
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.span = builder.span;
this.openTelemetrySpan = builder.openTelemetrySpan;
this.executorProvider = builder.executorProvider;
}

@Override
public void setSpan(Span span) {
public void setSpan(ISpan span) {
this.span = span;
}

/**
* No-op method needed to implement SessionTransaction interface.
*/
@Override
public void setOpenTelemetrySpan(io.opentelemetry.api.trace.Span span) {
this.openTelemetrySpan = span;
}
public void setSpan(Span span) {}

long getSeqNo() {
return seqNo.incrementAndGet();
Expand Down Expand Up @@ -690,7 +687,6 @@ ResultSet executeQueryInternalWithOptions(
MAX_BUFFERED_CHUNKS,
SpannerImpl.QUERY,
span,
openTelemetrySpan,
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
Expand Down Expand Up @@ -750,8 +746,7 @@ public final void invalidate() {

@Override
public void close() {
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
synchronized (lock) {
isClosed = true;
}
Expand Down Expand Up @@ -831,7 +826,6 @@ ResultSet readInternalWithOptions(
MAX_BUFFERED_CHUNKS,
SpannerImpl.READ,
span,
openTelemetrySpan,
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Type.StructField;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.tracing.IScope;
import com.google.cloud.spanner.tracing.ISpan;
import com.google.cloud.spanner.tracing.TraceWrapper;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
Expand All @@ -50,12 +52,7 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TypeCode;
import io.grpc.Context;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
Expand Down Expand Up @@ -83,7 +80,7 @@

/** Implementation of {@link ResultSet}. */
abstract class AbstractResultSet<R> extends AbstractStructReader implements ResultSet {
private static final Tracer tracer = Tracing.getTracer();
private static final TraceWrapper tracer = new TraceWrapper(Tracing.getTracer());
private static final com.google.protobuf.Value NULL_VALUE =
com.google.protobuf.Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();

Expand Down Expand Up @@ -1094,8 +1091,7 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
private final BackOff backOff;
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
private final int maxBufferSize;
private final Span span;
private final io.opentelemetry.api.trace.Span openTelemetrySpan;
private final ISpan span;
private CloseableIterator<PartialResultSet> stream;
private ByteString resumeToken;
private boolean finished;
Expand All @@ -1109,16 +1105,12 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
protected ResumableStreamIterator(
int maxBufferSize,
String streamName,
Span parent,
io.opentelemetry.api.trace.Span openTelemetryParent,
ISpan parent,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.openTelemetrySpan =
OpenTelemetryTraceUtil.spanBuilderWithExplicitParent(
SpannerOptions.getTracer(), streamName, openTelemetryParent);
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan();
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent);
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.backOff = newBackOff();
Expand Down Expand Up @@ -1174,15 +1166,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
}

private void backoffSleep(Context context, long backoffMillis) throws SpannerException {
tracer
.getCurrentSpan()
.addAnnotation(
"Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
OpenTelemetryTraceUtil.addEvent(
io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()),
"Backing off",
Attributes.builder().put("Delay", backoffMillis).build());
tracer.getCurrentSpan().addAnnotation("Backing off", "Delay", backoffMillis);
final CountDownLatch latch = new CountDownLatch(1);
final Context.CancellationListener listener =
ignored -> {
Expand Down Expand Up @@ -1222,8 +1206,7 @@ public void execute(Runnable command) {
public void close(@Nullable String message) {
if (stream != null) {
stream.close(message);
span.end(TraceUtil.END_SPAN_OPTIONS);
OpenTelemetryTraceUtil.endSpan(openTelemetrySpan);
span.end();
stream = null;
}
}
Expand All @@ -1239,20 +1222,12 @@ protected PartialResultSet computeNext() {
while (true) {
// Eagerly start stream before consuming any buffered items.
if (stream == null) {
OpenTelemetryTraceUtil.addEvent(
openTelemetrySpan,
"Starting/Resuming stream",
Attributes.builder()
.put("ResumeToken", resumeToken == null ? "null" : resumeToken.toStringUtf8())
.build());
span.addAnnotation(
"Starting/Resuming stream",
ImmutableMap.of(
"ResumeToken",
AttributeValue.stringAttributeValue(
resumeToken == null ? "null" : resumeToken.toStringUtf8())));
try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
"ResumeToken",
resumeToken == null ? "null" : resumeToken.toStringUtf8());

try (IScope sss = tracer.withSpan(span)) {
// When start a new stream set the Span as current to make the gRPC Span a child of
// this Span.
stream = checkNotNull(startStream(resumeToken));
Expand Down Expand Up @@ -1292,22 +1267,16 @@ protected PartialResultSet computeNext() {
}
} catch (SpannerException spannerException) {
if (safeToRetry && isRetryable(spannerException)) {
OpenTelemetryTraceUtil.addEvent(
openTelemetrySpan,
"Stream broken. Safe to retry",
OpenTelemetryTraceUtil.getExceptionAnnotations(spannerException));
span.addAnnotation(
"Stream broken. Safe to retry",
TraceUtil.getExceptionAnnotations(spannerException));

span.addAnnotation("Stream broken. Safe to retry", spannerException);
logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException);
// Truncate any items in the buffer before the last retry token.
while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {
buffer.removeLast();
}
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
try (Scope s = tracer.withSpan(span);
io.opentelemetry.context.Scope ss = openTelemetrySpan.makeCurrent()) {
try (IScope s = tracer.withSpan(span)) {
long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
backoffSleep(context, delay);
Expand All @@ -1318,17 +1287,12 @@ protected PartialResultSet computeNext() {

continue;
}
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, spannerException);
OpenTelemetryTraceUtil.addEventWithExceptionAndSetFailure(
openTelemetrySpan, "Stream broken. Safe to retry", spannerException);
span.addAnnotation("Stream broken. Not safe to retry", spannerException);
span.setStatus(spannerException);
throw spannerException;
} catch (RuntimeException e) {
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, e);
OpenTelemetryTraceUtil.addEventWithExceptionAndSetFailure(
openTelemetrySpan, "Stream broken. Safe to retry", e);

span.addAnnotation("Stream broken. Not safe to retry", e);
span.setStatus(e);
throw e;
}
}
Expand Down
Loading

0 comments on commit 050b164

Please sign in to comment.