Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Micrometer and OpenTelemetry to the WebSockets Next extension #41956

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/opentelemetry-tracing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ See the main xref:opentelemetry.adoc#exporters[OpenTelemetry Guide exporters] se
** Kafka
** Pulsar
* https://quarkus.io/guides/vertx[`quarkus-vertx`] (http requests)
* xref:websockets-next-reference.adoc[`websockets-next`]


=== Disable parts of the automatic tracing
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/telemetry-micrometer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@
** Camel Messaging
* https://quarkus.io/guides/stork-reference[`quarkus-smallrye-stork`]
* https://quarkus.io/guides/vertx[`quarkus-vertx`] (http requests)
* xref:websockets-next-reference.adoc[`websockets-next`]

Check warning on line 768 in docs/src/main/asciidoc/telemetry-micrometer.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Headings] Use sentence-style capitalization in 'Configuration Reference'. Raw Output: {"message": "[Quarkus.Headings] Use sentence-style capitalization in 'Configuration Reference'.", "location": {"path": "docs/src/main/asciidoc/telemetry-micrometer.adoc", "range": {"start": {"line": 768, "column": 18}}}, "severity": "INFO"}

== Configuration Reference

Expand Down
22 changes: 22 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1056,9 +1056,31 @@
quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3>
----
<1> Enables traffic logging.
<2> Set the number of characters of a text message payload which will be logged.

Check warning on line 1059 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using ', which (non restrictive clause preceded by a comma)' or 'that (restrictive clause without a comma)' rather than 'which'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using ', which (non restrictive clause preceded by a comma)' or 'that (restrictive clause without a comma)' rather than 'which'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 1059, "column": 48}}}, "severity": "INFO"}
<3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`.

[[telemetry]]
== Telemetry
michalvavrik marked this conversation as resolved.
Show resolved Hide resolved

When the OpenTelemetry extension is present, traces for opened and closed WebSocket connections are collected by default.
If you do not require WebSocket traces, you can disable collecting of traces like in the example below:

[source, properties]
----
quarkus.websockets-next.server.traces.enabled=false
quarkus.websockets-next.client.traces.enabled=false
----

When the Micrometer extension is present, metrics for messages, errors and bytes transferred are collected.
If you do not require WebSocket metrics, you can disable metrics like in the example below:

[source, properties]
----
quarkus.websockets-next.server.metrics.enabled=false
quarkus.websockets-next.client.metrics.enabled=false
----

NOTE: Telemetry for the `BasicWebSocketConnector` is currently not supported.

[[websocket-next-configuration-reference]]
== Configuration reference
Expand Down
6 changes: 6 additions & 0 deletions extensions/websockets-next/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@
<artifactId>mutiny-kotlin</artifactId>
<scope>test</scope>
</dependency>
<!-- Needed for InMemorySpanExporter to verify captured traces -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.websockets.next.deployment;

import java.util.function.Consumer;

import io.quarkus.builder.item.MultiBuildItem;
import io.quarkus.websockets.next.runtime.telemetry.TelemetrySupportProviderBuilder;

/**
* Provides a way to set up metrics and/or traces support in the WebSockets extension.
*/
final class TelemetrySupportBuilderCustomizerBuildItem extends MultiBuildItem {

final Consumer<TelemetrySupportProviderBuilder> builderCustomizer;

TelemetrySupportBuilderCustomizerBuildItem(Consumer<TelemetrySupportProviderBuilder> builderCustomizer) {
this.builderCustomizer = builderCustomizer;
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.quarkus.websockets.next.test.telemetry;

import static io.quarkus.websockets.next.test.utils.WSClient.ReceiverMode.BINARY;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.Set;
import java.util.stream.Collectors;

import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocketConnectOptions;

record Connection(URI uri, String[] messagesToSend, WSClient client, boolean broadcast, boolean binaryMode,
String[] expectedResponses) {

static Connection of(URI uri, boolean broadcast, boolean binaryMode, String[] sentMessages, String[] expectedResponses) {
return new Connection(uri, sentMessages, null, broadcast, binaryMode, expectedResponses);
}

static Connection of(URI uri, String expectedResponse, boolean binaryMode, String... messages) {
return new Connection(uri, messages, null, false, binaryMode, new String[] { expectedResponse });
}

private Connection with(WSClient client) {
return new Connection(uri, messagesToSend, client, broadcast, binaryMode, expectedResponses);
}

private Set<String> getReceivedMessages() {
return client.getMessages().stream().map(Buffer::toString).collect(Collectors.toSet());
}

static void sendAndAssertResponses(Vertx vertx, Connection... connections) {
openConnectionsThenSend(connections, vertx, 0);
}

private static void openConnectionsThenSend(Connection[] connections, Vertx vertx, int idx) {
var connection = connections[idx];
final WSClient client = connection.binaryMode() ? new WSClient(vertx, BINARY) : new WSClient(vertx);
try (client) {
client.connect(new WebSocketConnectOptions(), connection.uri());
connections[idx] = connection.with(client);

if (idx < connections.length - 1) {
openConnectionsThenSend(connections, vertx, idx + 1);
} else {
sendMessages(connections, connection.binaryMode());
}
}
}

private static void sendMessages(Connection[] connections, boolean binaryMode) {
for (Connection connection : connections) {
for (String message : connection.messagesToSend()) {
if (binaryMode) {
connection.client().sendAndAwait(Buffer.buffer(message));
} else {
connection.client().sendAndAwait(message);
}
}
var expectedResponses = connection.expectedResponses();
if (expectedResponses.length != 0) {
if (connection.broadcast()) {
for (Connection conn : connections) {
assertResponses(conn, expectedResponses);
}
} else {
assertResponses(connection, expectedResponses);
}
}
}
}

private static void assertResponses(Connection connection, String[] expectedResponses) {
connection.client.waitForMessages(expectedResponses.length);
Set<String> actualResponses = connection.getReceivedMessages();

for (String expectedResponse : expectedResponses) {
assertTrue(actualResponses.contains(expectedResponse),
() -> "Expected response '%s' not found, was: %s".formatted(expectedResponse, actualResponses));
}

connection.client().getMessages().clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.quarkus.websockets.next.test.telemetry;

import java.util.Arrays;

public interface ExpectedServerEndpointResponse {

String[] NO_RESPONSE = new String[] {};
EchoExpectedServerEndpointResponse ECHO_RESPONSE = new EchoExpectedServerEndpointResponse();
DoubleEchoExpectedServerEndpointResponse DOUBLE_ECHO_RESPONSE = new DoubleEchoExpectedServerEndpointResponse();

/**
* Endpoint returns void, Uni<Void> or results in exception and theefore, there is no response.
*/
final class NoExpectedServerEndpointResponse {

public String[] getExpectedResponse() {
return new String[0];
}
}

/**
* Received message is prefixed with 'echo 0: ' and returned.
*/
final class EchoExpectedServerEndpointResponse implements ExpectedServerEndpointResponse {

public String[] getExpectedResponse(String[] sentMessages) {
return Arrays.stream(sentMessages).map(msg -> "echo 0: " + msg).toArray(String[]::new);
}

}

/**
* For each received message 'msg' endpoint returns 'echo 0: msg' and 'echo 1: msg'
*/
final class DoubleEchoExpectedServerEndpointResponse implements ExpectedServerEndpointResponse {

public String[] getExpectedResponse(String[] sentMessages) {
return Arrays.stream(sentMessages)
.mapMulti((msg, consumer) -> {
consumer.accept("echo 0: " + msg);
consumer.accept("echo 1: " + msg);
})
.toArray(String[]::new);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.websockets.next.test.telemetry;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Singleton;

import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;

@ApplicationScoped
public class InMemorySpanExporterProducer {

@Produces
@Singleton
InMemorySpanExporter inMemorySpanExporter() {
return InMemorySpanExporter.create();
}

}
Loading