diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index 917f5c99e8..ba12c09101 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -71,6 +71,7 @@ abstract class ResumableStreamIterator extends AbstractIterator stream; private ByteString resumeToken; private boolean finished; + private final Object monitor = new Object(); /** * Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have * reached the maximum buffer size without seeing a restart token; in this case, we will drain the @@ -318,17 +319,19 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) { } private void startGrpcStreaming() { - System.out.println("Starting Stream " + stream + " " + streamMessageListener); - if (stream == null) { - span.addAnnotation( - "Starting/Resuming stream", - "ResumeToken", - resumeToken == null ? "null" : resumeToken.toStringUtf8()); - try (IScope scope = tracer.withSpan(span)) { - // When start a new stream set the Span as current to make the gRPC Span a child of - // this Span. - System.out.println("Creating Stream " + stream + " " + streamMessageListener); - stream = checkNotNull(startStream(resumeToken, streamMessageListener)); + synchronized (monitor) { + System.out.println("Starting Stream " + stream + " " + streamMessageListener); + if (stream == null) { + span.addAnnotation( + "Starting/Resuming stream", + "ResumeToken", + resumeToken == null ? "null" : resumeToken.toStringUtf8()); + try (IScope scope = tracer.withSpan(span)) { + // When start a new stream set the Span as current to make the gRPC Span a child of + // this Span. + System.out.println("Creating Stream " + stream + " " + streamMessageListener); + stream = checkNotNull(startStream(resumeToken, streamMessageListener)); + } } } }