Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT clients access log implementation #1023

Merged
merged 2 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runtime/binding-mqtt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.69</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.88</jacoco.coverage.ratio>
<jacoco.missed.count>3</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.mqtt.internal;

import static io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.MqttEventType.CLIENT_CONNECTED;

import java.nio.ByteBuffer;
import java.time.Clock;

import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.MqttEventExFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;
import io.aklivity.zilla.runtime.engine.guard.GuardHandler;

public class MqttEventContext
{
private static final int EVENT_BUFFER_CAPACITY = 2048;

private final AtomicBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final AtomicBuffer extensionBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final EventFW.Builder eventRW = new EventFW.Builder();
private final MqttEventExFW.Builder mqttEventExRW = new MqttEventExFW.Builder();
private final int mqttTypeId;
private final int clientConnectedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public MqttEventContext(
EngineContext context)
{
this.mqttTypeId = context.supplyTypeId(MqttBinding.NAME);
this.clientConnectedEventId = context.supplyEventId("binding.mqtt.client.connected");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}

public void onClientConnected(
long traceId,
long bindingId,
GuardHandler guard,
long authorization,
String clientId)
{
String identity = guard == null ? null : guard.identity(authorization);
MqttEventExFW extension = mqttEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.clientConnected(e -> e
.typeId(CLIENT_CONNECTED.value())
.identity(identity)
.clientId(clientId)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(clientConnectedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(mqttTypeId, event.buffer(), event.offset(), event.limit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.mqtt.internal;

import org.agrona.DirectBuffer;

import io.aklivity.zilla.runtime.binding.mqtt.internal.types.StringFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.MqttClientConnectedExFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.event.MqttEventExFW;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;

public final class MqttEventFormatter implements EventFormatterSpi
{
private static final String CLIENT_CONNECTED_FORMAT = "CLIENT_CONNECTED %s %s";

private final EventFW eventRO = new EventFW();
private final MqttEventExFW mqttEventExRO = new MqttEventExFW();

MqttEventFormatter(
Configuration config)
{
}

public String format(
DirectBuffer buffer,
int index,
int length)
{
final EventFW event = eventRO.wrap(buffer, index, index + length);
final MqttEventExFW extension = mqttEventExRO
.wrap(event.extension().buffer(), event.extension().offset(), event.extension().limit());
String result = null;
switch (extension.kind())
{
case CLIENT_CONNECTED:
{
MqttClientConnectedExFW ex = extension.clientConnected();
result = String.format(CLIENT_CONNECTED_FORMAT, identity(ex.identity()), asString(ex.clientId()));
break;
}
}
return result;
}

private static String asString(
StringFW stringFW)
{
String s = stringFW.asString();
return s == null ? "" : s;
}

private static String identity(
StringFW identity)
{
int length = identity.length();
return length <= 0 ? "-" : identity.asString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.mqtt.internal;

import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi;

public final class MqttEventFormatterFactory implements EventFormatterFactorySpi
{
@Override
public MqttEventFormatter create(
Configuration config)
{
return new MqttEventFormatter(config);
}

@Override
public String type()
{
return MqttBinding.NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import io.aklivity.zilla.runtime.binding.mqtt.config.MqttPatternConfig.MqttConnectProperty;
import io.aklivity.zilla.runtime.binding.mqtt.internal.MqttBinding;
import io.aklivity.zilla.runtime.binding.mqtt.internal.MqttConfiguration;
import io.aklivity.zilla.runtime.binding.mqtt.internal.MqttEventContext;
import io.aklivity.zilla.runtime.binding.mqtt.internal.MqttValidator;
import io.aklivity.zilla.runtime.binding.mqtt.internal.config.MqttBindingConfig;
import io.aklivity.zilla.runtime.binding.mqtt.internal.config.MqttRouteConfig;
Expand Down Expand Up @@ -480,6 +481,7 @@ public final class MqttServerFactory implements MqttStreamFactory
private final MqttValidator validator;
private final CharsetDecoder utf8Decoder;
private final Function<ModelConfig, ValidatorHandler> supplyValidator;
private final MqttEventContext events;

private MqttQoS publishQosMax;

Expand Down Expand Up @@ -529,6 +531,7 @@ public MqttServerFactory(
this.decodePacketTypeByVersion.put(MQTT_PROTOCOL_VERSION_4, this::decodePacketTypeV4);
this.decodePacketTypeByVersion.put(MQTT_PROTOCOL_VERSION_5, this::decodePacketTypeV5);
this.supplyValidator = context::supplyValidator;
this.events = new MqttEventContext(context);
}

@Override
Expand Down Expand Up @@ -4447,6 +4450,10 @@ private void doEncodeConnack(
String16FW reason,
int version)
{
if (reasonCode == SUCCESS)
{
events.onClientConnected(traceId, routedId, guard, authorization, clientId.asString());
}

switch (version)
{
Expand Down
3 changes: 3 additions & 0 deletions runtime/binding-mqtt/src/main/moditect/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@

provides io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi
with io.aklivity.zilla.runtime.binding.mqtt.internal.config.MqttConditionConfigAdapter;

provides io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi
with io.aklivity.zilla.runtime.binding.mqtt.internal.MqttEventFormatterFactory;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.aklivity.zilla.runtime.binding.mqtt.internal.MqttEventFormatterFactory
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public void shouldConnect() throws Exception
k3po.finish();
}

@Test
@Configuration("server.log.event.yaml")
@Specification({
"${net}/connect.successful/client",
"${app}/session.connect/server"})
public void shouldConnectAndLog() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.credentials.username.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ public void shouldConnect() throws Exception
k3po.finish();
}

@Test
@Configuration("server.log.event.yaml")
@Specification({
"${net}/connect.successful/client",
"${app}/session.connect/server"})
public void shouldConnectAndLog() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.credentials.username.yaml")
@Specification({
Expand Down
19 changes: 19 additions & 0 deletions specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,23 @@ scope mqtt
INCOMPLETE(1)
}
}

scope event
{
enum MqttEventType (uint8)
{
CLIENT_CONNECTED (1)
}

struct MqttClientConnectedEx extends core::stream::Extension
{
string8 identity;
string8 clientId;
}

union MqttEventEx switch (MqttEventType)
{
case CLIENT_CONNECTED: MqttClientConnectedEx clientConnected;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Copyright 2021-2023 Aklivity Inc.
#
# Aklivity licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

---
name: test
telemetry:
exporters:
exporter0:
type: test
options:
events:
- qname: test.net0
id: binding.mqtt.client.connected
message: CLIENT_CONNECTED - client
bindings:
net0:
type: mqtt
kind: server
routes:
- exit: app0