diff --git a/runtime/binding-mqtt/pom.xml b/runtime/binding-mqtt/pom.xml index b349482243..92759b3de4 100644 --- a/runtime/binding-mqtt/pom.xml +++ b/runtime/binding-mqtt/pom.xml @@ -26,7 +26,7 @@ 11 11 - 0.69 + 0.88 3 diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttEventContext.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttEventContext.java new file mode 100644 index 0000000000..89ad74c645 --- /dev/null +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttEventContext.java @@ -0,0 +1,80 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.internal; + +import static io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.MqttEventType.CLIENT_CONNECTED; + +import java.nio.ByteBuffer; +import java.time.Clock; + +import org.agrona.concurrent.AtomicBuffer; +import org.agrona.concurrent.UnsafeBuffer; + +import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.EventFW; +import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.MqttEventExFW; +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer; +import io.aklivity.zilla.runtime.engine.guard.GuardHandler; + +public class MqttEventContext +{ + private static final int EVENT_BUFFER_CAPACITY = 2048; + + private final AtomicBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY)); + private final AtomicBuffer extensionBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY)); + private final EventFW.Builder eventRW = new EventFW.Builder(); + private final MqttEventExFW.Builder mqttEventExRW = new MqttEventExFW.Builder(); + private final int mqttTypeId; + private final int clientConnectedEventId; + private final MessageConsumer eventWriter; + private final Clock clock; + + public MqttEventContext( + EngineContext context) + { + this.mqttTypeId = context.supplyTypeId(MqttBinding.NAME); + this.clientConnectedEventId = context.supplyEventId("binding.mqtt.client.connected"); + this.eventWriter = context.supplyEventWriter(); + this.clock = context.clock(); + } + + public void onClientConnected( + long traceId, + long bindingId, + GuardHandler guard, + long authorization, + String clientId) + { + String identity = guard == null ? null : guard.identity(authorization); + MqttEventExFW extension = mqttEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .clientConnected(e -> e + .typeId(CLIENT_CONNECTED.value()) + .identity(identity) + .clientId(clientId) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(clientConnectedEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(mqttTypeId, event.buffer(), event.offset(), event.limit()); + } +} diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttEventFormatter.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttEventFormatter.java new file mode 100644 index 0000000000..ba102af34c --- /dev/null +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttEventFormatter.java @@ -0,0 +1,73 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.internal; + +import org.agrona.DirectBuffer; + +import io.aklivity.zilla.runtime.binding.mqtt.internal.types.StringFW; +import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.EventFW; +import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.MqttClientConnectedExFW; +import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.MqttEventExFW; +import io.aklivity.zilla.runtime.engine.Configuration; +import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi; + +public final class MqttEventFormatter implements EventFormatterSpi +{ + private static final String CLIENT_CONNECTED_FORMAT = "CLIENT_CONNECTED %s %s"; + + private final EventFW eventRO = new EventFW(); + private final MqttEventExFW mqttEventExRO = new MqttEventExFW(); + + MqttEventFormatter( + Configuration config) + { + } + + public String format( + DirectBuffer buffer, + int index, + int length) + { + final EventFW event = eventRO.wrap(buffer, index, index + length); + final MqttEventExFW extension = mqttEventExRO + .wrap(event.extension().buffer(), event.extension().offset(), event.extension().limit()); + String result = null; + switch (extension.kind()) + { + case CLIENT_CONNECTED: + { + MqttClientConnectedExFW ex = extension.clientConnected(); + result = String.format(CLIENT_CONNECTED_FORMAT, identity(ex.identity()), asString(ex.clientId())); + break; + } + } + return result; + } + + private static String asString( + StringFW stringFW) + { + String s = stringFW.asString(); + return s == null ? "" : s; + } + + private static String identity( + StringFW identity) + { + int length = identity.length(); + return length <= 0 ? "-" : identity.asString(); + } +} diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttEventFormatterFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttEventFormatterFactory.java new file mode 100644 index 0000000000..beab5092cc --- /dev/null +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttEventFormatterFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.internal; + +import io.aklivity.zilla.runtime.engine.Configuration; +import io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi; + +public final class MqttEventFormatterFactory implements EventFormatterFactorySpi +{ + @Override + public MqttEventFormatter create( + Configuration config) + { + return new MqttEventFormatter(config); + } + + @Override + public String type() + { + return MqttBinding.NAME; + } +} diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index ff9c5a612f..c2fe91c9a2 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -112,6 +112,7 @@ import io.aklivity.zilla.runtime.binding.mqtt.config.MqttPatternConfig.MqttConnectProperty; import io.aklivity.zilla.runtime.binding.mqtt.internal.MqttBinding; import io.aklivity.zilla.runtime.binding.mqtt.internal.MqttConfiguration; +import io.aklivity.zilla.runtime.binding.mqtt.internal.MqttEventContext; import io.aklivity.zilla.runtime.binding.mqtt.internal.MqttValidator; import io.aklivity.zilla.runtime.binding.mqtt.internal.config.MqttBindingConfig; import io.aklivity.zilla.runtime.binding.mqtt.internal.config.MqttRouteConfig; @@ -480,6 +481,7 @@ public final class MqttServerFactory implements MqttStreamFactory private final MqttValidator validator; private final CharsetDecoder utf8Decoder; private final Function supplyValidator; + private final MqttEventContext events; private MqttQoS publishQosMax; @@ -529,6 +531,7 @@ public MqttServerFactory( this.decodePacketTypeByVersion.put(MQTT_PROTOCOL_VERSION_4, this::decodePacketTypeV4); this.decodePacketTypeByVersion.put(MQTT_PROTOCOL_VERSION_5, this::decodePacketTypeV5); this.supplyValidator = context::supplyValidator; + this.events = new MqttEventContext(context); } @Override @@ -4447,6 +4450,10 @@ private void doEncodeConnack( String16FW reason, int version) { + if (reasonCode == SUCCESS) + { + events.onClientConnected(traceId, routedId, guard, authorization, clientId.asString()); + } switch (version) { diff --git a/runtime/binding-mqtt/src/main/moditect/module-info.java b/runtime/binding-mqtt/src/main/moditect/module-info.java index 4cf09438f0..06d1e06d0b 100644 --- a/runtime/binding-mqtt/src/main/moditect/module-info.java +++ b/runtime/binding-mqtt/src/main/moditect/module-info.java @@ -27,4 +27,7 @@ provides io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi with io.aklivity.zilla.runtime.binding.mqtt.internal.config.MqttConditionConfigAdapter; + + provides io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi + with io.aklivity.zilla.runtime.binding.mqtt.internal.MqttEventFormatterFactory; } diff --git a/runtime/binding-mqtt/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi b/runtime/binding-mqtt/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi new file mode 100644 index 0000000000..574bd0dd80 --- /dev/null +++ b/runtime/binding-mqtt/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi @@ -0,0 +1 @@ +io.aklivity.zilla.runtime.binding.mqtt.internal.MqttEventFormatterFactory diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/ConnectionIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/ConnectionIT.java index 6db9382c02..3fc6db15b0 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/ConnectionIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/ConnectionIT.java @@ -68,6 +68,16 @@ public void shouldConnect() throws Exception k3po.finish(); } + @Test + @Configuration("server.log.event.yaml") + @Specification({ + "${net}/connect.successful/client", + "${app}/session.connect/server"}) + public void shouldConnectAndLog() throws Exception + { + k3po.finish(); + } + @Test @Configuration("server.credentials.username.yaml") @Specification({ diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/ConnectionIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/ConnectionIT.java index 8954fce2da..7d22e40ed3 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/ConnectionIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/ConnectionIT.java @@ -66,6 +66,16 @@ public void shouldConnect() throws Exception k3po.finish(); } + @Test + @Configuration("server.log.event.yaml") + @Specification({ + "${net}/connect.successful/client", + "${app}/session.connect/server"}) + public void shouldConnectAndLog() throws Exception + { + k3po.finish(); + } + @Test @Configuration("server.credentials.username.yaml") @Specification({ diff --git a/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl b/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl index 9431a6b159..b9889d621d 100644 --- a/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl +++ b/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl @@ -261,4 +261,23 @@ scope mqtt INCOMPLETE(1) } } + + scope event + { + enum MqttEventType (uint8) + { + CLIENT_CONNECTED (1) + } + + struct MqttClientConnectedEx extends core::stream::Extension + { + string8 identity; + string8 clientId; + } + + union MqttEventEx switch (MqttEventType) + { + case CLIENT_CONNECTED: MqttClientConnectedEx clientConnected; + } + } } diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/config/server.log.event.yaml b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/config/server.log.event.yaml new file mode 100644 index 0000000000..53d5edc154 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/config/server.log.event.yaml @@ -0,0 +1,33 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: test.net0 + id: binding.mqtt.client.connected + message: CLIENT_CONNECTED - client +bindings: + net0: + type: mqtt + kind: server + routes: + - exit: app0