Skip to content

Commit

Permalink
MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
ledermann committed Oct 2, 2024
1 parent de1243f commit 15fa74f
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 174 deletions.
60 changes: 56 additions & 4 deletions referenz/mqtt-collector/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,27 @@ parent: Referenz
nav_order: 3
---

# Verwendung für SOLECTRUS
# MQTT-Collector

Der **MQTT-Collector** sammelt Messwerte über einen MQTT-Broker ein und schreibt diesen in die InfluxDB.

Im Gegensatz zu den spezialisierten Collectoren (z.B. für SENEC oder Shelly) ist das ein Allzweckwerkzeug, das Messwerte aus den verschiedensten Quellen verarbeiten kann.

Voraussetzung für diesen Collector ist ein funktionsfähiger MQTT-Broker. Dieser kann entweder im lokalen Netzwerk laufen (ioBroker oder Mosquitto sind gängige MQTT-Broker) oder auch ein Cloud-Service sein (z.B. HiveMQ).

Das übliche Einsatzgebiet ist die Anbindung von Geräten an SOLECTRUS, die nicht direkt unterstützt werden. Diese werden dann z.B. von einem ioBroker-Adapter abgefragt und die Werte per MQTT bereitgestellt, wo sie dann vom MQTT-Collector abgeholt werden.

Auf diese Weise können auch exotische Wallboxen, Wärmepumpen, Batteriespeicher, Wechselrichter, E-Autos etc. in SOLECTRUS eingebunden werden.

# Empfang von Messwerten

Der MQTT-Collector abonniert Topics bei einem MQTT-Broker, verarbeitet die empfangenen Werte und schreibt sie in eine InfluxDB. Prinzipiell ist das unabhängig von SOLECTRUS, aber üblicherweise wird der Collector in SOLECTRUS-Umgebungen eingesetzt. SOLECTRUS bedient sich dann der Werte aus der InfluxDB.

Für jedes abonnierte Topic, für das der MQTT-Collector Messwerte empfängt, wird einzeln über ein **Mapping** festgelegt, was mit den Werten geschehen soll und insbesondere wohin sie gespeichert werden sollen.

```mermaid
graph LR
A[MQTT-Collector] -- Topic + Value --> B((MQTT-Mapping))
A[MQTT-Collector] -- Topic + Payload --> B((MQTT-Mapping))
B -- Measurement + Field + Value --> C[InfluxDB]
```
Expand All @@ -24,14 +36,54 @@ Auch können Messwerte, die (noch) nicht von SOLECTRUS verarbeitet werden könne

Wie das Mapping im Detail zu definieren ist, wird in der [Konfiguration](Konfiguration.md) beschrieben.

{: .note}

Es ist wichtig, dass Messwerte **kontinuierlich** über den MQTT-Broker gesendet werden. Lücken im Datenempfang führen später in SOLECTRUS zu Problemen. Gängiges Negativ-Beispiel ist eine Wallbox, die nur bei Ladevorgängen den Verbrauch sendet. Es ist sicherzustellen, dass auch bei Nicht-Ladevorgängen kontinuierlich ein Verbrauch von 0 Watt gesendet wird, so dass keine Lücken entstehen. \
\
Falls also der MQTT-Broker eine Einstellung wie "Publish only on change" anbietet, so ist diese zu **deaktivieren**.

## Weiterverarbeitung in SOLECTRUS

SOLECTRUS holt sich die Werte aus der InfluxDB, ohne zu wissen, woher sie kommen. Das SOLECTRUS-Dashboard ist also völlig unabhängig von der Quelle der Messwerte. SOLECTRUS definiert dabei Sensoren, über die die Werte abgerufen werden. Die Sensoren sind also die Schnittstelle zwischen SOLECTRUS und der InfluxDB. Für den MQTT-Collector sind die Sensoren aber irrelevant.
Das Dashboard von SOLECTRUS holt sich die Werte aus der InfluxDB, ohne zu wissen, woher sie kommen. Das SOLECTRUS-Dashboard ist also unabhängig von der Quelle der Messwerte. SOLECTRUS definiert dabei Sensoren, über die die Werte abgerufen werden. Die Sensoren sind also die Schnittstelle zwischen SOLECTRUS und der InfluxDB. Für den MQTT-Collector sind die Sensoren aber irrelevant.

```mermaid
graph LR
C[InfluxDB] -- Measurement + Field + Value --> D((Sensor-Mapping)) -- Sensor + Value --> E[Dashboard]
```

Quelltext im GitHub-Repository: \
## Protokollierung

Der Collector schreibt ein Protokoll in der Docker-Log, das im Normalfall so aussieht:

```plaintext
MQTT collector for SOLECTRUS, Version 0.3.0, built at 2024-09-13T23:14:19.973Z
https://github.com/solectrus/mqtt-collector
Copyright (c) 2023-2024 Georg Ledermann and contributors, released under the MIT License
Using Ruby 3.3.5 on platform x86_64-linux-musl
Subscribing from MQTT broker at mqtt://192.168.178.31:1883
Pushing to InfluxDB at http://influxdb, bucket PV
Subscribing to 1 topics:
- MODBUS/BatteryLevel => SUNGROW:battery_soc (float)
# Message from 2024-10-02 10:00:44 +0200
topic = MODBUS/BatteryLevel
message = 42
=> SUNGROW:battery_soc = 42.0
...
```

Das Protokoll kann über folgenden Befehl abgerufen werden:

```bash
docker logs [container-name]
```

Bei Problemen oder Fehlern (z.B. wenn der MQTT-Broker oder die InfluxDB nicht erreichbar ist) wird dies ebenfalls protokolliert. Es empfiehlt sich daher, im Zweifelsfall zuerst das Protokoll zu prüfen.

## Quelltext

Der MQTT-Collector ist in Ruby implementiert, der Quelltext ist auf GitHub verfügbar: \
[github.com/solectrus/mqtt-collector](https://github.com/solectrus/mqtt-collector)
268 changes: 98 additions & 170 deletions referenz/mqtt-collector/konfiguration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,213 +2,141 @@
title: Konfiguration
layout: page
parent: MQTT-Collector
nav_order: 1
---

# Konfiguration über Umgebungsvariablen

SOLECTRUS wird über Umgebungsvariablen konfiguriert. Diese stehen in der Datei `.env` im gleichen Verzeichnis wie die `compose.yml`. Jeder Container hat seine eigenen Variablen, mache Variablen werden von verschiedenen Containern benutzt.

Es ist zu beachten, dass die Umgebungsvariablen nicht nur in der `.env` definiert werden, sondern auch in der `compose.yml` aufgeführt werden (als Auflistung im Abschnitt `environment` des Services `mqtt-collector`). Andernfalls sind sie für den Collector nicht erreichbar.

Nach einer Bearbeitung von `.env` oder `compose.yml` müssen die Container neu erstellt werden, um die Änderungen zu übernehmen. Dies geschieht mit dem Befehl `docker compose up -d` (bei älteren Docker-Versionen `docker-compose up -d`, also mit Bindestrich).

Es folgt eine Auflistung der für den MQTT-Collector definierten Umgebungsvariablen, gültig ab Version `v0.2.0`.

## Zugang zum MQTT-Broker
# Konfigurieren des MQTT-Collectors

Der MQTT-Collector wird üblicherweise in die Gesamtkonfiguration von SOLECTRUS integriert, d.h. die bestehenden Dateien `compose.yaml` und `.env` sind zu erweitern.

## compose.yaml

```yaml
services:
mqtt-collector:
image: ghcr.io/solectrus/mqtt-collector:latest
environment:
- TZ
- INFLUX_SCHEMA
- INFLUX_HOST
- INFLUX_TOKEN
- INFLUX_ORG
- INFLUX_BUCKET
- INFLUX_PORT
- MQTT_HOST
- MQTT_PORT
- MQTT_SSL
- MQTT_USERNAME
- MQTT_PASSWORD
- MAPPING_0_TOPIC
- MAPPING_0_JSON_KEY
- MAPPING_0_JSON_PATH
- MAPPING_0_JSON_FORMULA
- MAPPING_0_MEASUREMENT
- MAPPING_0_MEASUREMENT_POSITIVE
- MAPPING_0_MEASUREMENT_NEGATIVE
- MAPPING_0_FIELD
- MAPPING_0_FIELD_POSITIVE
- MAPPING_0_FIELD_NEGATIVE
- MAPPING_0_TYPE
- MAPPING_0_MIN
- MAPPING_0_MAX
- ... # weitere Mappings bei Bedarf
logging:
options:
max-size: 10m
max-file: '3'
restart: unless-stopped
depends_on:
influxdb:
condition: service_healthy
links:
- influxdb
labels:
- com.centurylinklabs.watchtower.scope=solectrus

influxdb:
# ...

watchtower:
# ...
```

- `MQTT_HOST`
## Umgebungsvariablen

Hostname des MQTT-Brokers. Dies kann die IP-Adresse des lokal erreichbaren ioBroker sein, aber auch die Domain eines extern erreichbaren Brokers. Darf **kein** `http://` oder `https://` enthalten!
### `MQTT_HOST`

- `MQTT_PORT`
Hostname des MQTT-Brokers. Dies kann die IP-Adresse des lokal erreichbaren ioBroker sein, aber auch die Domain eines extern erreichbaren Brokers. Darf **kein** `http://` oder `https://` enthalten!

Port des MQTT-Brokers. Meist ist das `1883`.
### `MQTT_PORT`

- `MQTT_SSL` (standardmäßig `false`)
Port des MQTT-Brokers. Meist ist das `1883`.

Wenn der MQTT-Broker über TLS abgesichert ist, muss dieser Wert auf `true` gesetzt werden. Bei einem lokalen ioBroker ist das üblicherweise nicht der Fall, die Angabe kann dann entfallen oder auf `false` gesetzt werden.
### `MQTT_SSL`

- `MQTT_USERNAME` (optional)
Wenn der MQTT-Broker über TLS abgesichert ist, muss dieser Wert auf `true` gesetzt werden. Bei einem lokalen ioBroker ist das üblicherweise nicht der Fall, die Angabe kann dann entfallen oder auf `false` gesetzt werden.

Falls erforderlich: Benutzername für den Zugriff auf den MQTT-Broker.
### `MQTT_USERNAME`

- `MQTT_PASSWORD` (optional)
Falls erforderlich: Benutzername für den Zugriff auf den MQTT-Broker.

Falls erforderlich: Passwort für den Zugriff auf den MQTT-Broker.
### `MQTT_PASSWORD`

## Zugriff auf InfluxDB
Falls erforderlich: Passwort für den Zugriff auf den MQTT-Broker.

- `INFLUX_HOST`
### `INFLUX_HOST`

Hostname des InfluxDB-Servers. Im Normalfall, wenn InfluxDB im gleichen Docker-Netzwerk läuft, ist das der Name des Containers (z.B. `influxdb`). Es kann aber auch ein externer InfluxDB-Server sein, z.B. `influxdb.example.com`.
Hostname des InfluxDB-Servers. Im Normalfall, wenn InfluxDB im gleichen Docker-Netzwerk läuft, ist das der Name des Containers (z.B. `influxdb`). Es kann aber auch ein externer InfluxDB-Server sein, z.B. `influxdb.example.com`.

- `INFLUX_SCHEMA` (standardmäßig `http`)
### `INFLUX_SCHEMA`

Schema für die Verbindung zu InfluxDB. Bei Verwendung einer externen InfluxDB, die über TLS abgesichert ist, muss dieser Wert auf `https` gesetzt werden.
Schema für die Verbindung zu InfluxDB. Bei Verwendung einer externen InfluxDB, die über TLS abgesichert ist, muss dieser Wert auf `https` gesetzt werden.

- `INFLUX_PORT` (standardmäßig `8086`)
Standardwert: `http`

Port für die Verbindung zu InfluxDB. Bei Verwendung einer externen InfluxDB könnte eine Anpassung erforderlich sein, z.B. auf `443`.
### `INFLUX_PORT`

- `INFLUX_ORG`
Port für die Verbindung zu InfluxDB.

Organisation in InfluxDB, in der die Daten gespeichert werden sollen.
Optional, Standard ist `8086`

- `INFLUX_BUCKET`
Bei Verwendung einer externen, per TLS abgesicherten InfluxDB kann z.B. `443` eingestellt werden.

Bucket in InfluxDB, in der die Daten gespeichert werden sollen.
### `INFLUX_TOKEN`

- `INFLUX_TOKEN`
Token für den Zugriff auf InfluxDB. Dieser Token muss in InfluxDB erstellt werden und die Berechtigung haben, Daten in den angegebenen Bucket zu **schreiben**.

Token für den Zugriff auf InfluxDB. Dieser Token muss in InfluxDB erstellt werden und die Berechtigung haben, Daten in den angegebenen Bucket zu **schreiben**.
### `INFLUX_ORG`

## Abonnieren von MQTT-Nachrichten
Organisation in InfluxDB, in der die Messwerte gespeichert werden sollen.

Der MQTT-Collector kann Nachrichten von verschiedenen (beliebig vielen) Topics abonnieren, verarbeiten und dann in die InfluxDB schreiben. Dazu werden Zuordnungen ("Mappings") definiert und für jedes Mapping drei Dinge festgelegt:
### `INFLUX_BUCKET`

- Wo kommt der Wert her, also welches "Topic" muss abonniert werden?
- Welche Verarbeitung ist notwendig (Vorzeichen-Behandlung, Datentyp-Konvertierung, JSON-Extraktion, Formelbildung)?
- Wohin in der InfluxDB soll der ermittelte Wert geschrieben werden (Measurement und Field)?
Bucket in InfluxDB, in der die Messwerte gespeichert werden sollen.

Jedes Mapping wird durch mehrere Umgebungsvariablen definiert, die mit dem Präfix `MAPPING_X_` beginnen, wobei `X` eine Zahl ab 0 sein sollte (was aber nicht zwingend ist). Sinnvoll, aber ebenfalls nicht zwingend ist, dass die Mappings in aufsteigender Reihenfolge definiert werden.
### Topics

### Mögliche Umgebungsvariable je Mapping
Für jedes Topic, das der MQTT-Collector abonnieren soll, muss ein Mapping definiert werden. Ein Mapping besteht aus mehreren Umgebungsvariablen, die mit dem Präfix `MAPPING_X_` beginnen, wobei `X` eine eindeutige Zahl sein muss.

Für jedes einzelne Mapping stehen verschiedene Umgebungsvariablen zur Verfügung, von denen einige optional sind:
Es stehen folgende Variablen zur Verfügung:

- `MAPPING_X_TOPIC`

Das Topic, das abonniert werden soll, z.B. `senec/0/ENERGY/GUI_INVERTER_POWER`.

- `MAPPING_X_JSON_KEY` (optional)

Falls das Topic einen JSON-Payload (mit **nicht** verschachtelten key/value-Paaren) enthält, kann hier der Schlüssel angegeben werden, aus dem der Wert extrahiert werden soll. Ein Schlüssel ist immer ein String, z.B. `inverter_power`.

- `MAPPING_X_JSON_PATH` (optional)

Falls das Topic einen komplexen (z.B. verschachtelten) JSON-Payload enthält, kann hier der [JSONPath](https://goessner.net/articles/JsonPath/) angegeben werden, um den Wert zu extrahieren. Ein JSONPath beginnt immer mit `$.`, z.B. `$.example.foo.bar[2]`.

- `MAPPING_X_JSON_FORMULA` (optional)

Falls das Topic JSON liefert, kann ein Berechnungsschritt erfolgen, um den zu speichernden Messwert zu ermitteln. Hierzu muss eine Formel angegeben werden, die [einige mathematische Operationen](https://github.com/rubysolo/dentaku?tab=readme-ov-file#built-in-operators-and-functions) enthalten darf, z.B. `round({value} * 1.5`).

Die geschweiften Klammern `{}` dienen dazu, Werte aus dem JSON-Payload zu referenzieren. Es können dabei einfache Schlüssel oder JSONPath verwendet werden.

- `MAPPING_X_JSON_KEY`
- `MAPPING_X_JSON_PATH`
- `MAPPING_X_JSON_FORMULA`
- `MAPPING_X_MEASUREMENT`

Der Name des InfluxDB-Measurement, in das der Wert geschrieben werden soll (unabhängig davon, ob er positiv oder negativ ist).

- `MAPPING_X_MEASUREMENT_POSITIVE` (optional)

Name des InfluxDB-Measurement, in das der Wert geschrieben werden soll, wenn er **positiv** ist. Andernfalls (also wenn er negativ oder `0` ist), wird `0` geschrieben.

- `MAPPING_X_MEASUREMENT_POSITIVE`
- `MAPPING_X_MEASUREMENT_NEGATIVE`

Der Name des InfluxDB-Measurement, in das der (absolute) Wert geschrieben werden soll, wenn er **negativ** ist. Andernfalls (also wenn er positiv oder `0` ist), wird `0` geschrieben.

- `MAPPING_X_FIELD`

Der Name des InfluxDB-Feldes, in das der Wert geschrieben werden soll.

- `MAPPING_X_FIELD_POSITIVE` (optional)

Name des InfluxDB-Field, in das der Wert geschrieben werden soll, wenn er **positiv** ist. Andernfalls (also wenn er negativ oder `0` ist), wird `0` geschrieben.

- `MAPPING_X_FIELD_NEGATIVE` (optional)

Name des InfluxDB-Field, in das der Wert geschrieben werden soll, wenn er **negativ** ist. Andernfalls (also wenn er positiv oder `0` ist), wird `0` geschrieben.

- `MAPPING_X_FIELD_POSITIVE`
- `MAPPING_X_FIELD_NEGATIVE`
- `MAPPING_X_TYPE`
- `MAPPING_X_MIN`
- `MAPPING_X_MAX`

Der Datentyp des Feldes. Möglich sind: `integer`, `float`, `string` oder `boolean`.

- `MAPPING_X_MIN` (optional, ab `v0.3.0`)

Untererer Grenzwert für Messwerte. Wird ein Wert unterhalb dieses Grenzwerts empfangen, wird er ignoriert und **nicht** in die InfluxDB geschrieben. Nützlich für Ausreißer oder offensichtlich fehlerhafte Werte, die sonst die Statistik verfälschen würden.
Eine ausführliche Beschreibung eines Mappings findet sich auf der folgenden Seite: \
[Abonnieren von Topics](topics).

- `MAPPING_X_MAX` (optional, ab `v0.3.0`)

Oberer Grenzwert für Messwerte. Wird ein Wert oberhalb dieses Grenzwerts empfangen, wird er ignoriert und **nicht** in die InfluxDB geschrieben. Nützlich für Ausreißer oder offensichtlich fehlerhafte Werte, die sonst die Statistik verfälschen würden.

### Beispiele

#### 1. Einfaches Mapping

Topic wird abonniert, der erhaltene Wert wird unverändert als Float in die InfluxDB geschrieben:

```env
MAPPING_0_TOPIC=senec/0/ENERGY/GUI_INVERTER_POWER
MAPPING_0_MEASUREMENT=PV
MAPPING_0_FIELD=inverter_power
MAPPING_0_TYPE=float
```

#### 2. Mapping mit Vorzeichen-Behandlung

Wenn die Werte des Topics positiv oder negativ sein können, erfolgt hier eine Aufteilung. Positive Werte werden in `grid_import_power` geschrieben, negative Werte in `grid_export_power`.

```env
MAPPING_1_TOPIC=senec/0/ENERGY/GUI_GRID_POW
MAPPING_1_MEASUREMENT_POSITIVE=PV
MAPPING_1_MEASUREMENT_NEGATIVE=PV
MAPPING_1_FIELD_POSITIVE=grid_import_power
MAPPING_1_FIELD_NEGATIVE=grid_export_power
MAPPING_1_TYPE=float
```

- Falls der empfangene Wert positiv ist (z.B. `1000`): `grid_import_power` wird auf `1000` gesetzt, `grid_export_power` auf `0`.
- Falls der empfangene Wert negativ ist (z.B. `-500`): `grid_import_power` wird auf `0` gesetzt, `grid_export_power` auf `500`.
- Falls der empfangene Wert `0` ist: `grid_import_power` und `grid_export_power` werden beide auf `0` gesetzt.

#### 3. Mapping mit einfachem JSON-Payload

Verwendung von `JSON_KEY`:

```env
MAPPING_2_TOPIC=my/little/nuclear/plant
MAPPING_2_JSON_KEY=radiation_level
MAPPING_2_MEASUREMENT=nuclear_power_plant
MAPPING_2_FIELD=radiation_level
MAPPING_2_TYPE=float
```

Aus einem JSON von beispielsweise `{"radiation_level": 90.5, "reactivity": 0.7}` resultiert der Wert `90.5`.

#### 4. Mapping mit komplexem JSON-Payload

Verwendung von `JSON_PATH`:

```env
MAPPING_3_TOPIC=go-e/ATTR
MAPPING_3_JSON_PATH=$.ccp[2]
MAPPING_3_MEASUREMENT=WALLBOX
MAPPING_3_FIELD=power
MAPPING_3_TYPE=float
```

Dies extrahiert den Wert aus einem Payload wie `{"ccp": [1,2,42,3]}`. In diesem Beispiel gibt es den Wert an der Stelle 2 (drittes Element) des Arrays `ccp` zurück, der `42` ist.

#### 5. Mapping mit Formel

```env
MAPPING_4_TOPIC=my/little/nuclear/plant
MAPPING_4_JSON_FORMULA="round({reactivity} * {radiation_level}) + 42"
MAPPING_4_MEASUREMENT=nuclear_power_plant
MAPPING_4_FIELD=danger_level
MAPPING_4_TYPE=float
```

Aus einem JSON von z.B. `{"radiation_level": 90.5, "reactivity": 0.7}` entsteht `danger_level` mit `round(0.7 * 90.5) + 42`, also `105`.

#### 6. Mapping mit Grenzwerten

```env
MAPPING_0_TOPIC=senec/0/ENERGY/GUI_INVERTER_POWER
MAPPING_0_MEASUREMENT=PV
MAPPING_0_FIELD=inverter_power
MAPPING_0_TYPE=float
MAPPING_0_MIN=5
MAPPING_0_MAX=15000
```
{: .note}

Werte unter `5` oder über `15000` werden ignoriert und nicht in die InfluxDB geschrieben.
Es ist unbedingt darauf zu achten, dass die definierten Variablen **alle** auch in der `compose.yaml` aufgeführt sind. Andernfalls sind sie für den MQTT-Collector nicht erreichbar.
Loading

0 comments on commit 15fa74f

Please sign in to comment.