Skip to content

Commit

Permalink
Merge branch 'main' into gfe_latency
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Jan 10, 2025
2 parents 63d88d7 + 7893f24 commit 901620b
Show file tree
Hide file tree
Showing 27 changed files with 314 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/hermetic_library_generation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
with:
fetch-depth: 0
token: ${{ secrets.CLOUD_JAVA_BOT_TOKEN }}
- uses: googleapis/sdk-platform-java/.github/[email protected].0
- uses: googleapis/sdk-platform-java/.github/[email protected].1
if: env.SHOULD_RUN == 'true'
with:
base_ref: ${{ github.base_ref }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unmanaged_dependency_check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ jobs:
# repository
.kokoro/build.sh
- name: Unmanaged dependency check
uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.41.0
uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.41.1
with:
bom-path: google-cloud-spanner-bom/pom.xml
2 changes: 1 addition & 1 deletion .kokoro/presubmit/graalvm-native-17.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Configure the docker image for kokoro-trampoline.
env_vars: {
key: "TRAMPOLINE_IMAGE"
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.41.0"
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.41.1"
}

env_vars: {
Expand Down
2 changes: 1 addition & 1 deletion .kokoro/presubmit/graalvm-native.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Configure the docker image for kokoro-trampoline.
env_vars: {
key: "TRAMPOLINE_IMAGE"
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.41.0"
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.41.1"
}

env_vars: {
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ If you are using Maven without the BOM, add this to your dependencies:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
<version>6.81.1</version>
<version>6.84.0</version>
</dependency>

```
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
<version>6.81.1</version>
<version>6.84.0</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
Expand Down
2 changes: 1 addition & 1 deletion generation_config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
gapic_generator_version: 2.51.0
googleapis_commitish: 52e410823122cf44d265c3beecb86c773db248a2
googleapis_commitish: 00196e2a68b16a864c57db2e870822875a7f1198
libraries_bom_version: 26.52.0
libraries:
- api_shortname: spanner
Expand Down
2 changes: 1 addition & 1 deletion google-cloud-spanner-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>sdk-platform-java-config</artifactId>
<version>3.41.0</version>
<version>3.41.1</version>
</parent>

<name>Google Cloud Spanner BOM</name>
Expand Down
2 changes: 1 addition & 1 deletion google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-monitoring-v3</artifactId>
<version>3.55.0</version>
<version>3.57.0</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

Expand All @@ -67,6 +68,7 @@
*/
abstract class AbstractReadContext
implements ReadContext, AbstractResultSet.Listener, SessionTransaction {
private static final Logger logger = Logger.getLogger(AbstractReadContext.class.getName());

abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadContext> {
private SessionImpl session;
Expand Down Expand Up @@ -797,7 +799,6 @@ CloseableIterator<PartialResultSet> startStream(
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, request.getTransaction().hasBegin());
call.request(prefetchChunks);
return stream;
}

Expand Down Expand Up @@ -952,6 +953,15 @@ ResultSet readInternalWithOptions(
} else if (defaultDirectedReadOptions != null) {
builder.setDirectedReadOptions(defaultDirectedReadOptions);
}
if (readOptions.hasLockHint()) {
if (isReadOnly()) {
logger.warning(
"Lock hint is only supported for ReadWrite transactions. "
+ "Overriding lock hint to default unspecified.");
} else {
builder.setLockHint(readOptions.lockHint());
}
}
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
Expand Down Expand Up @@ -992,7 +1002,6 @@ CloseableIterator<PartialResultSet> startStream(
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
call.request(prefetchChunks);
return stream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ interface CloseableIterator<T> extends Iterator<T> {
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
return false;
}

/** it requests the initial prefetch chunks from gRPC stream */
default void requestPrefetchChunks() {};
}

static double valueProtoToFloat64(com.google.protobuf.Value proto) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import com.google.cloud.opentelemetry.detection.DetectedPlatform;
import com.google.cloud.opentelemetry.detection.GCPPlatformDetector;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.OpenTelemetry;
Expand All @@ -53,25 +51,34 @@ final class BuiltInOpenTelemetryMetricsProvider {

public static BuiltInOpenTelemetryMetricsProvider INSTANCE =
new BuiltInOpenTelemetryMetricsProvider();

private static final Logger logger =
Logger.getLogger(BuiltInOpenTelemetryMetricsProvider.class.getName());

private final Cache<String, Map<String, String>> clientAttributesCache =
CacheBuilder.newBuilder().maximumSize(1000).build();

private static String taskId;

private OpenTelemetry openTelemetry;

private Map<String, String> clientAttributes;

private boolean isInitialized;

private BuiltInOpenTelemetryMetricsRecorder builtInOpenTelemetryMetricsRecorder;

private BuiltInOpenTelemetryMetricsProvider() {};

@VisibleForTesting
void reset() {
isInitialized = false;
}

@VisibleForTesting
void initialize(
OpenTelemetry openTelemetry,
String projectId,
String client_name,
@Nullable Credentials credentials,
@Nullable String monitoringHost) {
initialize(projectId, client_name, credentials, monitoringHost);
// Use injected opentelemetry object for testing.
this.builtInOpenTelemetryMetricsRecorder =
new BuiltInOpenTelemetryMetricsRecorder(openTelemetry, clientAttributes);
}

void initialize(
String projectId,
String client_name,
Expand All @@ -94,18 +101,6 @@ void initialize(
}
}

@VisibleForTesting
void initialize(
OpenTelemetry openTelemetry,
String projectId,
String client_name,
@Nullable Credentials credentials,
@Nullable String monitoringHost) {
initialize(projectId, client_name, credentials, monitoringHost);
this.builtInOpenTelemetryMetricsRecorder =
new BuiltInOpenTelemetryMetricsRecorder(openTelemetry, clientAttributes);
}

OpenTelemetry getOpenTelemetry() {
return this.openTelemetry;
}
Expand All @@ -118,11 +113,6 @@ BuiltInOpenTelemetryMetricsRecorder getBuiltInOpenTelemetryMetricsRecorder() {
return this.builtInOpenTelemetryMetricsRecorder;
}

@VisibleForTesting
void reset() {
isInitialized = false;
}

private Map<String, String> createClientAttributes(String projectId, String client_name) {
Map<String, String> clientAttributes = new HashMap<>();
clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.google.cloud.spanner;

import com.google.api.gax.core.GaxProperties;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
Expand All @@ -41,6 +40,7 @@ public class BuiltInOpenTelemetryMetricsRecorder {
* </ul>
*
* @param openTelemetry OpenTelemetry instance
* @param clientAttributes Client attributes for metrics
*/
public BuiltInOpenTelemetryMetricsRecorder(
OpenTelemetry openTelemetry, Map<String, String> clientAttributes) {
Expand Down Expand Up @@ -79,8 +79,7 @@ public void recordGFELatency(double gfeLatency, Map<String, String> attributes)
}
}

@VisibleForTesting
Attributes toOtelAttributes(Map<String, String> attributes) {
private Attributes toOtelAttributes(Map<String, String> attributes) {
Preconditions.checkNotNull(attributes, "Attributes map cannot be null");
AttributesBuilder attributesBuilder = Attributes.builder();
attributes.forEach(attributesBuilder::put);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand All @@ -39,6 +40,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
implements CloseableIterator<PartialResultSet> {
private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName());
static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();
private final int prefetchChunks;
private AsyncResultSet.StreamMessageListener streamMessageListener;

private final ConsumerImpl consumer;
Expand All @@ -60,6 +62,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
GrpcStreamIterator(
Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
this.statement = statement;
this.prefetchChunks = prefetchChunks;
this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed);
// One extra to allow for END_OF_STREAM message.
this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1);
Expand Down Expand Up @@ -102,6 +105,13 @@ public void close(@Nullable String message) {
}
}

@Override
@InternalApi
public void requestPrefetchChunks() {
Preconditions.checkState(call != null, "The StreamingCall object is not initialized");
call.request(prefetchChunks);
}

@Override
public boolean isWithBeginTransaction() {
return withBeginTransaction;
Expand Down
Loading

0 comments on commit 901620b

Please sign in to comment.