diff --git a/benchmark-tool/README.md b/benchmark-tool/README.md new file mode 100644 index 0000000000..d7aa9069c8 --- /dev/null +++ b/benchmark-tool/README.md @@ -0,0 +1,367 @@ +The benchmark-tool is used to run benchmark tests on ditto. + +It is implemented using the load-testing tool [k6](https://github.com/grafana/k6) with the [xk6-kafka extension](https://github.com/mostafa/xk6-kafka). + +[MMock](https://github.com/jmartin82/mmock) is used to mock http responses, it exposes 2 endpoints that are configurable via mmock/default.yaml and mmock/live_messages.yaml. Default values are /:thingId and /live_messages and they are used to publish [modified twin events](https://eclipse.dev/ditto/protocol-specification-things-create-or-modify.html#event-1) and [device live messages](https://eclipse.dev/ditto/protocol-twinlive.html#live) sent via HTTP POST request. + +###### The benchmark test consists of 4 tests available to run, called scenarios: + +- **READ_THINGS** - read things via HTTP ( get things by id ) + +- **SEARCH_THINGS** - search things via HTTP ( get things by applying search filter ) + +- **MODIFY_THINGS** - Modify things by sending ditto protocol [modify messages](https://eclipse.dev/ditto/protocol-specification-things-create-or-modify.html#create-or-modify-a-thing) to specfic kafka topic. Ditto kafka connection is reading from this topic and creates [modify commands](https://eclipse.dev/ditto/basic-signals-command.html#modify-commands). [Ditto HTTP push connection](https://eclipse.dev/ditto/connectivity-protocol-bindings-http.html) is configured in ditto, which forwards the [modified twin events](https://eclipse.dev/ditto/protocol-specification-things-create-or-modify.html#event-1) from topic **/things/twin/events?filter=eq(topic:action,'modified')** to a monster mock endpoint, which replies with HTTP status code 204. + +- **DEVICE_LIVE_MESSAGES** - Send [live messages](https://eclipse.dev/ditto/protocol-twinlive.html#live) to things via HTTP. [Ditto HTTP push connection](https://eclipse.dev/ditto/connectivity-protocol-bindings-http.html) connection is configured in ditto, which sends events from topic **/things/live/messages** to a monster mock endpoint, which replies with predefined ditto protocol message. + +Also, there is a special scenario called **WARMUP**, which is used to warmup the system, by executing a single read request for each thing, which will cause them to get cached. + +# Getting started: + +## K6 is configurable via environment variables and the following must be set, in order to run the test(sample variables in [test-local.env](https://github.boschdevcloud.com/bosch-iot-things/ditto/blob/master/benchmark-tool/test-local.env) file): + +## K6 test related + +| Name | Description | +| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | +| SETUP_TIMEOUT | Specify how long the k6 setup() function is allow to run before it's terminated | +| TEARDOWN_TIMEOUT | Specify how long the k6 teardown() function is allowed to run before it's terminated | +| THINGS_NAMESPACE | Namepsace to use for created ditto things | +| THINGS_ID_TEMPLATE | Things id template, f.e. 'my-thing-{0}' | +| THINGS_START_INDEX | Things start index, used in template, f.e.  if start index is 1, created things have name of: 'my-thing-1', 'my-thing-2', etc. | +| CREATE_THINGS_BATCH_SIZE | Creating things is done via implicitThingCreationMapper via kafka connection. The kafka messages are sent in batches and this variable sets the batch size. | +| CREATE_THINGS | If the test should create things, before executing the scenarios (0/1) | +| DELETE_THINGS | If the test should delete the things, after executing the scenarios (0/1) | +| KAFKA_PRODUCER_LOGGER_ENABLED | K6 kafka producer logger enabled (0/1) | +| KAFKA_CONSUMER_LOGGER_ENABLED | K6 kafka consumer logger enabled (0/1) | +| CREATE_DITTO_CONNECTIONS | If the test should create the needed for scenarios ditto connections, before executing the scenarios | +| DELETE_DITTO_CONNECTIONS | If the test should delete the needed for scenarios ditto connections, after executing the scenarios | +| SCENARIOS_TO_RUN | Array of scenarios names that should run, available options is: WARMUP, DEVICE_LIVE_MESSAGES, SERACH_THINGS, READ_THINGS, MODIFY_THINGS | +| LOG_REMAINING | Log the remaining things that need to be created. Useful for debugging purposes | +| BATCH_SIZE | Max number of simultaneous connections of a k6 http.batch() call, which is used for warming up things | + +## Ditto related + +| Name | Description | +| ------------------------------- | ---------------------------------- | +| DITO_API_URI | Ditto api url | +| DITTO_AUTH_CONTEXT_HEADER | Authorization context header name | +| DITTO_AUTH_CONTEXT_HEADER_VALUE | Authorization context header value | + +## Kafka related + +| Name | Description | +| ----------------------- | -------------------------------- | +| KAFKA_BOOTSTRAP_SERVERS | Array of kafka bootstrap servers | + +## Scenarios related + +###### WARMUP + +| Name | Description | +| ------------------- | -------------------------------------------------------------------------------------------- | +| WARMUP_MAX_DURATION | The maximum duration of warmup scenario. After, the scenario will be forcefully stopped | +| WARMUP_START_TIME | Time offset since the start of the test, at which point this scenario should begin execution | +| WARMUP_VUS | An integer value specifying the number of VUs to run concurrently | + +###### Every other scenario has the same config variables, created by suffixing the variable name with the name of the scenario, f.e. SEARCH_THINGS_DURATION + +| Name | Description | +| ------------------ | -------------------------------------------------------------------------------------------------- | +| _DURATION | Total scenario duration. | +| _PER_SECOND | Number of requests to execute per second. For kafka scenarios number of kafka messages per second. | +| _PRE_ALLOCATED_VUS | Number of VUs to pre-allocate before scenario start to preserve runtime resources. | +| _MAX_VUS | Maximum number of VUs to allow during the scenario run. | +| _START_TIME | Time offset since the start of the test, at which point this scenario should begin execution. | + +###### Ditto kafka connections + +| Name | Description | +| ------------------------------------------- | ----------------------------------------------------------------------------- | +| KAFKA_CONNECTION_QOS | Ditto kafka connection qos value (0/1) | +| KAFKA_CONNECTION_CUSTOM_ACK | Ditto kafka connection custom acknowledge name | +| KAFKA_SOURCE_CONNECTION_CLIENT_COUNT | Ditto source (consumer) kafka connection client count | +| KAFKA_TARGET_CONNECTION_CLIENT_COUNT | Ditto target (producer) kafka connection client count | +| CREATE_UPDATE_THING_SOURCE_TOPIC | Kafka topic for sending create/update messages to ditto source connection | +| CREATE_UPDATE_THING_SOURCE_TOPIC_PARTITIONS | Number of partitions for the create/update kafka topic | +| CREATE_UPDATE_THING_REPLY_TOPIC | Kafka topic for ditto target connection, replying with 'thing created' events | +| CREATE_UPDATE_THING_REPLY_TOPIC_PARTITIONS | Number of partitions for the create/update reply kafka topic | +| CREATE_UPDATE_THING_CONSUMER_GROUP_ID | K6 kafka consumer group id | + +###### Ditto HTTP Push connection + +| Name | Description | +| ---------------------------------- | ------------------------------------------------------------------------- | +| HTTP_PUSH_CONNECTION_CLIENT_COUNT | Ditto HTTP push connection client count | +| HTTP_PUSH_CONNECTION_PARALLELISM | Ditto HTTP push connection parallelism | +| PUSH_ENDPOINT_URI | Ditto HTTP push connection endpoint uri | +| PUSH_ENDPOINT_EVENTS_MODIFIED_PATH | Ditto HTTP push connection target address for thing modified event | +| PUSH_ENDPOINT_LIVE_MESSAGE_PATH | Ditto HTTP push connection target address for things live messages events | + +## Test lifecycle + +The test consists of 4 lifecycle stages: **init**, **setup**, **VU code** and **teardown** + +**Init** + +1. Parses all environment variables + +2. Creates global kafka producer + +**Setup** + +1. Creates kafka topics if **CREATE_DITTO_CONNECTIONS** env var is **1** + +2. Creates ditto connections ( kafka and http push connections ) if **CREATE_DITTO_CONNECTIONS** env var is **1** + +3. Creates things, if **CREATE_THINGS** env var is **1** + +**VU code** - the stage at which the scenarios get executed + +**Teardown** + +1. Deletes things, if **DELETE_THINGS** env var is **1** + +2. Deletes ditto connections ( ditto kafka, ditto http push connections ) if **DELETE_DITTO_CONNECTIONS** env var is **1** + +3. Deletes kafka topics if **DELETE_DITTO_CONNECTIONS** env var is **1** + +## Creating things + +To create things, the following env variables must be set: + +```bash +CREATE_THINGS=1 +CREATE_DITTO_CONNECTIONS=1 +``` + +**Thing creation will run before any scenario is ran.** + +The benchmark-test tool creates two ditto kafka connections, 'source' and 'target', 'source' is configured with implicitThingCreation payload mapper, which reads from a topic(configurable by **CREATE_UPDATE_THING_SOURCE_TOPIC** env var), and 'target' listens for [thing created]([Things - Create-Or-Modify protocol specification • Eclipse Ditto™ • a digital twin framework](https://eclipse.dev/ditto/protocol-specification-things-create-or-modify.html#event)) event and writes to another topic(configurable by **CREATE_UPDATE_THING_TARGET_TOPIC** env var). Then the test creates a kafka producer, which sends the 'create thing' messages to the 'source' topic and a kafka consumer, which reads the [thing created]([Things - Create-Or-Modify protocol specification • Eclipse Ditto™ • a digital twin framework](https://eclipse.dev/ditto/protocol-specification-things-create-or-modify.html#event)) events and verifies the things are created. + +The kafka 'source' connection looks like the following: + +```json +{ + "id": "4cd191cc-aabb-4965-a1b4-dfe8ae8674bc", + "name": "kafka-source", + "connectionType": "kafka", + "connectionStatus": "open", + "uri": "tcp://192.168.16.2:19092", + "sources": [ + { + "addresses": [ + "create-update" + ], + "consumerCount": 1, + "qos": 0, + "authorizationContext": [ + "nginx:ditto" + ], + "enforcement": { + "input": "{{ header:device_id }}", + "filters": [ + "{{ entity:id }}" + ] + }, + "headerMapping": {}, + "payloadMapping": [ + "implicitThingCreation", + "ditto" + ], + "replyTarget": { + "enabled": false + } + } + ], + "targets": [], + "clientCount": 1, + "failoverEnabled": true, + "validateCertificates": true, + "processorPoolSize": 1, + "specificConfig": { + "saslMechanism": "plain", + "bootstrapServers": "192.168.16.2:19092, 192.168.16.3:19092, 192.168.16.4:19092" + }, + "mappingDefinitions": { + "ditto": { + "mappingEngine": "Ditto", + "options": { + "thingId": "{{ header:device_id }}" + }, + "incomingConditions": { + "sampleCondition": "fn:filter(header:ditto_message,'exists')" + } + }, + "implicitThingCreation": { + "mappingEngine": "ImplicitThingCreation", + "options": { + "thing": { + "thingId": "{{ header:device_id }}", + "_policy": { + "entries": { + "DEVICE": { + "subjects": { + "nginx:ditto": { + "type": "does-not-matter" + } + }, + "resources": { + "policy:/": { + "revoke": [], + "grant": [ + "READ", + "WRITE" + ] + }, + "thing:/": { + "revoke": [], + "grant": [ + "READ", + "WRITE" + ] + }, + "message:/": { + "revoke": [], + "grant": [ + "READ", + "WRITE" + ] + } + } + } + } + }, + "definition": "org.eclipse.ditto:coffeebrewer:0.1.0", + "attributes": { + "location": "test location", + "model": "Speaking coffee machine" + }, + "features": { + "coffee-brewer": { + "properties": { + "brewed-coffees": 0 + } + } + } + } + }, + "incomingConditions": { + "behindGateway": "fn:filter(header:create_thing, 'exists')" + } + } + }, + "tags": [] +} +``` + +The kafka 'target' connection looks like the following: + +```json +{ + "id": "21076098-28e9-416c-8ef0-6c86b5758c27", + "name": "kafka-reply", + "connectionType": "kafka", + "connectionStatus": "open", + "uri": "tcp://192.168.16.2:19092", + "sources": [], + "targets": [ + { + "address": "create-update-reply/{{ thing:id }}", + "topics": [ + "_/_/things/twin/events?filter=eq(topic:action,'created')" + ], + "authorizationContext": [ + "nginx:ditto" + ], + "headerMapping": {} + } + ], + "clientCount": 1, + "failoverEnabled": true, + "validateCertificates": true, + "processorPoolSize": 1, + "specificConfig": { + "saslMechanism": "plain", + "bootstrapServers": "192.168.16.2:19092, 192.168.16.3:19092, 192.168.16.4:19092" + }, + "tags": [] +} +``` + +## Running the test + +###### Running the test locally + +Prerequisites: + +- Running ditto + +- Running kafka cluster with topic deletion enabled + +- Running Monster mock instance ( config for mmock is inside the mmock directory ) + +- xk6 kafka extension binary + +First export all the environment variables, needed for the test: + +```bash +set -a +. test.env +``` + +Then, to run the test: + +```bash +${xk6-kakfa-bin} run test/k6-test.js +``` + +Logs and results are on the terminal standart output. + +###### Running the test inside kubernetes cluster + +Prerequisites: + +- Running kubernetes cluster + +- Running kafka cluster with topic deletion disabled + +- Running ditto inside the cluster, using the ditto helm chart https://github.com/eclipse-ditto/ditto/tree/master/deployment/helm/ditto + +- Deploy the k6 operator [GitHub - grafana/k6-operator: An operator for running distributed k6 tests.](https://github.com/grafana/k6-operator)[GitHub - grafana/k6-operator: An operator for running distributed k6 tests.](https://github.com/grafana/k6-operator) + +- Create config map for mmock from the config files inside **mmock** directory: + + ```bash + kubectl create configmap mmock-config --from-file mmock/ + ``` + +- Running Monster mock instance inside the cluster (kubernetes resource inside kubernetes directory) + +Needed kubernetes resources lie inside the kubernetes directory. + +- **k6-test-configmap-cr.yaml** - custom k6 resource, includes all env variables needed for the test, that are inside test.env file + +- **mmock-pvc.yaml** - Persistent volme claim for monster mock, use to copy the mmock configuration to the created PV, in order to mount it inside the mmock instance. + +- **mmock.yaml** - Pod definition for monster mock + +K6 custom resource gets the source code for the test from a config map, that must be created: + +```bash + kubectl create configmap k6-test --from-file test/ +``` + +K6 custom resource reads env variables from config map that must be created: + +```bash +kubectl create configmap k6-ditto-benchmark --from-env-file test-cluster.env +``` + +After all is set, create the k6 custom resource for the test: + +```bash +kubectl create -f k6-ditto-benchmark-test.yaml +``` + +Logs of the k6 test can be inspected from the pod **k6-ditto-benchmark-test-1-xxxx** diff --git a/benchmark-tool/kubernetes/README.md b/benchmark-tool/kubernetes/README.md new file mode 100644 index 0000000000..06df279c04 --- /dev/null +++ b/benchmark-tool/kubernetes/README.md @@ -0,0 +1,210 @@ +This benchmark is done via the load-testing tool k6 - https://github.com/grafana/k6 with the xk6-kafka extension - https://github.com/mostafa/xk6-kafka + +Monster mock is used to mock http responses, it exposes 2 endpoints that are configurable via mmock/default.yaml and mmock/live_messages.yaml. Default values are /:thingId and /live_messages and they are used to publish modified things events and device live messages sent via HTTP POST request. + +The k6 benchmark test consists of 5 tests available to run, called scenarios: + +- DEVICE_LIVE_MESSAGES - Send live messages to things via HTTP. Ditto HTTP push connection is configured in ditto, which sends events from topic _**/_/things/live/messages** to a monster mock endpoint, which replies with predefined ditto protocol message. + +- SEARCH_THINGS - search things via HTTP ( get things by applying filter ) + +- READ_THINGS - read things via HTTP ( get things by id ) + +- MODIFY_THINGS - Modify things by sending kafka messages to specfic topic. Ditto kafka connection is reading from this topic and processes the messages. Ditto HTTP push connection is configured in ditto, which sends events from topic **_/_/things/twin/events?filter=eq(topic:action,'modified'** to a monster mock endpoint, which replies with HTTP status code 204. + +Also, there is a special scenario called Warmup, which is used to warmup the system, by executing a single read request for each thing, which will cause them to get cached. + +# Getting started: + +## ## K6 is configurable via environment variables and the following must be set, in order to run the test: + +## K6 test related + +| Name | Description | +| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | +| SETUP_TIMEOUT | Specify how long the k6 setup() function is allow to run before it's terminated | +| TEARDOWN_TIMEOUT | Specify how long the k6 teardown() function is allowed to run before it's terminated | +| THINGS_NAMESPACE | remove?Namepsace to use for created ditto things | +| THINGS_ID_TEMPLATE | remove?Things id template, f.e. 'my-thing-{0}' | +| THINGS_START_INDEX | remove?Things start index, used in template, f.e.  if start index is 1, created things have name of: 'my-thing-1', 'my-thing-2', etc. | +| CREATE_THINGS_BATCH_SIZE | Creating things is done via implicitThingCreationMapper via kafka connection. The kafka messages are sent in batches and this variable sets the batch size. | +| CREATE_THINGS | If the test should create things, before executing the scenarios (0/1) | +| DELETE_THINGS | If the test should delete the things, after executing the scenarios (0/1) | +| KAFKA_PRODUCER_LOGGER_ENABLED | K6 kafka producer logger enabled (0/1) | +| KAFKA_CONSUMER_LOGGER_ENABLED | K6 kafka consumer logger enabled (0/1) | +| CREATE_DITTO_CONNECTIONS | If the test should create the needed for scenarios ditto connections, before executing the scenarios | +| DELETE_DITTO_CONNECTIONS | If the test should delete the needed for scenarios ditto connections, after executing the scenarios | +| SCENARIOS_TO_RUN | Array of scenarios names that should run, available options is: WARMUP, DEVICE_LIVE_MESSAGES, SERACH_THINGS, READ_THINGS, MODIFY_THINGS | +| LOG_REMAINING | Log the remaining things that need to be created. Useful for debugging purposes | +| BATCH_SIZE | Max number of simultaneous connections of a k6 http.batch() call, which is used for warming up things | + +## Ditto related + +| Name | Description | +| ------------------------------- | ---------------------------------- | +| DITO_API_URI | Ditto api url | +| DITTO_AUTH_CONTEXT_HEADER | Authorization context header name | +| DITTO_AUTH_CONTEXT_HEADER_VALUE | Authorization context header value | +| DEVICE_FEATURE_NAME | remove? | +| DEVICE_FEATURE_PROPERTY | remove? | +| DEVICE_ID_HEADER | remove? | +| CREATE_THING_HEADER | remove? | +| DITTO_MESSAGE_HEADER | remove? | + +## Kafka related + +| Name | Description | +| ----------------------- | -------------------------------- | +| KAFKA_BOOTSTRAP_SERVERS | Array of kafka bootstrap servers | + +## Scenarios related + +###### WARMUP + +| Name | Description | +| ------------------- | -------------------------------------------------------------------------------------------- | +| WARMUP_MAX_DURATION | The maximum duration of warmup scenario. After, the scenario will be forcefully stopped | +| WARMUP_START_TIME | Time offset since the start of the test, at which point this scenario should begin execution | +| WARMUP_VUS | An integer value specifying the number of VUs to run concurrently | + +###### Every other scenario has the same config variables, created by suffixing the variable name with the name of the scenario, f.e. SEARCH_THINGS_DURATION + +| Name | Description | +| ------------------ | -------------------------------------------------------------------------------------------------- | +| _DURATION | Total scenario duration. | +| _PER_SECOND | Number of requests to execute per second. For kafka scenarios number of kafka messages per second. | +| _PRE_ALLOCATED_VUS | Number of VUs to pre-allocate before scenario start to preserve runtime resources. | +| __MAX_VUS | Maximum number of VUs to allow during the scenario run. | +| __START_TIME | Time offset since the start of the test, at which point this scenario should begin execution. | + +###### Ditto kafka connections + +| Name | Description | +| ------------------------------------------- | ----------------------------------------------------------------------------- | +| KAFKA_CONNECTION_QOS | Ditto kafka connection qos value (0/1) | +| KAFKA_CONNECTION_CUSTOM_ACK | Ditto kafka connection custom acknowledge name | +| KAFKA_SOURCE_CONNECTION_CLIENT_COUNT | Ditto source (consumer) kafka connection client count | +| KAFKA_TARGET_CONNECTION_CLIENT_COUNT | Ditto target (producer) kafka connection client count | +| CREATE_UPDATE_THING_SOURCE_TOPIC | Kafka topic for sending create/update messages to ditto source connection | +| CREATE_UPDATE_THING_SOURCE_TOPIC_PARTITIONS | Number of partitions for the create/update kafka topic | +| CREATE_UPDATE_THING_REPLY_TOPIC | Kafka topic for ditto target connection, replying with 'thing created' events | +| CREATE_UPDATE_THING_REPLY_TOPIC_PARTITIONS | Number of partitions for the create/update reply kafka topic | +| CREATE_UPDATE_THING_CONSUMER_GROUP_ID | K6 kafka consumer group id | + +###### Ditto HTTP Push connection + +| Name | Description | +| ---------------------------------- | ---------------------------------------------------------------------------------------------- | +| HTTP_PUSH_CONNECTION_CLIENT_COUNT | Ditto HTTP push connection client count | +| HTTP_PUSH_CONNECTION_PARALLELISM | Ditto HTTP push connection parallelism | +| PUSH_ENDPOINT_URI | Ditto HTTP push connection endpoint uri | +| PUSH_ENDPOINT_EVENTS_MODIFIED_PATH | Ditto HTTP push connection target address for thing modified event | +| PUSH_ENDPOINT_LIVE_MESSAGE_PATH | Ditto HTTP push connection target address for things live messages events | + +## Test lifecycle + +The test consists of 4 lifecycle stages: **init**, **setup**, **VU code** and **teardown** + +**Init** + +1. Parses all environment variables + +2. Creates global kafka producer + +**Setup** + +1. Creates kafka topics if **CREATE_DITTO_CONNECTIONS** env var is **1** + +2. Creates ditto connections ( ditto kafka, ditto http push connections ) if **CREATE_DITTO_CONNECTIONS** env var is **1** + +3. Creates things, if **CREATE_THINGS** env var is **1** + +**VU code** - the stage at which the scenarios get executed + +**Teardown** + +1. Deletes things, if **DELETE_THINGS** env var is **1** + +2. Deletes ditto connections ( ditto kafka, ditto http push connections ) if **DELETE_DITTO_CONNECTIONS** env var is **1 + +3. Deletes kafka topics if **DELETE_DITTO_CONNECTIONS** env var is **1 + +## + +## Running the test + +###### Running the test locally + +Prerequisites: + +- Running ditto + +- Running kafka cluster with topic deletion enabled + +- Running Monster mock instance ( config for mmock is inside the mmock directory ) + +- Having the xk6 kafka extension binary + +First export all the environment variables, needed for the test: + +```bash +set -a +. test.env +set +a +``` + + Then, to run the test: + +```bash +{xk6-kakfa-bin} run test/k6-test.js +``` + +###### Running the test inside kubernetes cluster + +Prerequisites: + +- Running kubernetes cluster + +- Running kafka cluster with topic deletion disabled + +- Running ditto inside the cluster, using the ditto helm chart https://github.com/eclipse-ditto/ditto/tree/master/deployment/helm/ditto + +- Deploy the k6 operator [GitHub - grafana/k6-operator: An operator for running distributed k6 tests.](https://github.com/grafana/k6-operator)[GitHub - grafana/k6-operator: An operator for running distributed k6 tests.](https://github.com/grafana/k6-operator) + +- Create config map for mmock from the config files inside **mmock** directory: + + ```bash + kubectl create configmap mmock-config --from-file mmock/ + ``` + +- Running Monster mock instance inside the cluster (kubernetes resource inside kubernetes directory) + + + +Needed kubernetes resources lie inside the kubernetes directory. + +- k6-test-configmap-cr.yaml - custom k6 resource, includes all env variables needed for the test, that are inside test.env file + +- mmock-pvc.yaml - Persistent volme claim for monster mock, use to copy the mmock configuration to the created PV, in order to mount it inside the mmock instance. + +- mmock.yaml - Pod definition for monster mock + +K6 custom resource gets the source code for the test from a config map, that must be created: + +```bash + kubectl create configmap k6-test --from-file test/ +``` + +K6 custom resource reads env variables from config map that must be created: + +```bash +kubectl create configmap k6-ditto-benchmark --from-env-file test-cluster.env +``` + +After all is set, create the k6 custom resource for the test: + +```bash +k create -f k6-ditto-benchmark-test.yaml +``` + +Logs of the k6 test can be inspected from the pod **k6-ditto-benchmark-test-1-xxxx** diff --git a/benchmark-tool/kubernetes/k6-ditto-benchmark-test.yaml b/benchmark-tool/kubernetes/k6-ditto-benchmark-test.yaml new file mode 100644 index 0000000000..e26093a3bd --- /dev/null +++ b/benchmark-tool/kubernetes/k6-ditto-benchmark-test.yaml @@ -0,0 +1,330 @@ +# Copyright (c) 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0 +# +# SPDX-License-Identifier: EPL-2.0 +# +apiVersion: k6.io/v1alpha1 +kind: K6 +metadata: + name: k6-ditto-benchmark-test + namespace: ditto-benchmark +spec: + parallelism: 1 + script: + configMap: + name: k6-test + file: k6-test.js + arguments: --include-system-env-vars + runner: + image: mostafamoradian/xk6-kafka:latest + resources: + requests: + cpu: 2 + memory: 16000Mi + env: + - name: CREATE_DITTO_CONNECTIONS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: CREATE_DITTO_CONNECTIONS + - name: DELETE_DITTO_CONNECTIONS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DELETE_DITTO_CONNECTIONS + - name: HTTP_PUSH_CONNECTION_CLIENT_COUNT + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: HTTP_PUSH_CONNECTION_CLIENT_COUNT + - name: HTTP_PUSH_CONNECTION_PARALLELISM + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: HTTP_PUSH_CONNECTION_PARALLELISM + - name: SCENARIOS_TO_RUN + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: SCENARIOS_TO_RUN + - name: AUTHORIZATION_HEADER_VALUE + valueFrom: + secretKeyRef: + name: k6-test-secret + key: AUTHORIZATION_HEADER_VALUE + - name: PRODUCE_THINGS_BATCH_SIZE + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: PRODUCE_THINGS_BATCH_SIZE + - name: BATCH_SIZE + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: BATCH_SIZE + - name: DELETE_THINGS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DELETE_THINGS + - name: LOG_REMAINING + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: LOG_REMAINING + - name: CREATE_THINGS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: CREATE_THINGS + - name: SETUP_TIMEOUT + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: SETUP_TIMEOUT + - name: TEARDOWN_TIMEOUT + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: TEARDOWN_TIMEOUT + - name: THINGS_COUNT + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: THINGS_COUNT + - name: THINGS_START_INDEX + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: THINGS_START_INDEX + - name: MODIFY_THINGS_DURATION + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: MODIFY_THINGS_DURATION + - name: MODIFY_THINGS_PER_SECOND + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: MODIFY_THINGS_PER_SECOND + - name: MODIFY_THINGS_PRE_ALLOCATED_VUS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: MODIFY_THINGS_PRE_ALLOCATED_VUS + - name: MODIFY_THINGS_MAX_VUS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: MODIFY_THINGS_MAX_VUS + - name: MODIFY_THINGS_START_TIME + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: MODIFY_THINGS_START_TIME + - name: READ_THINGS_DURATION + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: READ_THINGS_DURATION + - name: READ_THINGS_PER_SECOND + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: READ_THINGS_PER_SECOND + - name: READ_THINGS_PRE_ALLOCATED_VUS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: READ_THINGS_PRE_ALLOCATED_VUS + - name: READ_THINGS_MAX_VUS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: READ_THINGS_MAX_VUS + - name: READ_THINGS_START_TIME + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: READ_THINGS_START_TIME + - name: SEARCH_THINGS_DURATION + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: SEARCH_THINGS_DURATION + - name: SEARCH_THINGS_PER_SECOND + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: SEARCH_THINGS_PER_SECOND + - name: SEARCH_THINGS_PRE_ALLOCATED_VUS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: SEARCH_THINGS_PRE_ALLOCATED_VUS + - name: SEARCH_THINGS_MAX_VUS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: SEARCH_THINGS_MAX_VUS + - name: SEARCH_THINGS_START_TIME + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: SEARCH_THINGS_START_TIME + - name: DEVICE_LIVE_MESSAGES_DURATION + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DEVICE_LIVE_MESSAGES_DURATION + - name: DEVICE_LIVE_MESSAGES_PER_SECOND + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DEVICE_LIVE_MESSAGES_PER_SECOND + - name: DEVICE_LIVE_MESSAGES_PRE_ALLOCATED_VUS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DEVICE_LIVE_MESSAGES_PRE_ALLOCATED_VUS + - name: DEVICE_LIVE_MESSAGES_MAX_VUS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DEVICE_LIVE_MESSAGES_MAX_VUS + - name: DEVICE_LIVE_MESSAGES_START_TIME + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DEVICE_LIVE_MESSAGES_START_TIME + - name: KAFKA_BOOTSTRAP_SERVERS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: KAFKA_BOOTSTRAP_SERVERS + - name: CREATE_UPDATE_THING_SOURCE_TOPIC + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: CREATE_UPDATE_THING_SOURCE_TOPIC + - name: CREATE_UPDATE_THING_SOURCE_TOPIC_PARTITIONS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: CREATE_UPDATE_THING_SOURCE_TOPIC_PARTITIONS + - name: CREATE_UPDATE_THING_REPLY_TOPIC + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: CREATE_UPDATE_THING_REPLY_TOPIC + - name: CREATE_UPDATE_THING_REPLY_TOPIC_PARTITIONS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: CREATE_UPDATE_THING_REPLY_TOPIC_PARTITIONS + - name: CREATE_UPDATE_THING_CONSUMER_GROUP_ID + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: CREATE_UPDATE_THING_CONSUMER_GROUP_ID + - name: CREATE_UPDATE_THING_CONSUMER_MAX_WAIT_TIME_S + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: CREATE_UPDATE_THING_CONSUMER_MAX_WAIT_TIME_S + - name: CREATE_UPDATE_THING_CONSUMER_SESSION_TIMEOUT_TIME_S + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: CREATE_UPDATE_THING_CONSUMER_SESSION_TIMEOUT_TIME_S + - name: KAFKA_PRODUCER_LOGGER_ENABLED + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: KAFKA_PRODUCER_LOGGER_ENABLED + - name: KAFKA_CONSUMER_LOGGER_ENABLED + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: KAFKA_CONSUMER_LOGGER_ENABLED + - name: KAFKA_CONNECTION_QOS + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: KAFKA_CONNECTION_QOS + - name: KAFKA_CONNECTION_CUSTOM_ACK + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: KAFKA_CONNECTION_CUSTOM_ACK + - name: KAFKA_CONNECTION_CONSUMER_CONSUMER_COUNT + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: KAFKA_CONNECTION_CONSUMER_CONSUMER_COUNT + - name: KAFKA_TARGET_CONNECTION_CLIENT_COUNT + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: KAFKA_TARGET_CONNECTION_CLIENT_COUNT + - name: KAFKA_SOURCE_CONNECTION_CLIENT_COUNT + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: KAFKA_SOURCE_CONNECTION_CLIENT_COUNT + - name: KAFKA_SOURCE_CONNECTION_PROCESSOR_POOL_SIZE + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: KAFKA_CONNECTION_PROCESSOR_POOL_SIZE + - name: DEVICE_NAMESPACE + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DEVICE_NAMESPACE + - name: DEVICE_ID_TEMPLATE + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DEVICE_ID_TEMPLATE + - name: DITTO_BASE_URI + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DITTO_BASE_URI + - name: DITTO_AUTH_CONTEXT + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: DITTO_AUTH_CONTEXT + - name: PUSH_ENDPOINT_URI + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: PUSH_ENDPOINT_URI + - name: PUSH_ENDPOINT_LIVE_MESSAGE_PATH + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: PUSH_ENDPOINT_LIVE_MESSAGE_PATH + - name: PUSH_ENDPOINT_EVENTS_PATH + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: PUSH_ENDPOINT_EVENTS_PATH + - name: THINGS_WARMUP_MAX_DURATION + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: THINGS_WARMUP_MAX_DURATION + - name: THINGS_WARMUP_START_TIME + valueFrom: + configMapKeyRef: + name: k6-ditto-benchmark + key: THINGS_WARMUP_START_TIME diff --git a/benchmark-tool/kubernetes/mmock.yaml b/benchmark-tool/kubernetes/mmock.yaml new file mode 100644 index 0000000000..0140fdb936 --- /dev/null +++ b/benchmark-tool/kubernetes/mmock.yaml @@ -0,0 +1,34 @@ +# Copyright (c) 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0 +# +# SPDX-License-Identifier: EPL-2.0 +# +apiVersion: v1 +kind: Pod +metadata: + name: mmock + namespace: ditto-benchmark + labels: + app.kubernetes.io/name: mmock +spec: + volumes: + - name: config-volume + configMap: + defaultMode: 420 + name: mmock-config + containers: + - name: mmock + image: jordimartin/mmock:latest + imagePullPolicy: IfNotPresent + ports: + - containerPort: 8083 + name: "mmock" + volumeMounts: + - mountPath: /config/ + name: config-volume diff --git a/benchmark-tool/mmock/default.yaml b/benchmark-tool/mmock/default.yaml new file mode 100644 index 0000000000..829da1d4d2 --- /dev/null +++ b/benchmark-tool/mmock/default.yaml @@ -0,0 +1,19 @@ +# Copyright (c) 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0 +# +# SPDX-License-Identifier: EPL-2.0 +# +request: + method: POST + path: "/:thingId" +response: + statusCode: 204 +control: + priority: 1 + diff --git a/benchmark-tool/mmock/live_messages.yaml b/benchmark-tool/mmock/live_messages.yaml new file mode 100644 index 0000000000..b78c07e272 --- /dev/null +++ b/benchmark-tool/mmock/live_messages.yaml @@ -0,0 +1,22 @@ +# Copyright (c) 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0 +# +# SPDX-License-Identifier: EPL-2.0 +# +request: + method: POST + path: "/live_message" +response: + statusCode: 200 + headers: + Content-Type: + - application/vnd.eclipse.ditto+json + body: '{"topic": "{{request.body.topic}}","status": 200,"value": "ok","path": "{{request.body.path}}","headers": {"correlation-id": "{{request.body.headers.correlation-id}}","content-type": "application/json"}}' +control: + priority: 2 diff --git a/benchmark-tool/test-cluster.env b/benchmark-tool/test-cluster.env new file mode 100644 index 0000000000..57585f00ce --- /dev/null +++ b/benchmark-tool/test-cluster.env @@ -0,0 +1,59 @@ +BATCH_SIZE=1000 +CREATE_DITTO_CONNECTIONS=1 +CREATE_THINGS=0 +CREATE_UPDATE_THING_CONSUMER_GROUP_ID=group-create-update +CREATE_UPDATE_THING_CONSUMER_MAX_WAIT_TIME_S=10 +CREATE_UPDATE_THING_CONSUMER_SESSION_TIMEOUT_TIME_S=10 +CREATE_UPDATE_THING_REPLY_TOPIC=create-update-reply +CREATE_UPDATE_THING_REPLY_TOPIC_PARTITIONS=2 +CREATE_UPDATE_THING_SOURCE_TOPIC=create-update +CREATE_UPDATE_THING_SOURCE_TOPIC_PARTITIONS=2 +DELETE_DITTO_CONNECTIONS=1 +DELETE_THINGS=0 +DEVICE_ID_TEMPLATE=test-thing- +DEVICE_LIVE_MESSAGES_DURATION=10m +DEVICE_LIVE_MESSAGES_MAX_VUS=20000 +DEVICE_LIVE_MESSAGES_PER_SECOND=1000 +DEVICE_LIVE_MESSAGES_PRE_ALLOCATED_VUS=20000 +DEVICE_LIVE_MESSAGES_START_TIME=1m +DEVICE_NAMESPACE=org.eclipse.ditto +DITTO_AUTH_CONTEXT=nginx:ditto +DITTO_BASE_URI=http://eclipse-ditto-gateway.ditto-benchmark.svc.cluster.local:8080/api/2 +HTTP_PUSH_CONNECTION_CLIENT_COUNT=2 +HTTP_PUSH_CONNECTION_PARALLELISM=2 +KAFKA_BOOTSTRAP_SERVERS=bitnami-kafka:9092 +KAFKA_CONNECTION_CONSUMER_CONSUMER_COUNT=1 +KAFKA_CONNECTION_CUSTOM_ACK= +KAFKA_CONNECTION_PROCESSOR_POOL_SIZE=1 +KAFKA_CONNECTION_QOS=0 +KAFKA_CONSUMER_LOGGER_ENABLED=0 +KAFKA_PRODUCER_LOGGER_ENABLED=0 +KAFKA_SOURCE_CONNECTION_CLIENT_COUNT=2 +KAFKA_TARGET_CONNECTION_CLIENT_COUNT=1 +LOG_REMAINING=0 +MODIFY_THINGS_DURATION=15m +MODIFY_THINGS_MAX_VUS=10000 +MODIFY_THINGS_PER_SECOND=2200 +MODIFY_THINGS_PRE_ALLOCATED_VUS=10000 +MODIFY_THINGS_START_TIME=0s +PRODUCE_THINGS_BATCH_SIZE=1000 +PUSH_ENDPOINT_EVENTS_PATH= +PUSH_ENDPOINT_LIVE_MESSAGE_PATH=/live_message +PUSH_ENDPOINT_URI=http://mmock-service.ditto-benchmark.svc.cluster.local:80 +READ_THINGS_DURATION=3m +READ_THINGS_MAX_VUS=4000 +READ_THINGS_PER_SECOND=2000 +READ_THINGS_PRE_ALLOCATED_VUS=4000 +READ_THINGS_START_TIME=1m +SCENARIOS_TO_RUN=MODIFY_THINGS +SEARCH_THINGS_DURATION=3m +SEARCH_THINGS_MAX_VUS=8000 +SEARCH_THINGS_PER_SECOND=2100 +SEARCH_THINGS_PRE_ALLOCATED_VUS=8000 +SEARCH_THINGS_START_TIME=1m +SETUP_TIMEOUT=3600s +TEARDOWN_TIMEOUT=3600s +THINGS_COUNT=100000 +THINGS_START_INDEX=1 +THINGS_WARMUP_MAX_DURATION=20m +THINGS_WARMUP_START_TIME=0s diff --git a/benchmark-tool/test-local.env b/benchmark-tool/test-local.env new file mode 100644 index 0000000000..199dcc26f2 --- /dev/null +++ b/benchmark-tool/test-local.env @@ -0,0 +1,82 @@ +SETUP_TIMEOUT=360s +TEARDOWN_TIMEOUT=360s + +CREATE_THINGS=1 +THINGS_COUNT=10 +THINGS_START_INDEX=1 + +CREATE_DITTO_CONNECTIONS=1 +DELETE_DITTO_CONNECTIONS=0 + +SCENARIOS_TO_RUN="" + +THINGS_WARMUP_MAX_DURATION=2m +THINGS_WARMUP_START_TIME=0s +THINGS_WARMUP_VUS=10 + +MODIFY_THINGS_DURATION=10s +MODIFY_THINGS_PER_SECOND=100 +MODIFY_THINGS_PRE_ALLOCATED_VUS=100 +MODIFY_THINGS_MAX_VUS=200 +MODIFY_THINGS_START_TIME=0s + +READ_THINGS_DURATION=30s +READ_THINGS_PER_SECOND=50 +READ_THINGS_PRE_ALLOCATED_VUS=500 +READ_THINGS_MAX_VUS=1000 +READ_THINGS_START_TIME=0s + +SEARCH_THINGS_DURATION=30s +SEARCH_THINGS_PER_SECOND=50 +SEARCH_THINGS_PRE_ALLOCATED_VUS=1 +SEARCH_THINGS_MAX_VUS=1 +SEARCH_THINGS_START_TIME=0s + +DEVICE_LIVE_MESSAGES_DURATION=5s +DEVICE_LIVE_MESSAGES_PER_SECOND=1000 +DEVICE_LIVE_MESSAGES_PRE_ALLOCATED_VUS=5000 +DEVICE_LIVE_MESSAGES_MAX_VUS=5000 +DEVICE_LIVE_MESSAGES_START_TIME=0m + +KAFKA_BOOTSTRAP_SERVERS=172.25.0.2:9094 + +CREATE_UPDATE_THING_SOURCE_TOPIC=create-update +CREATE_UPDATE_THING_SOURCE_TOPIC_PARTITIONS=1 +CREATE_UPDATE_THING_REPLY_TOPIC=create-update-reply +CREATE_UPDATE_THING_REPLY_TOPIC_PARTITIONS=1 +CREATE_UPDATE_THING_CONSUMER_GROUP_ID=group-create-update +CREATE_UPDATE_THING_CONSUMER_MAX_WAIT_TIME_S=10 +CREATE_UPDATE_THING_CONSUMER_SESSION_TIMEOUT_TIME_S=10 + +KAFKA_PRODUCER_LOGGER_ENABLED=0 +KAFKA_CONSUMER_LOGGER_ENABLED=0 + +KAFKA_CONNECTION_QOS=0 +KAFKA_CONNECTION_CUSTOM_ACK=thing-created-reply-sent +KAFKA_SOURCE_CONNECTION_CLIENT_COUNT=1 +KAFKA_TARGET_CONNECTION_CLIENT_COUNT=1 +KAFKA_CONNECTION_PROCESSOR_POOL_SIZE=1 + +#DELETE? +KAFKA_CONNECTION_CONSUMER_CONSUMER_COUNT=1 +KAFKA_CONNECTION_PROCESSOR_POOL_SIZE=1 + + +HTTP_PUSH_CONNECTION_CLIENT_COUNT=1 +HTTP_PUSH_CONNECTION_PARALLELISM=1 + +DEVICE_NAMESPACE=org.eclipse.ditto +DEVICE_ID_TEMPLATE=test-thing- + +DITTO_BASE_URI=http://localhost:8080/api/2 +DITTO_AUTH_CONTEXT=nginx:ditto + +PUSH_ENDPOINT_URI=http://127.0.0.1:8083 +PUSH_ENDPOINT_EVENTS_PATH= +PUSH_ENDPOINT_LIVE_MESSAGE_PATH=/live_messages + +#TO DELETE +LOG_REMAINING=0 +DELETE_THINGS=0 +BATCH_SIZE=100 +PRODUCE_THINGS_BATCH_SIZE=100 diff --git a/benchmark-tool/test/common.js b/benchmark-tool/test/common.js new file mode 100644 index 0000000000..0e2c19fa68 --- /dev/null +++ b/benchmark-tool/test/common.js @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +import { + Writer, + Connection +} from 'k6/x/kafka'; + +import { + SchemaRegistry, +} from 'k6/x/kafka'; + +// KAFKA RELATED +export const BOOTSTRAP_SERVERS = __ENV.KAFKA_BOOTSTRAP_SERVERS.split(','); + +export const CREATE_UPDATE_THING_SOURCE_TOPIC = __ENV.CREATE_UPDATE_THING_SOURCE_TOPIC; +export const CREATE_UPDATE_THING_SOURCE_TOPIC_PARTITIONS = parseInt(__ENV.CREATE_UPDATE_THING_SOURCE_TOPIC_PARTITIONS); +export const CREATE_UPDATE_THING_REPLY_TOPIC = __ENV.CREATE_UPDATE_THING_REPLY_TOPIC; +export const CREATE_UPDATE_THING_REPLY_TOPIC_PARTITIONS = parseInt(__ENV.CREATE_UPDATE_THING_REPLY_TOPIC_PARTITIONS); +export const CREATE_UPDATE_THING_CONSUMER_GROUP_ID = __ENV.CREATE_UPDATE_THING_CONSUMER_GROUP_ID; + +let producer = undefined; +producer = new Writer({ + brokers: BOOTSTRAP_SERVERS, + topic: CREATE_UPDATE_THING_SOURCE_TOPIC, + connectLogger: __ENV.KAFKA_PRODUCER_LOGGER_ENABLED == 1, +}); +// if (__VU === 1) { +// // execute only once +// createThingsTopicsIfNotAvailable(); +// } + +export const KAFKA_CREATE_UPDATE_PRODUCER = producer; + +export const KAFKA_CONNECTION_QOS = parseInt(__ENV.KAFKA_CONNECTION_QOS); +export const KAFKA_CONNECTION_CUSTOM_ACK = __ENV.KAFKA_CONNECTION_CUSTOM_ACK; +export const KAFKA_CONNECTION_CONSUMER_CONSUMER_COUNT = parseInt(__ENV.KAFKA_CONNECTION_CONSUMER_CONSUMER_COUNT); +export const KAFKA_SOURCE_CONNECTION_CLIENT_COUNT = parseInt(__ENV.KAFKA_SOURCE_CONNECTION_CLIENT_COUNT); +export const KAFKA_TARGET_CONNECTION_CLIENT_COUNT = parseInt(__ENV.KAFKA_TARGET_CONNECTION_CLIENT_COUNT); +export const KAFKA_CONNECTION_PROCESSOR_POOL_SIZE = parseInt(__ENV.KAFKA_CONNECTION_PROCESSOR_POOL_SIZE); + +// KAFKA RELATED + +// HTTP PUSH CONNECTION +export const HTTP_PUSH_CONNECTION_CLIENT_COUNT = parseInt(__ENV.HTTP_PUSH_CONNECTION_CLIENT_COUNT); +export const HTTP_PUSH_CONNECTION_PARALLELISM = parseInt(__ENV.HTTP_PUSH_CONNECTION_PARALLELISM); +// HTTP PUSH CONNECTION + +// DITTO RELATED +export const DEVICE_NAMESPACE = __ENV.DEVICE_NAMESPACE; +export const DEVICE_ID_TEMPLATE = `${DEVICE_NAMESPACE}:${__ENV.DEVICE_ID_TEMPLATE}`; +export const THINGS_COUNT = parseInt(__ENV.THINGS_COUNT); +export const BATCH_SIZE = parseInt(__ENV.BATCH_SIZE); +export const PRODUCE_THINGS_BATCH_SIZE = parseInt(__ENV.PRODUCE_THINGS_BATCH_SIZE); + +export const THINGS_START_INDEX = parseInt(__ENV.THINGS_START_INDEX); +export function GET_THING_ID(index) { + return DEVICE_ID_TEMPLATE + (index + THINGS_START_INDEX); +} + +export const DEVICE_FEATURE_NAME = 'coffee-brewer'; +export const DEVICE_FEATURE_PROPERTY = 'brewed-coffees'; +export const DEVICE_ID_HEADER = 'device_id'; +export const CREATE_THING_HEADER = 'create_thing'; +export const DITTO_MESSAGE_HEADER = 'ditto_message'; +// DITTO RELATED + +// K6 RELATED +export const schemaRegistry = new SchemaRegistry(); +// K6 RELATED + +// SCENARIOS RELATED +export const THINGS_WARMUP_VUS = parseInt(__ENV.THINGS_WARMUP_VUS); +// SCENARIOS RELATED + +export function createThingsTopicsIfNotAvailable() { + let connection; + try { + connection = new Connection({ address: BOOTSTRAP_SERVERS[0] }); + let availableTopics = connection.listTopics(); + console.log(availableTopics); + if (availableTopics.indexOf(CREATE_UPDATE_THING_SOURCE_TOPIC) === -1) { + console.log(`creating topic ${CREATE_UPDATE_THING_SOURCE_TOPIC}`); + connection.createTopic({ + topic: CREATE_UPDATE_THING_SOURCE_TOPIC, + numPartitions: CREATE_UPDATE_THING_SOURCE_TOPIC_PARTITIONS + }); + } + + if (availableTopics.indexOf(CREATE_UPDATE_THING_REPLY_TOPIC) === -1) { + console.log(`creating topic ${CREATE_UPDATE_THING_REPLY_TOPIC}`); + connection.createTopic({ + topic: CREATE_UPDATE_THING_REPLY_TOPIC, + numPartitions: CREATE_UPDATE_THING_REPLY_TOPIC_PARTITIONS + }); + } + } finally { + if (connection !== undefined) { + connection.close(); + } + } +} + +export function deleteTopics() { + let connection; + try { + connection = new Connection({ address: BOOTSTRAP_SERVERS[0] }); + connection.deleteTopic(CREATE_UPDATE_THING_SOURCE_TOPIC); + connection.deleteTopic(CREATE_UPDATE_THING_REPLY_TOPIC); + } finally { + if (connection !== undefined) { + connection.close(); + } + } +} diff --git a/benchmark-tool/test/device-live-message.js b/benchmark-tool/test/device-live-message.js new file mode 100644 index 0000000000..49fed00cd7 --- /dev/null +++ b/benchmark-tool/test/device-live-message.js @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +import { sendLiveMessageToThing } from "./http-util.js"; +import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js'; +import { fail } from 'k6'; +import * as common from './common.js' + +export function sendDeviceLiveMessage() { + let thingId = common.GET_THING_ID(randomIntBetween(common.THINGS_START_INDEX, common.THINGS_COUNT - 1)); + let response = sendLiveMessageToThing(thingId, 'subject', null); + if (response.status != 200) { + fail(`Failed to send live message to thing ${thingId}; Response: ${JSON.stringify(response)}`); + } +} diff --git a/benchmark-tool/test/http-util.js b/benchmark-tool/test/http-util.js new file mode 100644 index 0000000000..abc67719ec --- /dev/null +++ b/benchmark-tool/test/http-util.js @@ -0,0 +1,383 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +import { sleep } from 'k6'; +import http from 'k6/http'; +import * as common from './common.js'; +import { formatString } from './str-util.js' + +const DITTO_BASE_URI = __ENV.DITTO_BASE_URI; + +const DITTO_CONNECTIONS_URI = `${DITTO_BASE_URI}/connections`; +const DITTO_THINGS_URI = `${DITTO_BASE_URI}/things`; +const DITTO_POLICIES_URI = `${DITTO_BASE_URI}/policies`; +const DITTO_SEARCH_THINGS_URI = `${DITTO_BASE_URI}/search/things` + +const DITTO_THINGS_MESSAGES_URI_FORMAT = `${DITTO_THINGS_URI}/{0}/inbox/messages/{1}` + +const PUSH_ENDPOINT_URI = __ENV.PUSH_ENDPOINT_URI; +const PUSH_ENDPOINT_EVENTS_PATH = __ENV.PUSH_ENDPOINT_EVENTS_PATH +const PUSH_ENDPOINT_LIVE_MESSAGE_PATH = __ENV.PUSH_ENDPOINT_LIVE_MESSAGE_PATH; + +const AUTH_CONTEXT = __ENV.DITTO_AUTH_CONTEXT; + +const REQUEST_HEADERS = { + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'x-ditto-pre-authenticated': AUTH_CONTEXT, + 'Authorization': __ENV.AUTHORIZATION_HEADER_VALUE + }, + tags: { name: 'grouped' } +}; + +const DEVICE_FEATURE = { + [common.DEVICE_FEATURE_NAME]: { + 'properties': { + [common.DEVICE_FEATURE_PROPERTY]: 0 + } + } +}; + +export function createHttpPushConnection() { + let connectionBody = constructHttpPushConnection(common.HTTP_PUSH_CONNECTION_CLIENT_COUNT, common.HTTP_PUSH_CONNECTION_PARALLELISM); + let resp = http.post(DITTO_CONNECTIONS_URI, JSON.stringify(connectionBody), REQUEST_HEADERS); + let connection = resp.json(); + + return connection; +} + +export function createKafkaSourceConnection(customAckConnectionId) { + let connectionBody = constructKafkaSourceConnection(customAckConnectionId); + let resp = http.post(DITTO_CONNECTIONS_URI, JSON.stringify(connectionBody), REQUEST_HEADERS); + let connection = resp.json(); + + return connection; +} + +export function createKafkaTargetConnection() { + let connectionBody = constructKafkaTargetConnection(); + let resp = http.post(DITTO_CONNECTIONS_URI, JSON.stringify(connectionBody), REQUEST_HEADERS); + let connection = resp.json(); + + return connection; +} + +export function waitForConnectionToOpen(connectionId) { + let connectionOpen; + while (true) { + let connectionStatus = http.get(`${DITTO_CONNECTIONS_URI}/${connectionId}/status`, REQUEST_HEADERS).json(); + + connectionOpen = 1; + connectionStatus.clientStatus.forEach((client) => { + connectionOpen &= (client.status === 'open' && client.statusDetails === 'CONNECTED'); + }); + + if (connectionStatus.sourceStatus != undefined) { + connectionStatus.sourceStatus.forEach((source) => { + connectionOpen &= (source.status === 'open' && source.statusDetails === 'Consumer started.'); + }); + } + + if (connectionStatus.targetStatus != undefined) { + connectionStatus.targetStatus.forEach((target) => { + let statusOpenCheck = target.status === 'open' && target.statusDetails === 'Producer started.'; + let statusUnknownCheck = target.status === 'unknown' && (target.statusDetails.match('.* on-demand') !== undefined); + connectionOpen &= statusOpenCheck || statusUnknownCheck; + }); + } + + if (connectionOpen === 1) { + break; + } + + sleep(1); + } +} + +export function getThing(id) { + return http.get(DITTO_THINGS_URI + '/' + id, REQUEST_HEADERS); +} + +export function getThingsBatch(ids) { + let requests = []; + ids.forEach(id => { + requests.push({ + 'method': 'GET', + 'url': DITTO_THINGS_URI + '/' + id, + 'params': REQUEST_HEADERS + }); + }) + + return http.batch(requests); +} + +export function searchThingById(id) { + return http.get(DITTO_SEARCH_THINGS_URI + getSearchByThingIdFilter(id), REQUEST_HEADERS); +} +export function searchThingByFeature(value) { + let uri = DITTO_SEARCH_THINGS_URI + getSearchByFeatureFilter(value); + return http.get(DITTO_SEARCH_THINGS_URI + getSearchByFeatureFilter(value), REQUEST_HEADERS); +} + +export function searchThingsBatch(ids) { + let requests = []; + ids.forEach(id => { + requests.push({ + 'method': 'GET', + 'url': DITTO_SEARCH_THINGS_URI + getSearchByThingIdFilter(id), + 'params': REQUEST_HEADERS + }); + }) + + return http.batch(requests); +} + +export function sendLiveMessageToThing(id, subject, message) { + return http.post(formatString(DITTO_THINGS_MESSAGES_URI_FORMAT, id, subject), message, REQUEST_HEADERS); +} + +export function sendLiveMessageToThingsBatch(ids, subject, message) { + let requests = []; + ids.forEach(id => { + requests.push({ + 'method': 'GET', + 'url': formatString(DITTO_THINGS_MESSAGES_URI_FORMAT, id, subject), + 'body': message, + 'params': REQUEST_HEADERS + }); + }) + return http.batch(requests); +} + +export function deleteThing(id) { + return http.del(DITTO_THINGS_URI + '/' + id, null, REQUEST_HEADERS); +} + +export function deletePolicy(id) { + return http.del(DITTO_POLICIES_URI + '/' + id, null, REQUEST_HEADERS); +} + +export function deleteConnection(id) { + return http.del(DITTO_CONNECTIONS_URI + '/' + id, null, REQUEST_HEADERS); +} + +function getSearchByFeatureFilter(value) { + return `?filter=eq(features/${common.DEVICE_FEATURE_NAME}/properties/${common.DEVICE_FEATURE_PROPERTY},${value})`; +} + +function getSearchByThingIdFilter(id) { + return `?filter=eq(thingId,'${id}')`; +} + +function constructHttpPushConnection(clientCount, parallelism) { + let connectionBody = { + 'name': 'http-push-connection', + 'connectionType': 'http-push', + 'connectionStatus': 'open', + 'uri': PUSH_ENDPOINT_URI, + 'clientCount': clientCount, + 'specificConfig': { + 'parallelism': parallelism + }, + 'sources': [], + 'targets': [ + { + 'address': `POST:${PUSH_ENDPOINT_LIVE_MESSAGE_PATH}`, + 'topics': [ + '_/_/things/live/messages' + ], + 'authorizationContext': [AUTH_CONTEXT] + } + ], + 'tags': ['benchmark'] + }; + + let thingModifiedTarget = { + 'address': `POST:${PUSH_ENDPOINT_EVENTS_PATH}/{{ thing:id }}`, + 'topics': [ + '_/_/things/twin/events?filter=eq(topic:action,\'modified\')' + ], + 'authorizationContext': [AUTH_CONTEXT] + }; + + if (common.KAFKA_CONNECTION_QOS) { + if (common.KAFKA_CONNECTION_CUSTOM_ACK != '') { + thingModifiedTarget['issuedAcknowledgementLabel'] = `{{connection:id}}:${common.KAFKA_CONNECTION_CUSTOM_ACK}`; + } + } + + connectionBody['targets'].push(thingModifiedTarget); + + return connectionBody; +} + +function constructKafkaSourceConnection(customAckConnectionId) { + let kafkaConnection = constructKafkaConnection('kafka-source', common.KAFKA_SOURCE_CONNECTION_CLIENT_COUNT); + + kafkaConnection.sources = [ + constructConnectionSource(common.CREATE_UPDATE_THING_SOURCE_TOPIC, AUTH_CONTEXT, + common.DEVICE_ID_HEADER, common.KAFKA_CONNECTION_CONSUMER_CONSUMER_COUNT, common.KAFKA_CONNECTION_QOS, + customAckConnectionId, common.KAFKA_CONNECTION_CUSTOM_ACK) + ]; + + kafkaConnection.mappingDefinitions = { + 'implicitThingCreation': { + 'mappingEngine': 'ImplicitThingCreation', + 'options': { + 'thing': constructThingTemplate() + }, + 'incomingConditions': { + 'behindGateway': `fn:filter(header:${common.CREATE_THING_HEADER}, 'exists')` + } + }, + 'ditto': { + 'mappingEngine': 'Ditto', + 'options': { + 'thingId': `{{ header:${common.DEVICE_ID_HEADER} }}` + }, + 'incomingConditions': { + 'sampleCondition': `fn:filter(header:${common.DITTO_MESSAGE_HEADER},'exists')` + } + } + }; + return kafkaConnection; +} + +function constructKafkaTargetConnection() { + let kafkaConnection = constructKafkaConnection('kafka-reply', common.KAFKA_TARGET_CONNECTION_CLIENT_COUNT); + kafkaConnection.targets = [ + constructConnectionTarget(common.CREATE_UPDATE_THING_REPLY_TOPIC, AUTH_CONTEXT) + ]; + + return kafkaConnection; +} + +function constructKafkaConnection(name, clientCount) { + return { + 'name': name, + 'connectionType': 'kafka', + 'connectionStatus': 'open', + 'uri': 'tcp://' + common.BOOTSTRAP_SERVERS[0], + 'clientCount': clientCount, + 'processorPoolSize': common.KAFKA_CONNECTION_PROCESSOR_POOL_SIZE, + 'specificConfig': { + 'saslMechanism': 'plain', + 'bootstrapServers': common.BOOTSTRAP_SERVERS.join() + }, + 'tags': ['benchmark'] + } +} + +function constructConnectionSource(sourceTopic, authContext, inputHeader, consumerCount, qos, customAckConnectionId, customAck) { + let connectionSource = { + 'addresses': [ + sourceTopic + ], + 'consumerCount': consumerCount, + 'qos': qos, + 'authorizationContext': [ + authContext + ], + 'enforcement': { + 'input': `{{ header:${inputHeader} }}`, + 'filters': [ + '{{ entity:id }}' + ] + }, + 'payloadMapping': [ + 'implicitThingCreation', + 'ditto' + ], + 'replyTarget': { + 'enabled': false + } + }; + + if (qos === 1) { + if (customAckConnectionId !== undefined && (customAck != undefined && customAck != '')) { + connectionSource['acknowledgementRequests'] = { + 'includes': [`${customAckConnectionId}:${customAck}`] + }; + connectionSource['declaredAcks'] = [ + `{{connection:id}}:${customAck}` + ]; + } + } + + return connectionSource; +} + +function constructConnectionTarget(replyTopic, authContext) { + let connectionTarget = { + 'address': `${replyTopic}/{{ thing:id }}`, + 'topics': [ + '_/_/things/twin/events?filter=eq(topic:action,\'created\')' + + ], + 'authorizationContext': [ + authContext + ] + }; + + return connectionTarget; +} + +function constructThingTemplate() { + return { + 'thingId': `{{ header:${common.DEVICE_ID_HEADER} }}`, + '_policy': constructThingPolicy(), + 'definition': 'org.eclipse.ditto:coffeebrewer:0.1.0', + 'attributes': { + 'location': 'test location', + 'model': 'Speaking coffee machine' + }, + 'features': DEVICE_FEATURE + } +} + +function constructThingPolicy() { + return { + 'entries': { + 'DEVICE': { + 'subjects': { + [AUTH_CONTEXT]: { + 'type': 'does-not-matter' + } + }, + 'resources': { + 'policy:/': { + 'revoke': [], + 'grant': [ + 'READ', + 'WRITE' + ] + }, + 'thing:/': { + 'revoke': [], + 'grant': [ + 'READ', + 'WRITE' + ] + }, + 'message:/': { + 'revoke': [], + 'grant': [ + 'READ', + 'WRITE' + ] + } + } + } + } + } +} diff --git a/benchmark-tool/test/k6-test.js b/benchmark-tool/test/k6-test.js new file mode 100644 index 0000000000..7c0844d953 --- /dev/null +++ b/benchmark-tool/test/k6-test.js @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +export { modifyThing } from './modify-thing.js' +export { searchThing, searchThingBatch } from './search-thing.js' +export { readThing } from './read-thing.js' +export { warmup } from './warmup.js' +export { sendDeviceLiveMessage } from './device-live-message.js' +import * as common from './common.js' +import { sendCreateThingsAndPolicies } from './kafka-util.js'; + +import { + Reader +} from 'k6/x/kafka'; + +import { + createHttpPushConnection, + createKafkaSourceConnection, + createKafkaTargetConnection, + deleteConnection, + deletePolicy, + deleteThing, + waitForConnectionToOpen, +} from './http-util.js'; + +export const options = { + setupTimeout: __ENV.SETUP_TIMEOUT, + teardownTimeout: __ENV.TEARDOWN_TIMEOUT, + + // will set later + scenarios: {}, + + batch: common.BATCH_SIZE, + batchPerHost: common.BATCH_SIZE, +}; + +let WARMUP = 'WARMUP'; +let MODIFY_THINGS = 'MODIFY_THINGS'; +let SEARCH_THINGS = 'SEARCH_THINGS'; +let READ_THINGS = 'READ_THINGS'; +let DEVICE_LIVE_MESSAGES = 'DEVICE_LIVE_MESSAGES'; + +let availableScenarios = {}; + +availableScenarios[WARMUP] = { + executor: 'per-vu-iterations', + maxDuration: __ENV.THINGS_WARMUP_MAX_DURATION, + exec: 'warmup', + startTime: __ENV.THINGS_WARMUP_START_TIME +}; + +availableScenarios[MODIFY_THINGS] = { + executor: 'constant-arrival-rate', + duration: __ENV.MODIFY_THINGS_DURATION, + rate: __ENV.MODIFY_THINGS_PER_SECOND, + timeUnit: '1s', + preAllocatedVus: __ENV.MODIFY_THINGS_PRE_ALLOCATED_VUS, + maxVUs: __ENV.MODIFY_THINGS_MAX_VUS, + exec: 'modifyThing', + startTime: __ENV.MODIFY_THINGS_START_TIME +}; + +availableScenarios[SEARCH_THINGS] = { + executor: 'constant-arrival-rate', + duration: __ENV.SEARCH_THINGS_DURATION, + rate: __ENV.SEARCH_THINGS_PER_SECOND, + timeUnit: '1s', + preAllocatedVus: __ENV.SEARCH_THINGS_PRE_ALLOCATED_VUS, + maxVUs: __ENV.SEARCH_THINGS_MAX_VUS, + exec: 'searchThing', + startTime: __ENV.SEARCH_THINGS_START_TIME +}; + +availableScenarios[READ_THINGS] = { + executor: 'constant-arrival-rate', + duration: __ENV.READ_THINGS_DURATION, + rate: __ENV.READ_THINGS_PER_SECOND, + timeUnit: '1s', + preAllocatedVus: __ENV.READ_THINGS_PRE_ALLOCATED_VUS, + maxVUs: __ENV.READ_THINGS_MAX_VUS, + exec: 'readThing', + startTime: __ENV.READ_THINGS_START_TIME +}; + +availableScenarios[DEVICE_LIVE_MESSAGES] = { + executor: 'constant-arrival-rate', + duration: __ENV.DEVICE_LIVE_MESSAGES_DURATION, + rate: __ENV.DEVICE_LIVE_MESSAGES_PER_SECOND, + timeUnit: '1s', + preAllocatedVus: __ENV.DEVICE_LIVE_MESSAGES_PRE_ALLOCATED_VUS, + maxVUs: __ENV.DEVICE_LIVE_MESSAGES_MAX_VUS, + exec: 'sendDeviceLiveMessage', + startTime: __ENV.DEVICE_LIVE_MESSAGES_START_TIME +}; + +let scenariosToRun = __ENV.SCENARIOS_TO_RUN !== undefined ? __ENV.SCENARIOS_TO_RUN.split(/\s*,\s*/) : undefined; +if (scenariosToRun !== undefined) { + scenariosToRun.forEach(scenario => { + if (availableScenarios[scenario] !== undefined) { + options.scenarios[scenario] = availableScenarios[scenario]; + } + }); +} + +export function setup() { + let httpPushConnection, kafkaTargetConnection, kafkaSourceConnection, consumer; + + if (__ENV.CREATE_DITTO_CONNECTIONS == 1) { + let httpPushConnectionId = undefined; + if (shouldCreateHttpPushConnection()) { + httpPushConnection = createHttpPushConnection(); + waitForConnectionToOpen(httpPushConnection.id); + } + + if (shouldCreateKafkaSourceConnection()) { + common.createThingsTopicsIfNotAvailable(); + kafkaSourceConnection = createKafkaSourceConnection(httpPushConnectionId); + waitForConnectionToOpen(kafkaSourceConnection.id); + } + } + + if (shouldCreateKafkaTargetConnection()) { + consumer = new Reader({ + brokers: common.BOOTSTRAP_SERVERS, + groupID: common.CREATE_UPDATE_THING_CONSUMER_GROUP_ID, + groupTopics: [common.CREATE_UPDATE_THING_REPLY_TOPIC], + connectLogger: __ENV.KAFKA_CONSUMER_LOGGER_ENABLED == 1, + maxWait: parseInt(__ENV.CREATE_UPDATE_THING_CONSUMER_MAX_WAIT_TIME_S) * 1000000000 + }); + + kafkaTargetConnection = createKafkaTargetConnection(); + waitForConnectionToOpen(kafkaTargetConnection.id); + + let thingIds = [] + for (let i = 0; i < common.THINGS_COUNT; i++) { + thingIds.push(common.GET_THING_ID(i)) + } + sendCreateThingsAndPolicies(thingIds, common.KAFKA_CREATE_UPDATE_PRODUCER, consumer); + console.log('created things'); + } + + return { + kafkaSourceConnection: kafkaSourceConnection, + kafkaTargetConnection: kafkaTargetConnection, + httpPushConnection: httpPushConnection + }; +} + +export default function () { + console.log('no scenarios to run configured'); +} + +export function teardown(config) { + console.log("TEARDOWN EXECUTING...") + if (__ENV.DELETE_THINGS == 1) { + for (let i = 0; i < common.THINGS_COUNT; i++) { + let thingId = common.GET_THING_ID(i); + deleteThing(thingId); + deletePolicy(thingId); + } + } + if (__ENV.DELETE_DITTO_CONNECTIONS == 1) { + if (config.httpPushConnection != undefined) { + deleteConnection(config.httpPushConnection.id); + } + + if (config.kafkaTargetConnection != undefined) { + deleteConnection(config.kafkaTargetConnection.id); + } + + if (config.kafkaSourceConnection != undefined) { + deleteConnection(config.kafkaSourceConnection.id); + } + + if (__ENV.CREATE_THINGS == 1 || options.scenarios[MODIFY_THINGS] !== undefined) { + common.deleteTopics(); + } + } +} + + +function shouldCreateHttpPushConnection() { + return scenariosToRun.indexOf(DEVICE_LIVE_MESSAGES) !== -1 || scenariosToRun.indexOf(MODIFY_THINGS) !== -1 +} + +function shouldCreateKafkaSourceConnection() { + return __ENV.CREATE_THINGS == 1 || scenariosToRun.indexOf(MODIFY_THINGS) !== -1 +} + +function shouldCreateKafkaTargetConnection() { + return __ENV.CREATE_THINGS == 1; +} diff --git a/benchmark-tool/test/kafka-util.js b/benchmark-tool/test/kafka-util.js new file mode 100644 index 0000000000..41c93a7a5a --- /dev/null +++ b/benchmark-tool/test/kafka-util.js @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +import * as common from './common.js'; +import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js'; + +import { + SCHEMA_TYPE_JSON, + SCHEMA_TYPE_STRING +} from 'k6/x/kafka'; + +const DITTO_COMMANDS_MODIFY_TOPIC_PATH = 'things/twin/commands/modify'; +const THING_PERSISTED_ACK = 'twin-persisted'; + +const DEVICE_FEATURE_PATH = `features/${common.DEVICE_FEATURE_NAME}/properties/${common.DEVICE_FEATURE_PROPERTY}` + +export function sendCreateThingsAndPolicies(thingIds, producer, consumer) { + console.log('create ' + common.THINGS_COUNT + ' things') + let messages = []; + let batchNum = 0; + thingIds.forEach(thingId => { + let headers = { + [common.DEVICE_ID_HEADER]: thingId, + [common.CREATE_THING_HEADER]: 1 + }; + + messages.push({ + headers: headers, + key: common.schemaRegistry.serialize({ + data: thingId, + schemaType: SCHEMA_TYPE_STRING + }), + value: null + }); + + if (messages.length === common.PRODUCE_THINGS_BATCH_SIZE) { + console.log('produce batch of messages..'); + producer.produce({ messages: messages }); + let from = batchNum * common.PRODUCE_THINGS_BATCH_SIZE; + consumeAndValidateThingsCreated(new Set(thingIds.slice(from, from + common.PRODUCE_THINGS_BATCH_SIZE - 1)), consumer); + messages = []; + batchNum++; + } + }); + if (messages.length > 0) { + producer.produce({ messages: messages }); + consumeAndValidateThingsCreated(new Set(thingIds.slice(batchNum * common.PRODUCE_THINGS_BATCH_SIZE)), consumer); + } +} + +export function consumeAndValidateThingsCreated(thingIdsSet, consumer) { + while (thingIdsSet.size > 0) { + try { + let messages = consumer.consume({ limit: 1 }); + messages.forEach(message => { + let consumedEvent = common.schemaRegistry.deserialize({ + data: message.value, + schemaType: SCHEMA_TYPE_JSON, + }); + let thingId = consumedEvent.value.thingId; + thingIdsSet.delete(thingId); + if (__ENV.LOG_REMAINING == 1) { + console.log('things remaining:'); + console.log([...thingIdsSet]); + } + }); + } catch (readTimeoutExc) { + console.log('timed out waiting for kafka message') + throw readTimeoutExc; + } + } +} + +export function sendModifyThing(producer, thingId) { + producer.produce({ + messages: [{ + headers: { + [common.DEVICE_ID_HEADER]: thingId, + [common.DITTO_MESSAGE_HEADER]: 1, + }, + value: common.schemaRegistry.serialize({ + data: constructModifyThingMessage(common.DEVICE_NAMESPACE, thingId.split(':')[1], DEVICE_FEATURE_PATH, randomIntBetween(1, 1000)), + schemaType: SCHEMA_TYPE_JSON + }) + }] + }) +} + +export function sendModifyThings(producer, thingIds) { + let messages = []; + thingIds.forEach(thingId => { + let headers = { + [common.DEVICE_ID_HEADER]: thingId, + [common.DITTO_MESSAGE_HEADER]: 1 + }; + + messages.push({ + headers: headers, + key: common.schemaRegistry.serialize({ + data: thingId, + schemaType: SCHEMA_TYPE_STRING + }), + value: common.schemaRegistry.serialize({ + data: constructModifyThingMessage(common.DEVICE_NAMESPACE, thingId.split(':')[1], DEVICE_FEATURE_PATH, randomIntBetween(1, 1000)), + schemaType: SCHEMA_TYPE_JSON + }) + }); + }); + producer.produce({ messages: messages }); +} + +function constructModifyThingMessage(namespace, thingId, featurePath, value) { + return { + 'topic': `${namespace}/${thingId}/${DITTO_COMMANDS_MODIFY_TOPIC_PATH}`, + 'headers': { + 'requested-acks': [THING_PERSISTED_ACK] + }, + 'path': featurePath, + 'value': value + }; +} diff --git a/benchmark-tool/test/modify-thing.js b/benchmark-tool/test/modify-thing.js new file mode 100644 index 0000000000..563c8da037 --- /dev/null +++ b/benchmark-tool/test/modify-thing.js @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js'; +import { sendModifyThing } from './kafka-util.js'; +import * as common from './common.js' + +export function modifyThing() { + let thingId = common.GET_THING_ID(randomIntBetween(common.THINGS_START_INDEX, common.THINGS_COUNT - 1)); + sendModifyThing(common.KAFKA_CREATE_UPDATE_PRODUCER, thingId); +} diff --git a/benchmark-tool/test/read-thing.js b/benchmark-tool/test/read-thing.js new file mode 100644 index 0000000000..42baceb0ee --- /dev/null +++ b/benchmark-tool/test/read-thing.js @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js'; +import { fail } from 'k6'; +import { getThing } from './http-util.js'; +import * as common from './common.js' + +export function readThing() { + let thingId = common.GET_THING_ID(randomIntBetween(common.THINGS_START_INDEX, common.THINGS_COUNT - 1)); + let response = getThing(thingId); + if (response.status != 200) { + fail(`Failed to read thing ${thingId}; Response: ${JSON.stringify(response)}`); + } +} diff --git a/benchmark-tool/test/search-thing.js b/benchmark-tool/test/search-thing.js new file mode 100644 index 0000000000..ff126f4513 --- /dev/null +++ b/benchmark-tool/test/search-thing.js @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js'; +import { fail } from 'k6'; +import { searchThingByFeature, searchThingById } from './http-util.js'; +import * as common from './common.js' + +export function searchThing() { + let thingId = common.GET_THING_ID(randomIntBetween(common.THINGS_START_INDEX, common.THINGS_COUNT - 1)); + let response = searchThingById(thingId); + if (response.status != 200) { + fail(`Failed to search things with feature value of ${value}. Response: ${JSON.stringify(response)}`); + } +} diff --git a/benchmark-tool/test/str-util.js b/benchmark-tool/test/str-util.js new file mode 100644 index 0000000000..b2ea680506 --- /dev/null +++ b/benchmark-tool/test/str-util.js @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +export function formatString(str, ...args) { + return str.replace(/{([0-9]+)}/g, function (match, index) { + return typeof args[index] == 'undefined' ? match : args[index]; + }) +}; diff --git a/benchmark-tool/test/warmup.js b/benchmark-tool/test/warmup.js new file mode 100644 index 0000000000..e017eab197 --- /dev/null +++ b/benchmark-tool/test/warmup.js @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +import { getThingsBatch } from './http-util.js'; +import * as common from './common.js' + +export function warmup() { + console.log(`WARMING UP ${common.THINGS_COUNT} THINGS IN BATCH BY ${common.BATCH_SIZE}`); + + let thingIds = []; + for (let i = 0; i < common.THINGS_COUNT; i++) { + thingIds.push(common.GET_THING_ID(i + common.THINGS_START_INDEX)); + + if (thingIds.length === common.BATCH_SIZE) { + let responses = getThingsBatch(thingIds) + responses.forEach(response => { + if (response.status != 200) { + console.log(`Failed to warmup thing.`); + console.log(response); + } + }); + thingIds = []; + } + } + + if (thingIds.length > 0) { + let responses = getThingsBatch(thingIds) + responses.forEach(response => { + if (response.status != 200) { + console.log(`Failed to warmup thing.`); + console.log(response); + } + }); + } + + console.log("WARMED UP THINGS"); +} diff --git a/documentation/src/main/resources/_data/authors.yml b/documentation/src/main/resources/_data/authors.yml index 1e286541d9..ebaab70309 100644 --- a/documentation/src/main/resources/_data/authors.yml +++ b/documentation/src/main/resources/_data/authors.yml @@ -62,3 +62,8 @@ aleksandar_stanchev: name: Aleksandar Stanchev email: aleksandar.stanchev@bosch.io web: https://github.com/alstanchev + +vasil_vasilev: + name: Vasil Vasilev + email: vasil.vasilev@bosch.com + web: https://github.com/vvasilevbosch \ No newline at end of file diff --git a/documentation/src/main/resources/_posts/2023-10-09-ditto-benchmark.md b/documentation/src/main/resources/_posts/2023-10-09-ditto-benchmark.md new file mode 100644 index 0000000000..f077b7e116 --- /dev/null +++ b/documentation/src/main/resources/_posts/2023-10-09-ditto-benchmark.md @@ -0,0 +1,466 @@ +--- +title: "Eclipse Ditto Benchmark" +published: true +permalink: 2023-10-09-ditto-benchmark.html +layout: post +author: vasil_vasilev +tags: [blog] +hide_sidebar: true +sidebar: false +toc: true + +--- + +This blog post is presenting a benchmark of Eclipse Ditto. It consists of a few scenarios to cover most important ditto functionalities, test the performance and provide a tuning guide. This benchmark is done via the [benchmark-tool](https://github.com/eclipse/ditto/tree/master/benchmark-tool), based on [k6](https://k6.io/) load testing tool + +## Setup and used tools + +- EKS cluster using [m5.4xlarge](https://aws.amazon.com/ec2/instance-types/m5/) nodes. + +- Deployed Ditto using [ditto's official helm chart](https://github.com/eclipse-ditto/ditto/tree/master/deployment/helm), version 3.0.0, ditto version 3.3.5. + +- Deployed [bitnami](https://bitnami.com/stack/kafka/helm) kafka helm chart, version 23.0.7, kafka version 3.4.0. + +- Deployed [MMock](https://github.com/jmartin82/mmock) instance - used as a 'dummy' receiver for ditto events and device commands. + +- [k6](https://k6.io/) - load testing tool, used to implement the benchmark scenarios. + +- Deployed [k6-operator](https://github.com/grafana/k6-operator) - Kubernetes operator for running distributed k6 tests. + +- [MongoDB](https://cloud.mongodb.com/) instance of type [M50](https://www.mongodb. + com/docs/atlas/manage-clusters/#nvme-considerations), version 5.0.21 + +## Scenarios + +The benchmark test consists of 4 test scenarios that can be executed independently and in parallel: + +- [READ_THINGS](#read-things) - read things via HTTP ( get things by id ) + +- [SEARCH_THINGS](#search-things) - search things via HTTP ( get things by applying search filter ) + +- [MODIFY_THINGS](#modify-things) - Modify things by sending ditto protocol kafka messages to specfic topic. Ditto kafka connection is reading from this topic and processes the messages. Ditto HTTP push connection is configured in ditto, which sends events from topic **/things/twin/events?filter=eq(topic:action,'modified')** to a monster mock endpoint, which replies with HTTP status code 204. + +- [DEVICE_LIVE_MESSAGES](#device-live-messagescommands) - Send live messages to things via HTTP. Ditto HTTP push connection is configured, which sends events from topic **/things/live/messages** to a monster mock endpoint, which replies with predefined ditto protocol message. + +## Preparation + +1 000 000 things are created with the [benchmark-tool](https://github.com/eclipse/ditto/tree/master/benchmark-tool) by sending Ditto protocol messages to the topic that a Ditto Kafka connection is configured to read from. This connection has an [implicitThingCreation mapper](connectivity-mapping.html#implicitthingcreation-mapper) configured. + +The thing template, configured in the connection mapper looks like the following: + +```json +{ + "thing": { + "thingId": "{%raw%}{{ header:device_id }}{%endraw%}", + "_policy": { + "entries": { + "DEVICE": { + "subjects": { + "nginx:ditto": { + "type": "does-not-matter" + } + }, + "resources": { + "policy:/": { + "revoke": [], + "grant": [ + "READ", + "WRITE" + ] + }, + "thing:/": { + "revoke": [], + "grant": [ + "READ", + "WRITE" + ] + }, + "message:/": { + "revoke": [], + "grant": [ + "READ", + "WRITE" + ] + } + } + } + } + }, + "definition": "org.eclipse.ditto:coffeebrewer:0.1.0", + "attributes": { + "location": "test location", + "model": "Speaking coffee machine" + }, + "features": { + "coffee-brewer": { + "properties": { + "brewed-coffees": 0 + } + } + } + } +} +``` + +Example created thing looks like the following: + +```json +{ + "thingId": "org.eclipse.ditto:test-thing-1", + "policyId": "org.eclipse.ditto:test-thing-1", + "definition": "org.eclipse.ditto:coffeebrewer:0.1.0", + "attributes": { + "location": "test location", + "model": "Speaking coffee machine" + }, + "features": { + "coffee-brewer": { + "properties": { + "brewed-coffees": 0 + } + } + } +} +``` + +## Warmup + +Before executing the scenarios, a special 'warmup' scenario is executed. It is making 'GET' requests so that entities are loaded in memory and caches are populated in order to provide optimal performance. + +Heap memory needed according to number of things: + +- 10 000 hot things ~1.8GB for things service and ~1GB for policies service. + +- 100 000 hot things - ~3.5GB for things service and ~3GB policies + +- 300 000 hot things - ~5GB for things service and ~5GB for policies service + +- 1 000 000 hot things - ~16GB for things service and ~15GB for policies service + +The size varies, depending on the size of the things and policies. + +The scenarios are run with 100 000 warmed up things. + +## Scenarios run + +All services use the following java vm options: + +`-XX:InitialHeapSize=5g` + +`-XX:MaxHeapSize=5g` + +`-XX:MaxRAMPercentage=75` + +`-XX:ActiveProcessorCount=16` + +The scenarios run for 10 minutes each, screenshots are taken from Grafana, from exposed ditto metrics. + + + +### Read things + +This scenario executes HTTP GET requests for reading things by id, like so: + +``` +${DITTO_BASE_URI}/things/org.eclipse.ditto:test-thing-1 +``` + +The tests show that with single instance of each service, it is possible to perform ~2800 reads/s. Attempting more, results in high garbage collection time and drop in performance which can be seen in the "GC" and "Gateway Traces" dashboards. + + + + + + +
{% include image.html file="blog/benchmark/read-things-scenario/3000/gateway.png" alt="Gateway" %}{% include image.html file="blog/benchmark/read-things-scenario/3000/gc.png" alt="Garbage Collection Time" %}
+ +Scaling gateway to 2 instances results in only ~3500 reads/s, because of high GC time for things service - ~5s. Scaling things service to 2 instances results in ~5600 reads/s. A maximum of 8400 reads/s was possible with 3 gateways and 2 things instances. + +| Setup | 1 instance of each service | 2 gateway, 1 instance each else | 2 gateway, 2 things, 1 instance each else | 3 gateway, 2 things, 1 instance each else | +|:--------------------------- |:-------------------------- | ------------------------------- |:----------------------------------------- |:----------------------------------------- | +| Reads/s | ~2800 | ~3500 | ~5600 | ~8400 | +| Command Processing Time(ms) | ~80 | ~2000 | ~50 | ~70 | + +### Search things + +This scenario executes HTTP GET requests for searching things by id like so: + +``` +${DITTO_BASE_URI}/search/things?filter=eq(thingId, 'org.eclipse.ditto:test-thing-1') +``` + +In this scenario, things-search service is used to perform a search query for things, based on the provided filter. + +The tests show that with single instance of each service, it is possible to perform ~1700 searches/s. Attempting more results in high garbage collection time and drop in performance, as can be seen in "GC" and "Gateway Traces" dashboards. + + + + + + +
{% include image.html file="blog/benchmark/search-things-scenario/1800/gateway.png" alt="Gateway" %}{% include image.html file="blog/benchmark/search-things-scenario/1800/gc.png" alt="Garbage Collection Time" %}
+ +With single instance of things service and 2 instances of gateway service, a maximum of ~2400 searches/s can be performed, with high GC time for things service - ~3s. Scaling things to 2 instances results in ~3400 searches/s. Scaling to 3 gateways does not show expected performance, because the nodes things pods run on have high CPU usage - ~90%. To perform more searches, nodes with more CPUs are needed. + +| Setup | 1 instance of each service | 2 gateway, 1 instance each else | 2 gateway, 2 things, 1 instance each else | 3 gateway, 2 things, 1 instance each else | +|:--------------------------- |:-------------------------- | ------------------------------- |:----------------------------------------- |:------------------------------------------| +| Searches/s | ~1700 | ~2400 | ~3400 | ~4900 | +| Command Processing Time(ms) | ~70 | ~100 | ~50 | ~100 | + +### Modify things + +This scenario sends ditto protocol kafka messages, which cause twin modifies. + +Example message: + +``` +device_id:org.eclipse.ditto:test-thing-1,ditto_message:y!{"topic":"org.eclipse.ditto/test-thing-1/things/twin/commands/modify","path":"features/coffee-brewer/properties/brewed-coffees","value":"10"} +``` + +In this scenario, connectivity service is used to create a ditto kafka connection, which reads messages from the provided topic, maps them to a ditto modify command and forwards it to things service. The things service then executes mongodb update query and generates the [thing modified event](protocol-specification-things-create-or-modify.html#event), which is pushed to the MMock service instance via an HTTP Push connection. Also, the kafka connection is configured with [qos=1](connectivity-protocol-bindings-kafka2.html#quality-of-service), which means if there is no acknowledgement that the thing is persisted, the operation will be retried. + +The HTTP Push connection looks like the following: + +```json +{ + "id": "a70c0749-261a-474b-9fb2-8fff7bd84fb4", + "name": "http-push-connection", + "connectionType": "http-push", + "connectionStatus": "open", + "uri": "http://mmock-service:80", + "sources": [], + "targets": [ + { + "address": "POST:/{%raw%}{{ thing:id }}{%endraw%}", + "topics": [ + "_/_/things/twin/events?filter=eq(topic:action,'modified')" + ], + "authorizationContext": [ + "nginx:ditto" + ], + "headerMapping": {} + } + ], + "clientCount": 1, + "failoverEnabled": true, + "validateCertificates": true, + "processorPoolSize": 1, + "specificConfig": { + "parallelism": "1" + }, + "tags": [ + "benchmark" + ] +} +``` + +The kafka connection looks like the following: + +```json +{ + "id": "4cd191cc-aabb-4965-a1b4-dfe8ae8674bc", + "name": "kafka-source", + "connectionType": "kafka", + "connectionStatus": "open", + "uri": "tcp://bitnami-kafka:9092", + "sources": [ + { + "addresses": [ + "create-update" + ], + "consumerCount": 1, + "qos": 0, + "authorizationContext": [ + "nginx:ditto" + ], + "enforcement": { + "input": "{%raw%}{{ header:device_id }}{%endraw%}", + "filters": [ + "{%raw%}{{ entity:id }}{%endraw%}" + ] + }, + "headerMapping": {}, + "payloadMapping": [ + "ditto" + ], + "replyTarget": { + "enabled": false + } + } + ], + "targets": [], + "clientCount": 1, + "failoverEnabled": true, + "validateCertificates": true, + "processorPoolSize": 1, + "specificConfig": { + "saslMechanism": "plain", + "bootstrapServers": "bitnami-kafka:9092" + }, + "mappingDefinitions": { + "ditto": { + "mappingEngine": "Ditto", + "options": { + "thingId": "{%raw%}{{ header:device_id }}{%endraw%}" + }, + "incomingConditions": { + "sampleCondition": "fn:filter(header:ditto_message,'exists')" + } + } + }, + "tags": [ + "benchmark" + ] +} +``` + +By default, the ditto kafka consumer is throttled with limit of 100 number of messages/s per consumer. This is configured by the **KAFKA_CONSUMER_THROTTLING_LIMIT** env variable. This value is changed to 1000, since with the current setup, more than 500 messages can be processed. Single connectivity instance is able to perform ~800 modifies/s. However, "Outbound Message rates" panel shows failed published messages from our HTTP Push connection, and the following is observed from our connection logs: + +``` +Ran into a failure when publishing signal: Outgoing HTTP request aborted: There are too many in-flight requests. This can have the following reasons:\na) The HTTP endpoint does not consume the messages fast enough.\nb) The client count and/or the parallelism of this connection is not configured high enough. +``` + + + + + +
{% include image.html file="blog/benchmark/modify-things-scenario/800/outbound.png" alt="Outbound messages rates" %}
+ +Increasing the connection parallelism from 1(default) to 2 solves this issue and all outbound messages are sent without fails. + +```json +... +"connectionType": "http-push", +"specificConfig": { + "parallelism": "2" + }, +... +``` + +Attempting more modifies/s does not result in better performance, the ditto kafka connection consumes at the same rate of 800 messages/s. + + + +Scaling connectivity instance and changing our connection to have **clientCount** equal to number of connectivity instances solves the GC issue. Performing 1600 modifies/s also results in high garbage collection time(~2s) for things and things-search services. This is the maximum that can be achieved with single things and things-search services. Scaling things and things-search solves the GC issue. Further scaling of connectivity results in only ~1800 modifies/s, because MongoDB's write tickets get exhausted, as can be seen in the MongoDB 'Tickets Available' metric. Scaling MongoDB to higher CPU instance(M50 General, 8 cpu) solves the issue and ~2400 modifies/s are possible. + +| Setup | 1 instance of each service | 2 connectivity, 1 instance each else | 3 connectivity, 2 things, 2 things-search, 1 instance each else | 4 connectivity, 2 things, 2 things-search, 1 instance each else | +| --------------------------- | -------------------------- | ------------------------------------ | --------------------------------------------------------------- | --------------------------------------------------------------- | +| Modifies/s | ~800 | ~1600 | ~2400 | ~3200 | +| Signal Processing Times(ms) | ~15 | ~80 | ~10 | ~20 | +| ACK Times(ms) | ~40 | ~250 | ~50 | ~100 | + +### Device live messages(commands) + +This scenario executes HTTP POST requests to ditto's [live channel](protocol-twinlive.html#live). An HTTP Push connection is subscribed for them and in turn pushes to a MMock instance that acts as a 'dummy' device receiver of live messages/commands and simply responds with pre-configured ditto response. + +The HTTP POST request looks like the following: + +``` +URL: ${DITTO_THINGS_URI}/org.eclipse.ditto:test-thing-1/inbox/messages/someSubject +Request Body: "anyMessage" +``` + +MMock pre-configured response looks like the following: + +```json +{ + "topic": "{%raw%}{{request.body.topic}}{%endraw%}", + "status": "200", + "value": "ok", + "path": "{%raw%}{{request.body.path}}{%endraw%}", + "headers": { + "correlation-id": "{%raw%}{{request.body.headers.correlation-id}}{%endraw%}", + "content-type": "application/json" + } +} +``` + +The HTTP Push connection looks like the following: + +```json +{ + "id": "a70c0749-261a-474b-9fb2-8fff7bd84fb4", + "name": "http-push-connection", + "connectionType": "http-push", + "connectionStatus": "open", + "uri": "http://mmock-service:80", + "sources": [], + "targets": [ + { + "address": "POST:/live_messages", + "topics": [ + "_/_/things/live/messages" + ], + "authorizationContext": [ + "nginx:ditto" + ], + "headerMapping": {} + } + ], + "clientCount": 1, + "failoverEnabled": true, + "validateCertificates": true, + "processorPoolSize": 1, + "specificConfig": { + "parallelism": "1" + }, + "tags": [ + "benchmark" + ] +} +``` + +The tests show that a single connectivity instance is able to perform ~600 live messages/s. Attempting more results in high garbage collection time and high Command Processing Time, as can be seen in "GC" and "Gateway Traces" dashboards. + + + + + + +
{% include image.html file="blog/benchmark/device-live-messages-scenario/800/gateway.png" alt="Gateway" %}{% include image.html file="blog/benchmark/device-live-messages-scenario/800/gc.png" alt="Garbage Collection Time" %}
+ +Scaling the instances and increasing the value of connection **clientCount** solves the GC issue and doubles the messages/s. Performing more than 2400 live messages/s also results in high garbage collection time for things and gateway services and this is the maximum with single things and gateway services. Scaling things and gateway solves the issue and results in ~3000 live messages/s. + +| Setup | 1 instance of each service | 2 connectivity, 1 instance each else | 3 connectivity, 1 instance each else | 4 connectivity, 1 instance each else | 5 connectivity, 2 things, 2 gateway, 1 instance each else | +| --------------------------- | -------------------------- | ------------------------------------ | ------------------------------------ | ------------------------------------ | --------------------------------------------------------- | +| Live messages/s | ~600 | ~1200 | ~1800 | ~2400 | ~3000 | +| Command Processing Time(ms) | ~50 | ~50 | ~100 | ~240 | ~50 | +| Signal Processing Times(ms) | ~15 | ~15 | ~40 | ~90 | ~20 | + +## Additional tuning properties, not used in the scenarios + +Ditto 'tuning' environment variables, that were not mentioned, because those cases were never hit, while executing the scenarios. + +**THING_PERSISTENCE_ACTOR_MAILBOX_SIZE** - Used for special actor mailbox, which handles ThingModifyCommands, see org. +eclipse.ditto.things.service.persistence.actors.ThingPersistenceActorMailbox. If number of messages is more than +the mailbox capacity, modify command results in error, like the following: + +``` +Too many modifying requests are already outstanding to the Thing with ID 'org.eclipse.ditto:test-thing-1'. +``` + +**REMOTE_OUTBOUND_MESSAGE_QUEUE_SIZE** - See [https://doc.akka.io/docs/akka/current/general/configuration-reference.html](https://doc.akka.io/docs/akka/current/general/configuration-reference.html) + +```none + # Size of the send queue for outgoing messages. Messages will be dropped if + # the queue becomes full. This may happen if you send a burst of many messages + # without end-to-end flow control. Note that there is one such queue per + # outbound association. The trade-off of using a larger queue size is that + # it consumes more memory, since the queue is based on preallocated array with + # fixed size. + outbound-message-queue-size = 3072 +``` + +If this limit is hit, the following error log will appear: + +``` +Message [org.eclipse.ditto.things.model.signals.commands.modify.ModifyAttribute] from Actor +[akka://ditto-cluster/temp/thingProxy$AsP9C] to Actor[akka://ditto-cluster@10.0.157. +154:2552/system/sharding/thing#362290016] was dropped. Due to overflow of send queue, size [3072]. [10] dead letters +encountered, no more dead letters will be logged in next [5.000 min]. This logging can be turned off or adjusted +with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. +``` \ No newline at end of file diff --git a/documentation/src/main/resources/images/blog/benchmark/device-live-messages-scenario/800/gateway.png b/documentation/src/main/resources/images/blog/benchmark/device-live-messages-scenario/800/gateway.png new file mode 100644 index 0000000000..1d74a2f355 Binary files /dev/null and b/documentation/src/main/resources/images/blog/benchmark/device-live-messages-scenario/800/gateway.png differ diff --git a/documentation/src/main/resources/images/blog/benchmark/device-live-messages-scenario/800/gc.png b/documentation/src/main/resources/images/blog/benchmark/device-live-messages-scenario/800/gc.png new file mode 100644 index 0000000000..5bea061a9d Binary files /dev/null and b/documentation/src/main/resources/images/blog/benchmark/device-live-messages-scenario/800/gc.png differ diff --git a/documentation/src/main/resources/images/blog/benchmark/modify-things-scenario/800/outbound.png b/documentation/src/main/resources/images/blog/benchmark/modify-things-scenario/800/outbound.png new file mode 100644 index 0000000000..d5ac5e570e Binary files /dev/null and b/documentation/src/main/resources/images/blog/benchmark/modify-things-scenario/800/outbound.png differ diff --git a/documentation/src/main/resources/images/blog/benchmark/read-things-scenario/3000/gateway.png b/documentation/src/main/resources/images/blog/benchmark/read-things-scenario/3000/gateway.png new file mode 100644 index 0000000000..71b16c3376 Binary files /dev/null and b/documentation/src/main/resources/images/blog/benchmark/read-things-scenario/3000/gateway.png differ diff --git a/documentation/src/main/resources/images/blog/benchmark/read-things-scenario/3000/gc.png b/documentation/src/main/resources/images/blog/benchmark/read-things-scenario/3000/gc.png new file mode 100644 index 0000000000..4960828836 Binary files /dev/null and b/documentation/src/main/resources/images/blog/benchmark/read-things-scenario/3000/gc.png differ diff --git a/documentation/src/main/resources/images/blog/benchmark/search-things-scenario/1800/gateway.png b/documentation/src/main/resources/images/blog/benchmark/search-things-scenario/1800/gateway.png new file mode 100644 index 0000000000..df85fd48da Binary files /dev/null and b/documentation/src/main/resources/images/blog/benchmark/search-things-scenario/1800/gateway.png differ diff --git a/documentation/src/main/resources/images/blog/benchmark/search-things-scenario/1800/gc.png b/documentation/src/main/resources/images/blog/benchmark/search-things-scenario/1800/gc.png new file mode 100644 index 0000000000..543e6a5459 Binary files /dev/null and b/documentation/src/main/resources/images/blog/benchmark/search-things-scenario/1800/gc.png differ