Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support OpenTelemetry metrics #1465

8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<excludedIntegrationTests/>

<junixsocket.version>2.9.0</junixsocket.version>
<opentelemetry.exporter.version>0.27.0</opentelemetry.exporter.version>
hengfengli marked this conversation as resolved.
Show resolved Hide resolved
</properties>

<modelVersion>4.0.0</modelVersion>
Expand Down Expand Up @@ -141,7 +142,12 @@
<dependency>
<groupId>com.google.cloud.opentelemetry</groupId>
<artifactId>exporter-trace</artifactId>
<version>0.23.0</version>
<version>${opentelemetry.exporter.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.opentelemetry</groupId>
<artifactId>exporter-metrics</artifactId>
<version>${opentelemetry.exporter.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/google/cloud/spanner/pgadapter/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.google.cloud.spanner.pgadapter;

import com.google.auth.Credentials;
import com.google.cloud.opentelemetry.metric.GoogleCloudMetricExporter;
import com.google.cloud.opentelemetry.metric.MetricConfiguration;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
Expand All @@ -23,6 +25,8 @@
import com.google.devtools.cloudtrace.v2.TruncatableString;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
Expand Down Expand Up @@ -104,12 +108,24 @@ static OpenTelemetry setupOpenTelemetry(OptionsMetadata optionsMetadata) {
Sampler.parentBased(
Sampler.traceIdRatioBased(optionsMetadata.getOpenTelemetryTraceRatio()));
}
MetricExporter cloudMonitoringExporter =
GoogleCloudMetricExporter.createWithConfiguration(
MetricConfiguration.builder()
// Configure the cloud project id.
.setProjectId(projectId)
// Set the credentials to use when writing to the Cloud Monitoring API
.setCredentials(credentials)
.build());
return AutoConfiguredOpenTelemetrySdk.builder()
.addTracerProviderCustomizer(
(sdkTracerProviderBuilder, configProperties) ->
sdkTracerProviderBuilder
.setSampler(sampler)
.addSpanProcessor(BatchSpanProcessor.builder(traceExporter).build()))
.addMeterProviderCustomizer(
(sdkMeterProviderBuilder, configProperties) ->
sdkMeterProviderBuilder.registerMetricReader(
PeriodicMetricReader.builder(cloudMonitoringExporter).build()))
.build()
.getOpenTelemetrySdk();
} catch (IOException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import com.google.cloud.spanner.pgadapter.utils.CopyDataReceiver;
import com.google.cloud.spanner.pgadapter.utils.Logging;
import com.google.cloud.spanner.pgadapter.utils.Logging.Action;
import com.google.cloud.spanner.pgadapter.utils.Metrics;
import com.google.cloud.spanner.pgadapter.utils.MutationWriter;
import com.google.cloud.spanner.pgadapter.wireoutput.ReadyResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.ReadyResponse.Status;
Expand All @@ -80,6 +81,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.StatusCode;
Expand Down Expand Up @@ -123,6 +125,8 @@ public class BackendConnection {

private final Tracer tracer;

private final Metrics meter;

private final Deque<Context> statementContext = new ConcurrentLinkedDeque<>();

private final String connectionId;
Expand Down Expand Up @@ -250,25 +254,29 @@ private final class Execute extends BufferedStatement<StatementResult> {
private final String command;
private final Function<Statement, Statement> statementBinder;
private final boolean analyze;
private final Attributes metricAttributes;

Execute(
String command,
ParsedStatement parsedStatement,
Statement statement,
Function<Statement, Statement> statementBinder) {
this(command, parsedStatement, statement, statementBinder, false);
Function<Statement, Statement> statementBinder,
DatabaseId databaseId) {
this(command, parsedStatement, statement, statementBinder, databaseId, false);
}

Execute(
String command,
ParsedStatement parsedStatement,
Statement statement,
Function<Statement, Statement> statementBinder,
DatabaseId databaseId,
boolean analyze) {
super(parsedStatement, statement);
this.command = command;
this.statementBinder = statementBinder;
this.analyze = analyze;
this.metricAttributes = ExtendedQueryProtocolHandler.getMetricAttributes(databaseId);
}

@Override
Expand Down Expand Up @@ -472,6 +480,7 @@ private StatementResult executeOnSpannerWithLogging(Statement statement) {
Stopwatch stopwatch = Stopwatch.createStarted();
StatementResult result = spannerConnection.execute(statement);
Duration executionDuration = stopwatch.elapsed();
meter.recordClientLibLatency(executionDuration.toMillis(), metricAttributes);
logger.log(
Level.FINER,
Logging.format(
Expand Down Expand Up @@ -838,6 +847,7 @@ void doExecute() {
/** Creates a PG backend connection that uses the given Spanner {@link Connection} and options. */
BackendConnection(
Tracer tracer,
Metrics meter,
String connectionId,
Runnable closeAllPortals,
DatabaseId databaseId,
Expand All @@ -846,6 +856,7 @@ void doExecute() {
OptionsMetadata optionsMetadata,
Supplier<ImmutableList<LocalStatement>> localStatements) {
this.tracer = tracer;
this.meter = meter;
this.connectionId = connectionId;
this.closeAllPortals = closeAllPortals;
this.sessionState = new SessionState(optionsMetadata);
Expand Down Expand Up @@ -945,14 +956,15 @@ public Future<StatementResult> execute(
ParsedStatement parsedStatement,
Statement statement,
Function<Statement, Statement> statementBinder) {
Execute execute = new Execute(command, parsedStatement, statement, statementBinder);
Execute execute = new Execute(command, parsedStatement, statement, statementBinder, databaseId);
bufferedStatements.add(execute);
return execute.result;
}

public ListenableFuture<StatementResult> analyze(
String command, ParsedStatement parsedStatement, Statement statement) {
Execute execute = new Execute(command, parsedStatement, statement, Function.identity(), true);
Execute execute =
new Execute(command, parsedStatement, statement, Function.identity(), databaseId, true);
bufferedStatements.add(execute);
return execute.result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@

import static com.google.cloud.spanner.pgadapter.Server.getVersion;

import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.pgadapter.ConnectionHandler;
import com.google.cloud.spanner.pgadapter.error.PGExceptionFactory;
import com.google.cloud.spanner.pgadapter.utils.Logging;
import com.google.cloud.spanner.pgadapter.utils.Logging.Action;
import com.google.cloud.spanner.pgadapter.utils.Metrics;
import com.google.cloud.spanner.pgadapter.wireoutput.ReadyResponse;
import com.google.cloud.spanner.pgadapter.wireprotocol.AbstractQueryProtocolMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.SyncMessage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
Expand All @@ -51,8 +55,11 @@ public class ExtendedQueryProtocolHandler {

private final String connectionId;
private final Tracer tracer;
private final Metrics meter;
private volatile Span span;
private volatile Scope scope;
private volatile Stopwatch stopwatch;
private final Attributes metricAttributes;

/** Creates an {@link ExtendedQueryProtocolHandler} for the given connection. */
public ExtendedQueryProtocolHandler(ConnectionHandler connectionHandler) {
Expand All @@ -63,9 +70,12 @@ public ExtendedQueryProtocolHandler(ConnectionHandler connectionHandler) {
.getServer()
.getOpenTelemetry()
.getTracer(ExtendedQueryProtocolHandler.class.getName(), getVersion());
this.meter = new Metrics(connectionHandler.getServer().getOpenTelemetry());
this.metricAttributes = getMetricAttributes(connectionHandler.getDatabaseId());
this.backendConnection =
new BackendConnection(
tracer,
meter,
connectionHandler.getTraceConnectionId().toString(),
connectionHandler::closeAllPortals,
connectionHandler.getDatabaseId(),
Expand All @@ -86,6 +96,8 @@ public ExtendedQueryProtocolHandler(
.getServer()
.getOpenTelemetry()
.getTracer(ExtendedQueryProtocolHandler.class.getName(), getVersion());
this.meter = new Metrics(connectionHandler.getServer().getOpenTelemetry());
this.metricAttributes = getMetricAttributes(connectionHandler.getDatabaseId());
this.backendConnection = Preconditions.checkNotNull(backendConnection);
}

Expand All @@ -98,6 +110,16 @@ public BackendConnection getBackendConnection() {
return backendConnection;
}

public static Attributes getMetricAttributes(DatabaseId databaseId) {
AttributesBuilder attributesBuilder = Attributes.builder();
if (databaseId != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

databaseId should never be null (at least not for a real connection, maybe in some tests it is). Could we therefore change this into a more direct implementation, where:

  1. metricAttributes are always created when ExtendedQueryProtocolHandler is created.
  2. This method is not static, but an instance method, and always just returns the actual metricAttributes of this ExtendedQueryProtocolHandler. It should also not take DatabaseId (or anything else) as an input argument, as the DatabaseId is fixed for an ExtendedQueryProtocolHandler. The structure here is that:
  3. Each connection from a client to PGAdapter has one ConnectionHandler.
  4. There is a one-to-one relationship between ConnectionHandler and ExtendedQueryProtocolHandler.
  5. There is a one-to-one relationship between ExtendedQueryProtocolHandler and BackendConnection.
  6. All of the above classes also have exactly one DatabaseId that never changes during their lifetime.

Now it seems like metricAttributes sometimes needs to be created on-the-fly, but I don't think that is ever true.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was originally creating metricAttributes in ExtendedQueryProtocolHandler and passing it to the BackendConnection. But I thought we already pass databaseId into it here. I need to construct the attributes in Execute of BackendConnection.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think databaseId should not be null, but I added this check due to an error. If I add metricAttributes as an instance method, can the Execute of BackendConnection access it? Let me explore it more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please take a look at my latest changes. Basically, I need the attributes in Execute of BackendConnection.

attributesBuilder.put("database", databaseId.getDatabase());
attributesBuilder.put("instance_id", databaseId.getInstanceId().getInstance());
attributesBuilder.put("project_id", databaseId.getInstanceId().getProject());
}
return attributesBuilder.build();
}

/** Returns a copy of the currently buffered messages in this handler. */
@VisibleForTesting
List<AbstractQueryProtocolMessage> getMessages() {
Expand All @@ -115,6 +137,7 @@ boolean isExtendedProtocol() {

public void maybeStartSpan(boolean isExtendedProtocol) {
if (span == null) {
stopwatch = Stopwatch.createStarted();
span =
tracer
.spanBuilder("query_protocol_handler")
Expand Down Expand Up @@ -244,6 +267,7 @@ private void endSpan() {
scope.close();
span.end();
span = null;
meter.recordPGAdapterLatency(stopwatch.elapsed().toMillis(), metricAttributes);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.cloud.spanner.pgadapter.utils;

import com.google.api.core.InternalApi;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.util.Arrays;
import java.util.List;

@InternalApi
public class Metrics {
static final String INSTRUMENTATION_SCOPE = "cloud.google.com/java";
static final String SPANNER_CLIENT_LIB_LATENCY = "spanner/pgadapter/client_lib_latencies";
static final String SPANNER_CLIENT_LIB_LATENCY_DESCRIPTION =
"Latency when the client library receives a call and returns a response";
static final String PGADAPTER_LATENCY = "spanner/pgadapter/roundtrip_latencies";
static final String PGADAPTER_LATENCY_DESCRIPTION =
"Latency when the JDBC makes a call and gets a response, measured in PGAdapter";
hengfengli marked this conversation as resolved.
Show resolved Hide resolved

private final LongHistogram spannerClientLibLatencies;
private final LongHistogram pgadapterLatencies;

/**
 * Creates the two latency histograms on a {@link Meter} obtained from the given {@link
 * OpenTelemetry} instance under {@code INSTRUMENTATION_SCOPE}.
 *
 * @param openTelemetry the OpenTelemetry instance that supplies the meter
 */
public Metrics(OpenTelemetry openTelemetry) {
  final Meter latencyMeter = openTelemetry.getMeter(INSTRUMENTATION_SCOPE);
  // Both histograms are configured with the same bucket boundaries (unit: "ms").
  final List<Long> bucketBoundariesMillis =
      Arrays.asList(
          1L, 2L, 3L, 4L, 5L, 6L, 8L, 10L, 13L, 16L, 20L, 25L, 30L, 40L, 50L, 65L, 80L, 100L,
          130L, 160L, 200L, 250L, 300L, 400L, 500L, 650L, 800L, 1000L, 2000L, 5000L, 10000L,
          20000L, 50000L, 100000L);
  spannerClientLibLatencies =
      newLatencyHistogram(
          latencyMeter,
          SPANNER_CLIENT_LIB_LATENCY,
          SPANNER_CLIENT_LIB_LATENCY_DESCRIPTION,
          bucketBoundariesMillis);
  pgadapterLatencies =
      newLatencyHistogram(
          latencyMeter, PGADAPTER_LATENCY, PGADAPTER_LATENCY_DESCRIPTION, bucketBoundariesMillis);
}

/** Builds a long-valued histogram with unit "ms" and the given name, description and buckets. */
private static LongHistogram newLatencyHistogram(
    Meter meter, String name, String description, List<Long> bucketBoundaries) {
  return meter
      .histogramBuilder(name)
      .ofLongs()
      .setDescription(description)
      .setUnit("ms")
      .setExplicitBucketBoundariesAdvice(bucketBoundaries)
      .build();
}

/**
 * Records a Spanner client library latency on the {@code spannerClientLibLatencies} histogram.
 * Does nothing if that histogram is {@code null}.
 *
 * @param value the latency to record; the histogram is declared with unit "ms"
 * @param attributes the attributes to attach to the recorded value
 */
@InternalApi
public void recordClientLibLatency(long value, Attributes attributes) {
  if (spannerClientLibLatencies == null) {
    return;
  }
  spannerClientLibLatencies.record(value, attributes);
}

/**
 * Records a PGAdapter round-trip latency on the {@code pgadapterLatencies} histogram. Does
 * nothing if that histogram is {@code null}.
 *
 * @param value the latency to record; the histogram is declared with unit "ms"
 * @param attributes the attributes to attach to the recorded value
 */
@InternalApi
public void recordPGAdapterLatency(long value, Attributes attributes) {
if (pgadapterLatencies != null) {
hengfengli marked this conversation as resolved.
Show resolved Hide resolved
pgadapterLatencies.record(value, attributes);
}
}
}
Loading
Loading