From 53dc5652cd5436183bc1b656a87b601c1de6c042 Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Thu, 16 May 2024 02:00:33 +0000 Subject: [PATCH] Drop InfluxDB support to ease maintenance. Other long-term storage options for Prometheus now exist, including Cortex (https://cortexmetrics.io/docs/api/). Backfilling is complex, so the suggested migration path is to use Prometheus remote_read to read both legacy InfluxDB data and live data from a replacement store (like Cortex). --- README.rst | 2 +- clib/clib_mininet_test_main.py | 1 - debian/control | 4 +- docker-compose.yaml | 15 - .../faucet_instrumentation.json | 7 +- .../grafana-dashboards/faucet_inventory.json | 5 +- .../faucet_port_statistics.json | 6 +- docs/installation.rst | 18 +- docs/intro.rst | 2 +- etc/faucet/gauge.yaml | 9 - faucet/gauge_influx.py | 230 ----------- faucet/watcher.py | 8 - faucet/watcher_conf.py | 44 +-- requirements.txt | 1 - tests/integration/mininet_tests.py | 366 ------------------ tests/unit/gauge/test_gauge.py | 312 +-------------- 16 files changed, 12 insertions(+), 1018 deletions(-) delete mode 100644 faucet/gauge_influx.py diff --git a/README.rst b/README.rst index 5670fe1b0b..05f6ad7ee6 100644 --- a/README.rst +++ b/README.rst @@ -21,7 +21,7 @@ It supports: - ACLs matching layer 2 and layer 3 fields - IPv4 and IPv6 routing, static and via BGP - Policy based forwarding to offload to external NFV applications (Eg 802.1x via hostapd, DHCP to isc DHCPD) -- Port and flow statistics via InfluxDB/Grafana +- Port and flow statistics via Grafana - Controller health and statistics via Prometheus - Unit and systems tests run under GitHub workflows based on mininet and OVS diff --git a/clib/clib_mininet_test_main.py b/clib/clib_mininet_test_main.py index 2e0d2dc4aa..2b68ed093b 100755 --- a/clib/clib_mininet_test_main.py +++ b/clib/clib_mininet_test_main.py @@ -83,7 +83,6 @@ ("lsof", ["-v"], r"lsof version", r"revision: (\d+\.\d+(\.\d+)?)\n", "4.86"), ("mn", ["--version"], r"\d+\.\d+.\d+", r"(\d+\.\d+).\d+", "2.2"), ("exabgp", ["--version"], "ExaBGP", r"ExaBGP : (\d+\.\d+).\d+", "4.0"), - ("pip3", ["show", "influxdb"], "influxdb", r"Version:\s+(\d+\.\d+)\.\d+", "3.0"), ("curl", ["--version"], "libcurl", r"curl (\d+\.\d+).\d+", "7.3"), ("ladvd", ["-h"], "ladvd", r"ladvd version (\d+\.\d+)\.\d+", "0.9"), ("iperf", ["--version"], "iperf", r"iperf version (\d+\.\d+)\.\d+", "2.0"), diff --git a/debian/control b/debian/control index 9909b8f946..a5e9b9bfe4 100644 --- a/debian/control +++ b/debian/control @@ -10,7 +10,6 @@ Build-Depends: debhelper (>= 11), python3-setuptools, python3-pbr (>= 1.9), python3-bitstring, - python3-influxdb, python3-networkx, python3-prometheus-client, python3-ruamel.yaml, @@ -26,8 +25,7 @@ Vcs-Browser: https://github.com/faucetsdn/faucet Package: python3-faucet Architecture: all -Depends: python3-influxdb (>= 2.12.0), - python3-networkx (>= 1.9), +Depends: python3-networkx (>= 1.9), python3-pbr (>= 1.9), python3-prometheus-client (>= 0.20.0), python3-prometheus-client (<< 0.20.1), python3-ruamel.yaml (>= 0.18.6), python3-ruamel.yaml (<< 0.18.7), diff --git a/docker-compose.yaml b/docker-compose.yaml index f90665a281..99a48e812a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,19 +1,6 @@ --- version: '2' services: - influxdb: - restart: always - image: 'influxdb:2.7' - ports: - - '8086' - - '8083' - volumes: - - '${FAUCET_PREFIX}/opt/influxdb/shared/data/db:/var/lib/influxdb' - environment: - INFLUXDB_DB: 'faucet' - INFLUXDB_HTTP_AUTH_ENABLED: 'true' - INFLUXDB_ADMIN_USER: 'faucet' -
INFLUXDB_ADMIN_PASSWORD: 'faucet' prometheus: restart: always @@ -38,7 +25,6 @@ services: volumes: - '${FAUCET_PREFIX}/opt/grafana:/var/lib/grafana' links: - - influxdb - prometheus gauge: @@ -56,7 +42,6 @@ services: - '6654:6653' - '9303' links: - - influxdb faucet: restart: always diff --git a/docs/_static/grafana-dashboards/faucet_instrumentation.json b/docs/_static/grafana-dashboards/faucet_instrumentation.json index 7f1b63158e..6da3687375 100644 --- a/docs/_static/grafana-dashboards/faucet_instrumentation.json +++ b/docs/_static/grafana-dashboards/faucet_instrumentation.json @@ -402,7 +402,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "expr": "process_resident_memory_bytes{job=~\"faucet|gauge\"}", "format": "time_series", "groupBy": [ @@ -535,7 +534,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "expr": "process_virtual_memory_bytes{job=~\"faucet|gauge\"}", "format": "time_series", "groupBy": [ @@ -689,7 +687,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "expr": "faucet_config_reload_requests_total", "format": "time_series", "groupBy": [ @@ -817,7 +814,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "expr": "faucet_config_reload_cold_total", "format": "time_series", "groupBy": [ @@ -945,7 +941,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "expr": "faucet_config_reload_warm_total", "format": "time_series", "groupBy": [ @@ -1032,4 +1027,4 @@ "uid": "faucet-instrumentation", "version": 1, "weekStart": "" -} \ No newline at end of file +} diff --git a/docs/_static/grafana-dashboards/faucet_inventory.json b/docs/_static/grafana-dashboards/faucet_inventory.json index 146d77ae2a..43e844d26a 100644 --- a/docs/_static/grafana-dashboards/faucet_inventory.json +++ b/docs/_static/grafana-dashboards/faucet_inventory.json @@ -564,7 +564,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "expr": "of_dp_desc_stats", "format": "table", "groupBy": [ @@ -681,7 +680,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "editorMode": "code", "exemplar": false, "expr": "faucet_pbr_version", @@ -932,7 +930,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "expr": "python_info", "format": "table", "groupBy": [ @@ -1030,4 +1027,4 @@ "uid": "faucet-inventory", "version": 3, "weekStart": "" -} \ No newline at end of file +} diff --git a/docs/_static/grafana-dashboards/faucet_port_statistics.json b/docs/_static/grafana-dashboards/faucet_port_statistics.json index 37fe3b2ae3..49f51ea11b 100644 --- a/docs/_static/grafana-dashboards/faucet_port_statistics.json +++ b/docs/_static/grafana-dashboards/faucet_port_statistics.json @@ -162,7 +162,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "editorMode": "builder", "expr": "rate(of_port_rx_bytes{dp_name=~\"$dp_name\", port=~\"$port\"}[1m]) * 8", "format": "time_series", @@ -322,7 +321,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "editorMode": "builder", "expr": "rate(of_port_rx_packets{dp_name=~\"$dp_name\", port=~\"$port\"}[1m])", "format": "time_series", @@ -482,7 +480,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": "influxdb", "editorMode": "code", "expr": "rate(of_port_rx_dropped{dp_name=~\"$dp_name\", port=~\"$port\"}[1m])", "format": "time_series", @@ -629,7 +626,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "dsType": 
"influxdb", "editorMode": "builder", "expr": "rate(of_port_tx_errors{dp_name=~\"$dp_name\", port=~\"$port\"}[1m])", "format": "time_series", @@ -772,4 +768,4 @@ "uid": "faucet-port-stats", "version": 9, "weekStart": "" -} \ No newline at end of file +} diff --git a/docs/installation.rst b/docs/installation.rst index 5b04a9a6c3..ba1d2fc237 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -197,7 +197,7 @@ Docker compose ~~~~~~~~~~~~~~ This is an example docker-compose file that can be used to set up gauge to talk -to Prometheus and InfluxDB with a Grafana instance for dashboards and visualisations. +to Prometheus and a Grafana instance for dashboards and visualisations. It can be run with: @@ -207,7 +207,7 @@ It can be run with: docker-compose up The time-series databases with the default settings will write to -``/opt/prometheus/`` ``/opt/influxdb/shared/data/db`` you can edit these locations +``/opt/prometheus/`` you can edit these locations by modifying the ``docker-compose.yaml`` file. On OSX, some of the default shared paths are not accessible, so to overwrite @@ -220,7 +220,7 @@ For example: export FAUCET_PREFIX=/opt/faucet When all the docker containers are running we will need to configure Grafana to -talk to Prometheus and InfluxDB. First login to the Grafana web interface on +talk to Prometheus. First login to the Grafana web interface on port 3000 (e.g http://localhost:3000) using the default credentials of ``admin:admin``. @@ -232,18 +232,6 @@ Then add two data sources. Use the following settings for prometheus: Type: Prometheus Url: http://prometheus:9090 -And the following settings for InfluxDB: - -:: - - Name: InfluxDB - Type: InfluxDB - Url: http://influxdb:8086 - With Credentials: true - Database: faucet - User: faucet - Password: faucet - Check the connection using test connection. From here you can add a new dashboard and a graphs for pulling data from the diff --git a/docs/intro.rst b/docs/intro.rst index 53d7745ff8..78977d6bcc 100644 --- a/docs/intro.rst +++ b/docs/intro.rst @@ -25,7 +25,7 @@ e.g. learned hosts, via Prometheus (so that an open source NMS such as Grafana graph it). Gauge also has an OpenFlow connection to the switch and monitors port and flow -state (exporting it to Prometheus or InfluxDB, or even flat text log files). +state (exporting it to Prometheus, or even flat text log files). Gauge, however, does not ever modify the switch's state, so that switch monitoring functions can be upgraded, restarted, without impacting forwarding. diff --git a/etc/faucet/gauge.yaml b/etc/faucet/gauge.yaml index 8324fbd52a..68efc7dcf4 100644 --- a/etc/faucet/gauge.yaml +++ b/etc/faucet/gauge.yaml @@ -14,7 +14,6 @@ watchers: # dps: ['sw1', 'sw2'] interval: 10 db: 'prometheus' - # db: 'influx' flow_table_poller: type: 'flow_table' all_dps: true @@ -29,11 +28,3 @@ dbs: type: 'text' compress: true path: 'flow_tables' - influx: - type: 'influx' - influx_db: 'faucet' - influx_host: 'influxdb' - influx_port: 8086 - influx_user: 'faucet' - influx_pwd: 'faucet' - influx_timeout: 10 diff --git a/faucet/gauge_influx.py b/faucet/gauge_influx.py deleted file mode 100644 index f6258a4c44..0000000000 --- a/faucet/gauge_influx.py +++ /dev/null @@ -1,230 +0,0 @@ -"""Library for interacting with InfluxDB.""" - -# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd. -# Copyright (C) 2015--2019 The Contributors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from influxdb import InfluxDBClient -from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError -import requests # pytype: disable=pyi-error -from faucet.gauge_pollers import ( - GaugePortStatePoller, - GaugeFlowTablePoller, - GaugePortStatsPoller, -) - - -class InfluxShipper: - """Convenience class for shipping values to InfluxDB. - - Inheritors must have a WatcherConf object as conf. - """ - - conf = None - ship_error_prefix = "error shipping points: " - logger = None - - def ship_points(self, points): - """Make a connection to InfluxDB and ship points.""" - - if self.conf is not None: - try: - client = InfluxDBClient( - host=self.conf.influx_host, - port=self.conf.influx_port, - username=self.conf.influx_user, - password=self.conf.influx_pwd, - database=self.conf.influx_db, - timeout=self.conf.influx_timeout, - ) - if client: - if client.write_points(points=points, time_precision="s"): - return True - self.logger.warning( - "%s failed to update InfluxDB" % self.ship_error_prefix - ) - else: - self.logger.warning( - "%s error connecting to InfluxDB" % self.ship_error_prefix - ) - except ( - requests.exceptions.ConnectionError, - requests.exceptions.ReadTimeout, - InfluxDBClientError, - InfluxDBServerError, - ) as err: - self.logger.warning("%s %s" % (self.ship_error_prefix, err)) - return False - - @staticmethod - def make_point(tags, rcv_time, stat_name, stat_val): - """Make an InfluxDB point.""" - # InfluxDB has only one integer type, int64. We are logging OF - # stats that are uint64. Use float64 to prevent an overflow. - # q.v. 
https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_reference/ - point = { - "measurement": stat_name, - "tags": tags, - "time": int(rcv_time), - # pylint: disable=no-member - "fields": {"value": float(stat_val)}, - } - return point - - def make_port_point( - self, dp_name, port_name, rcv_time, stat_name, stat_val - ): # pylint: disable=too-many-arguments - """Make an InfluxDB point about a port measurement.""" - port_tags = { - "dp_name": dp_name, - "port_name": port_name, - } - return self.make_point(port_tags, rcv_time, stat_name, stat_val) - - -class GaugePortStateInfluxDBLogger(GaugePortStatePoller, InfluxShipper): - """ - - Example: - :: - - > use faucet - Using database faucet - > precision rfc3339 - > select * from port_state_reason where port_name = 'port1.0.1' order by time desc limit 10; - name: port_state_reason - ----------------------- - time dp_name port_name value - 2017-02-21T02:12:29Z windscale-faucet-1 port1.0.1 2 - 2017-02-21T02:12:25Z windscale-faucet-1 port1.0.1 2 - 2016-07-27T22:05:08Z windscale-faucet-1 port1.0.1 2 - 2016-05-25T04:33:00Z windscale-faucet-1 port1.0.1 2 - 2016-05-25T04:32:57Z windscale-faucet-1 port1.0.1 2 - 2016-05-25T04:31:21Z windscale-faucet-1 port1.0.1 2 - 2016-05-25T04:31:18Z windscale-faucet-1 port1.0.1 2 - 2016-05-25T04:27:07Z windscale-faucet-1 port1.0.1 2 - 2016-05-25T04:27:04Z windscale-faucet-1 port1.0.1 2 - 2016-05-25T04:24:53Z windscale-faucet-1 port1.0.1 2 - - """ - - def _update(self, rcv_time, msg): - reason = msg.reason - port_no = msg.desc.port_no - if port_no in self.dp.ports: - port_name = self.dp.ports[port_no].name - points = [ - self.make_port_point( - self.dp.name, port_name, rcv_time, "port_state_reason", reason - ) - ] - self.ship_points(points) - - def send_req(self): - """Send a stats request to a datapath.""" - raise NotImplementedError # pragma: no cover - - def no_response(self): - """Called when a polling cycle passes without receiving a response.""" - raise NotImplementedError # pragma: no cover - - -class GaugePortStatsInfluxDBLogger(GaugePortStatsPoller, InfluxShipper): - """Periodically sends a port stats request to the datapath and parses \ - and outputs the response. 
- -Example: - :: - - > use faucet - Using database faucet - > show measurements - name: measurements - ------------------ - bytes_in - bytes_out - dropped_in - dropped_out - errors_in - packets_in - packets_out - port_state_reason - > precision rfc3339 - > select * from packets_out where port_name = 'port1.0.1' order by time desc limit 10; - name: packets_out - ----------------- - time dp_name port_name value - 2017-03-06T05:21:42Z windscale-faucet-1 port1.0.1 76083431 - 2017-03-06T05:21:33Z windscale-faucet-1 port1.0.1 76081172 - 2017-03-06T05:21:22Z windscale-faucet-1 port1.0.1 76078727 - 2017-03-06T05:21:12Z windscale-faucet-1 port1.0.1 76076612 - 2017-03-06T05:21:02Z windscale-faucet-1 port1.0.1 76074546 - 2017-03-06T05:20:52Z windscale-faucet-1 port1.0.1 76072730 - 2017-03-06T05:20:42Z windscale-faucet-1 port1.0.1 76070528 - 2017-03-06T05:20:32Z windscale-faucet-1 port1.0.1 76068211 - 2017-03-06T05:20:22Z windscale-faucet-1 port1.0.1 76065982 - 2017-03-06T05:20:12Z windscale-faucet-1 port1.0.1 76063941 - """ - - def _update(self, rcv_time, msg): - points = [] - for stat in msg.body: - port_name = str(stat.port_no) - for stat_name, stat_val in self._format_stat_pairs("_", stat): - points.append( - self.make_port_point( - self.dp.name, port_name, rcv_time, stat_name, stat_val - ) - ) - self.ship_points(points) - - -class GaugeFlowTableInfluxDBLogger(GaugeFlowTablePoller, InfluxShipper): - # pylint: disable=line-too-long - """ - - Example: - :: - - > use faucet - Using database faucet - > show series where table_id = '0' and in_port = '2' - key - --- - flow_byte_count,dp_name=windscale-faucet-1,eth_type=2048,in_port=2,ip_proto=17,priority=9099,table_id=0,udp_dst=53 - flow_byte_count,dp_name=windscale-faucet-1,eth_type=2048,in_port=2,ip_proto=6,priority=9098,table_id=0,tcp_dst=53 - flow_byte_count,dp_name=windscale-faucet-1,in_port=2,priority=9097,table_id=0 - flow_packet_count,dp_name=windscale-faucet-1,eth_type=2048,in_port=2,ip_proto=17,priority=9099,table_id=0,udp_dst=53 - flow_packet_count,dp_name=windscale-faucet-1,eth_type=2048,in_port=2,ip_proto=6,priority=9098,table_id=0,tcp_dst=53 - flow_packet_count,dp_name=windscale-faucet-1,in_port=2,priority=9097,table_id=0 - > select * from flow_byte_count where table_id = '0' and in_port = '2' and ip_proto = '17' and time > now() - 5m - name: flow_byte_count - time arp_tpa dp_name eth_dst eth_src eth_type icmpv6_type in_port ip_proto ipv4_dst ipv6_dst priority table_id tcp_dst udp_dst value vlan_vid - ---- ------- ------- ------- ------- -------- ----------- ------- -------- -------- -------- -------- -------- ------- ------- ----- -------- - 1501154797000000000 windscale-faucet-1 2048 2 17 9099 0 53 9414 - 1501154857000000000 windscale-faucet-1 2048 2 17 9099 0 53 10554 - 1501154917000000000 windscale-faucet-1 2048 2 17 9099 0 53 10554 - 1501154977000000000 windscale-faucet-1 2048 2 17 9099 0 53 12164 - 1501155037000000000 windscale-faucet-1 2048 2 17 9099 0 53 12239 - """ # noqa: E501 - - def _update(self, rcv_time, msg): - points = [] - jsondict = msg.to_jsondict() - for stats_reply in jsondict["OFPFlowStatsReply"]["body"]: - stats = stats_reply["OFPFlowStats"] - for var, tags, count in self._parse_flow_stats(stats): - points.append(self.make_point(tags, rcv_time, var, count)) - self.ship_points(points) diff --git a/faucet/watcher.py b/faucet/watcher.py index 8c4c57ec54..226cd34eee 100644 --- a/faucet/watcher.py +++ b/faucet/watcher.py @@ -27,11 +27,6 @@ from faucet.conf import InvalidConfigError from faucet.valve_util import dpid_log 
-from faucet.gauge_influx import ( - GaugePortStateInfluxDBLogger, - GaugePortStatsInfluxDBLogger, - GaugeFlowTableInfluxDBLogger, -) from faucet.gauge_pollers import ( GaugePortStatePoller, GaugePortStatsPoller, @@ -56,17 +51,14 @@ def watcher_factory(conf): watcher_types = { "port_state": { "text": GaugePortStateLogger, - "influx": GaugePortStateInfluxDBLogger, "prometheus": GaugePortStatePrometheusPoller, }, "port_stats": { "text": GaugePortStatsLogger, - "influx": GaugePortStatsInfluxDBLogger, "prometheus": GaugePortStatsPrometheusPoller, }, "flow_table": { "text": GaugeFlowTableLogger, - "influx": GaugeFlowTableInfluxDBLogger, "prometheus": GaugeFlowTablePrometheusPoller, }, "meter_stats": { diff --git a/faucet/watcher_conf.py b/faucet/watcher_conf.py index 14223a107f..ace7d221a2 100644 --- a/faucet/watcher_conf.py +++ b/faucet/watcher_conf.py @@ -47,8 +47,8 @@ class WatcherConf(Conf): The following elements can be configured for each db, at the level of /dbs//: - * type (string): the type of db. The available types are 'text' and 'influx' \ - for port_state, 'text', 'influx'and 'prometheus' for port_stats and \ + * type (string): the type of db. The available types are 'text' \ + for port_state, 'text', and 'prometheus' for port_stats and \ 'text' and flow_table. The following config elements then depend on the type. @@ -59,19 +59,6 @@ class WatcherConf(Conf): muiltiple files * compress (bool): compress (with gzip) flow_table output while writing it -For influx: - * influx_db (str): The name of the influxdb database. Defaults to 'faucet'. - * influx_host (str): The host where the influxdb is reachable. Defaults to \ - 'localhost'. - * influx_port (int): The port that the influxdb host will listen on. Defaults \ - to 8086. - * influx_user (str): The username for accessing influxdb. Defaults to ''. - * influx_pwd (str): The password for accessing influxdb. Defaults to ''. - * influx_timeout (int): The timeout in seconds for connecting to influxdb. \ - Defaults to 10. - * influx_retries (int): The number of times to retry connecting to influxdb \ - after failure. Defaults to 3. - For Prometheus: * prometheus_port (int): The port used to export prometheus data. Defaults to \ 9303. 
@@ -85,19 +72,6 @@ class WatcherConf(Conf): "path": None, "compress": False, # compress flow table file - "influx_db": "faucet", - # influx database name - "influx_host": "localhost", - # influx database location - "influx_port": 8086, - "influx_user": "", - # influx username - "influx_pwd": "", - # influx password - "influx_timeout": 10, - # timeout on influx requests - "influx_retries": 3, - # attempts to retry influx request # prometheus config "prometheus_port": 9303, "prometheus_addr": "0.0.0.0", @@ -109,13 +83,6 @@ class WatcherConf(Conf): "file": str, "path": str, "compress": bool, - "influx_db": str, - "influx_host": str, - "influx_port": int, - "influx_user": str, - "influx_pwd": str, - "influx_timeout": int, - "influx_retries": int, "prometheus_port": int, "prometheus_addr": str, "prometheus_test_thread": bool, @@ -155,13 +122,6 @@ def __init__(self, _id, dp_id, conf, prom_client): self.compress = None self.file = None self.path = None - self.influx_db = None - self.influx_host = None - self.influx_port = None - self.influx_user = None - self.influx_pwd = None - self.influx_timeout = None - self.influx_retries = None self.name = None self.prometheus_port = None self.prometheus_addr = None diff --git a/requirements.txt b/requirements.txt index 4901b3d27f..8461e6dceb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ https://github.com/faucetsdn/python3-fakencclient/archive/main.tar.gz#egg=ncclient chewie==0.0.25 eventlet>=0.34.3; python_version >= "3.12" -influxdb>=2.12.0 networkx>=1.9 pbr>=1.9 prometheus_client==0.20.0 diff --git a/tests/integration/mininet_tests.py b/tests/integration/mininet_tests.py index a1d11d9ceb..aaffcd3d68 100644 --- a/tests/integration/mininet_tests.py +++ b/tests/integration/mininet_tests.py @@ -68,40 +68,6 @@ """ -class QuietHTTPServer(HTTPServer): - allow_reuse_address = True - timeout = None - - @staticmethod - def handle_error(_request, _client_address): - return - - -class PostHandler(SimpleHTTPRequestHandler): - def log_message(self, format, *args): # pylint: disable=redefined-builtin - return - - def _log_post(self): - content_len = int(self.headers.get("content-length", 0)) - content = self.rfile.read(content_len).decode().strip() - if content and hasattr(self.server, "influx_log"): - with open(self.server.influx_log, "a", encoding="utf-8") as influx_log: - influx_log.write(content + "\n") - - -class InfluxPostHandler(PostHandler): - def do_POST(self): # pylint: disable=invalid-name - self._log_post() - return self.send_response(204) - - -class SlowInfluxPostHandler(PostHandler): - def do_POST(self): # pylint: disable=invalid-name - self._log_post() - time.sleep(self.server.timeout * 3) - return self.send_response(500) - - class FaucetTest(mininet_test_base.FaucetTestBase): pass @@ -2499,338 +2465,6 @@ def test_untagged(self): self.fail(msg="Gauge Prometheus flow counters not increasing") -class FaucetUntaggedInfluxTest(FaucetUntaggedTest): - """Basic untagged VLAN test with Influx.""" - - GAUGE_CONFIG_DBS = ( - """ - influx: - type: 'influx' - influx_db: 'faucet' - influx_host: '127.0.0.1' - influx_port: %(gauge_influx_port)d - influx_user: 'faucet' - influx_pwd: '' - influx_retries: 1 -""" - + """ - influx_timeout: %u -""" - % FaucetUntaggedTest.DB_TIMEOUT - ) - config_ports = {"gauge_influx_port": None} - influx_log = None - server_thread = None - server = None - - def get_gauge_watcher_config(self): - return """ - port_stats: - dps: ['%s'] - type: 'port_stats' - interval: 2 - db: 'influx' - port_state: - dps: ['%s'] - type: 
'port_state' - interval: 2 - db: 'influx' - flow_table: - dps: ['%s'] - type: 'flow_table' - interval: 2 - db: 'influx' -""" % ( - self.DP_NAME, - self.DP_NAME, - self.DP_NAME, - ) - - def setup_influx(self): - self.influx_log = os.path.join(self.tmpdir, "influx.log") - if self.server: - self.server.influx_log = self.influx_log - self.server.timeout = self.DB_TIMEOUT - - def setUp(self): - self.handler = InfluxPostHandler - super().setUp() - self.setup_influx() - - def tearDown(self, ignore_oferrors=False): - if self.server: - self.server.shutdown() - self.server.socket.close() - super().tearDown(ignore_oferrors=ignore_oferrors) - - def _wait_error_shipping(self, timeout=None): - if timeout is None: - timeout = self.DB_TIMEOUT * 3 * 2 - self.wait_until_matching_lines_from_gauge_log_files( - r".+error shipping.+", timeout=timeout - ) - - def _verify_influx_log(self, retries=3): - self.assertTrue(os.path.exists(self.influx_log)) - expected_vars = { - "dropped_in", - "dropped_out", - "bytes_out", - "flow_packet_count", - "errors_in", - "errors_out", - "bytes_in", - "flow_byte_count", - "port_state_reason", - "packets_in", - "packets_out", - } - - observed_vars = set() - for _ in range(retries): - with open(self.influx_log, encoding="utf-8") as influx_log: - influx_log_lines = influx_log.readlines() - for point_line in influx_log_lines: - point_fields = point_line.strip().split() - self.assertEqual(3, len(point_fields), msg=point_fields) - ts_name, value_field, _ = point_fields - value = float(value_field.split("=")[1]) - ts_name_fields = ts_name.split(",") - self.assertGreater(len(ts_name_fields), 1) - observed_vars.add(ts_name_fields[0]) - label_values = {} - for label_value in ts_name_fields[1:]: - label, value = label_value.split("=") - label_values[label] = value - if ts_name.startswith("flow"): - self.assertTrue("inst_count" in label_values, msg=point_line) - if "vlan_vid" in label_values: - self.assertEqual(int(label_values["vlan"]), int(value) ^ 0x1000) - if expected_vars == observed_vars: - break - time.sleep(1) - - self.assertEqual(expected_vars, observed_vars) - self.verify_no_exception( - self.env[self.gauge_controller.name]["GAUGE_EXCEPTION_LOG"] - ) - - def _wait_influx_log(self): - for _ in range(self.DB_TIMEOUT * 3): - if os.path.exists(self.influx_log): - return - time.sleep(1) - - def _start_gauge_check(self): - if self.server_thread: - return None - influx_port = self.config_ports["gauge_influx_port"] - try: - self.server = QuietHTTPServer( - (mininet_test_util.LOCALHOST, influx_port), self.handler - ) # pytype: disable=attribute-error - self.server.timeout = self.DB_TIMEOUT - self.server_thread = threading.Thread(target=self.server.serve_forever) - self.server_thread.daemon = True - self.server_thread.start() - return None - except socket.error as err: - return "cannot start Influx test server: %s" % err - - def test_untagged(self): - self.ping_all_when_learned() - self.hup_controller(self.gauge_controller.name) - self.flap_all_switch_ports() - self._wait_influx_log() - self._verify_influx_log() - - -class FaucetUntaggedMultiDBWatcherTest( - FaucetUntaggedInfluxTest, FaucetUntaggedPrometheusGaugeTest -): - GAUGE_CONFIG_DBS = ( - """ - prometheus: - type: 'prometheus' - prometheus_addr: '::1' - prometheus_port: %(gauge_prom_port)d - influx: - type: 'influx' - influx_db: 'faucet' - influx_host: '127.0.0.1' - influx_port: %(gauge_influx_port)d - influx_user: 'faucet' - influx_pwd: '' - influx_retries: 1 -""" - + """ - influx_timeout: %u -""" - % FaucetUntaggedTest.DB_TIMEOUT - 
) - config_ports = {"gauge_prom_port": None, "gauge_influx_port": None} - - def get_gauge_watcher_config(self): - return """ - port_stats: - dps: ['%s'] - type: 'port_stats' - interval: 5 - dbs: ['prometheus', 'influx'] - port_state: - dps: ['%s'] - type: 'port_state' - interval: 5 - dbs: ['prometheus', 'influx'] - flow_table: - dps: ['%s'] - type: 'flow_table' - interval: 5 - dbs: ['prometheus', 'influx'] -""" % ( - self.DP_NAME, - self.DP_NAME, - self.DP_NAME, - ) - - @staticmethod - def test_tagged(): - return - - def test_untagged(self): - self.wait_dp_status(1, controller=self.gauge_controller.name) - self.assertTrue(self.wait_ports_updating(self.port_map.keys(), self.PORT_VARS)) - self.ping_all_when_learned() - self.hup_controller(controller=self.gauge_controller.name) - self.flap_all_switch_ports() - self._wait_influx_log() - self._verify_influx_log() - - -class FaucetUntaggedInfluxDownTest(FaucetUntaggedInfluxTest): - def _start_gauge_check(self): - return None - - def test_untagged(self): - self.ping_all_when_learned() - self._wait_error_shipping() - self.verify_no_exception( - self.env[self.gauge_controller.name]["GAUGE_EXCEPTION_LOG"] - ) - - -class FaucetUntaggedInfluxUnreachableTest(FaucetUntaggedInfluxTest): - GAUGE_CONFIG_DBS = """ - influx: - type: 'influx' - influx_db: 'faucet' - influx_host: '127.0.0.2' - influx_port: %(gauge_influx_port)d - influx_user: 'faucet' - influx_pwd: '' - influx_timeout: 2 -""" - - def _start_gauge_check(self): - return None - - def test_untagged(self): - self.gauge_controller.cmd("route add 127.0.0.2 gw 127.0.0.1 lo") - self.ping_all_when_learned() - self._wait_error_shipping() - self.verify_no_exception( - self.env[self.gauge_controller.name]["GAUGE_EXCEPTION_LOG"] - ) - - -class FaucetSingleUntaggedInfluxTooSlowTest(FaucetUntaggedInfluxTest): - def setUp(self): - self.handler = SlowInfluxPostHandler - super().setUp() - self.setup_influx() - - def test_untagged(self): - self.ping_all_when_learned() - self._wait_influx_log() - self.assertTrue(os.path.exists(self.influx_log)) - self._wait_error_shipping() - self.verify_no_exception( - self.env[self.gauge_controller.name]["GAUGE_EXCEPTION_LOG"] - ) - - -class FaucetNailedForwardingTest(FaucetUntaggedTest): - CONFIG_GLOBAL = """ -vlans: - 100: - description: "untagged" -acls: - 1: - - rule: - dl_dst: "0e:00:00:00:02:02" - actions: - output: - port: %(port_2)d - - rule: - dl_type: 0x806 - dl_dst: "ff:ff:ff:ff:ff:ff" - arp_tpa: "10.0.0.2" - actions: - output: - port: %(port_2)d - - rule: - actions: - allow: 0 - 2: - - rule: - dl_dst: "0e:00:00:00:01:01" - actions: - output: - port: %(port_1)d - - rule: - dl_type: 0x806 - dl_dst: "ff:ff:ff:ff:ff:ff" - arp_tpa: "10.0.0.1" - actions: - output: - port: %(port_1)d - - rule: - actions: - allow: 0 - 3: - - rule: - actions: - allow: 0 - 4: - - rule: - actions: - allow: 0 -""" - - CONFIG = """ - interfaces: - %(port_1)d: - native_vlan: 100 - acl_in: 1 - %(port_2)d: - native_vlan: 100 - acl_in: 2 - %(port_3)d: - native_vlan: 100 - acl_in: 3 - %(port_4)d: - native_vlan: 100 - acl_in: 4 -""" - - def test_untagged(self): - first_host, second_host = self.hosts_name_ordered()[0:2] - first_host.setMAC("0e:00:00:00:01:01") - second_host.setMAC("0e:00:00:00:02:02") - self.one_ipv4_ping(first_host, second_host.IP(), require_host_learned=False) - self.one_ipv4_ping(second_host, first_host.IP(), require_host_learned=False) - - class FaucetNailedForwardingOrderedTest(FaucetUntaggedTest): CONFIG_GLOBAL = """ vlans: diff --git a/tests/unit/gauge/test_gauge.py 
b/tests/unit/gauge/test_gauge.py index e31cd13d27..1c83ecf219 100755 --- a/tests/unit/gauge/test_gauge.py +++ b/tests/unit/gauge/test_gauge.py @@ -27,17 +27,10 @@ from prometheus_client import CollectorRegistry -from faucet import gauge, gauge_prom, gauge_influx, gauge_pollers, watcher, valve_util +from faucet import gauge, gauge_prom, gauge_pollers, watcher, valve_util from faucet.config_parser_util import yaml_load -class QuietHandler(BaseHTTPRequestHandler): - """Don't log requests.""" - - def log_message(self, format, *args): # pylint: disable=redefined-builtin - pass - - def create_mock_datapath(num_ports): """Mock a datapath by creating mocked datapath ports.""" @@ -230,24 +223,6 @@ def compare_flow_msg(flow_msg, flow_dict, test): test.assertEqual(getattr(flow_msg.body[0], stat_name), stat_val) -class PretendInflux(QuietHandler): - """An HTTP Handler that receives InfluxDB messages.""" - - def do_POST(self): # pylint: disable=invalid-name - """Write request contents to the HTTP server, - if there is an output file to write to.""" - - if hasattr(self.server, "output_file"): - content_length = int(self.headers["content-length"]) - data = self.rfile.read(content_length) - data = data.decode("utf-8") - with open(self.server.output_file, "w", encoding="utf-8") as log: - log.write(data) - - self.send_response(204) - self.end_headers() - - class GaugePrometheusTests(unittest.TestCase): # pytype: disable=module-attr """Tests the GaugePortStatsPrometheusPoller update method""" @@ -399,291 +374,6 @@ def test_flow_stats(self): prom_poller.update(rcv_time, msg) -class GaugeInfluxShipperTest(unittest.TestCase): # pytype: disable=module-attr - """Tests the InfluxShipper""" - - @staticmethod - def create_config_obj(port=12345): - """Create a mock config object that contains the necessary InfluxDB config""" - - conf = mock.Mock( - influx_host="localhost", - influx_port=port, - influx_user="gauge", - influx_pwd="", - influx_db="gauge", - influx_timeout=10, - ) - return conf - - def get_values(self, dict_to_unpack): - """Get all the values from a nested dictionary""" - - values = [] - for value in dict_to_unpack.values(): - if isinstance(value, dict): - values.extend(self.get_values(value)) - else: - values.append(value) - return values - - def test_ship_success(self): - """Checks that the shipper successsfully connects - to a HTTP server when the points are shipped""" - - server = None - - try: - server = start_server(PretendInflux) - shipper = gauge_influx.InfluxShipper() - shipper.conf = self.create_config_obj(server.server_port) - points = [ - {"measurement": "test_stat_name", "fields": {"value": 1}}, - ] - shipper.ship_points(points) - except (ConnectionError, ReadTimeout) as err: - self.fail("Code threw an exception: {}".format(err)) - finally: - if server: - server.socket.close() - server.shutdown() - - def test_ship_connection_err(self): - """Checks that even when there is a connection error, - there is no exception thrown""" - - try: - shipper = gauge_influx.InfluxShipper() - shipper.conf = self.create_config_obj() - shipper.logger = mock.Mock() - points = [ - {"measurement": "test_stat_name", "fields": {"value": 1}}, - ] - shipper.ship_points(points) - except (ConnectionError, ReadTimeout) as err: - self.fail("Code threw an exception: {}".format(err)) - - def test_ship_no_config(self): - """Check that no exceptions are thrown when - there is no config""" - - try: - shipper = gauge_influx.InfluxShipper() - points = [ - {"measurement": "test_stat_name", "fields": {"value": 1}}, - ] - 
shipper.ship_points(points) - except (ConnectionError, ReadTimeout) as err: - self.fail("Code threw an exception: {}".format(err)) - - def test_point(self): - """Checks that the points produced still have the variables given to it""" - - shipper = gauge_influx.InfluxShipper() - dp_name = "faucet-1" - port_name = "port1.0.1" - rcv_time = int(time.time()) - stat_name = "test_stat_name" - # max uint64 number - stat_val = 2**64 - 1 - - port_point = shipper.make_port_point( - dp_name, port_name, rcv_time, stat_name, stat_val - ) - values = {dp_name, port_name, rcv_time, stat_name} - port_vals = set(self.get_values(port_point)) - port_vals_stat = port_vals.difference(values) - self.assertEqual(len(port_vals_stat), 1) - self.assertAlmostEqual(port_vals_stat.pop(), stat_val) - - tags = {"dp_name": dp_name, "port_name": port_name} - point = shipper.make_point(tags, rcv_time, stat_name, stat_val) - point_vals = set(self.get_values(point)) - point_vals_stat = point_vals.difference(values) - self.assertEqual(len(point_vals_stat), 1) - self.assertAlmostEqual(point_vals_stat.pop(), stat_val) - - -class GaugeInfluxUpdateTest(unittest.TestCase): # pytype: disable=module-attr - """Test the Influx loggers update methods""" - - server = None - - def setUp(self): - """Starts up an HTTP server to mock InfluxDB. - Also opens a new temp file for the server to write to""" - - self.server = start_server(PretendInflux) - self.temp_fd, self.server.output_file = tempfile.mkstemp() - - def tearDown(self): - """Close the temp file (which should delete it) - and stop the HTTP server""" - os.close(self.temp_fd) - os.remove(self.server.output_file) - self.server.socket.close() - self.server.shutdown() - - def create_config_obj(self, datapath): - """Create a mock config object that contains the necessary InfluxDB config""" - - conf = mock.Mock( - influx_host="localhost", - influx_port=self.server.server_port, - influx_user="gauge", - influx_pwd="", - influx_db="gauge", - influx_timeout=10, - interval=5, - dp=datapath, - ) - return conf - - @staticmethod - def parse_key_value(dictionary, kv_list): - """ - When given a list consisting of strings such as: 'key1=val1', - add to the dictionary as dictionary['key1'] = 'val1'. - Ignore entries in the list which do not contain '=' - """ - for key_val in kv_list: - if "=" in key_val: - key, val = key_val.split("=") - - try: - val = float(val) - val = int(val) - except ValueError: - pass - - dictionary[key] = val - - def parse_influx_output(self, output_to_parse): - """ - Parse the output from the mock InfluxDB server - The usual layout of the output is: - measurement,tag1=val1,tag2=val2 field1=val3 timestamp - The tags are separated with a comma and the fields - are separated with a space. 
The measurement always - appears first, and the timestamp is always last - - """ - influx_data = dict() - - tags = output_to_parse.split(",") - fields = tags[-1].split(" ") - tags[-1] = fields[0] - influx_data["timestamp"] = int(fields[-1]) - fields = fields[1:-1] - - self.parse_key_value(influx_data, tags) - self.parse_key_value(influx_data, fields) - - return (tags[0], influx_data) - - def test_port_state(self): - """Check the update method of the GaugePortStateInfluxDBLogger class""" - - conf = self.create_config_obj(create_mock_datapath(3)) - db_logger = gauge_influx.GaugePortStateInfluxDBLogger( - conf, "__name__", mock.Mock() - ) - db_logger._running = True - - reasons = [ofproto.OFPPR_ADD, ofproto.OFPPR_DELETE, ofproto.OFPPR_MODIFY] - for i in range(1, len(conf.dp.ports) + 1): - msg = port_state_msg(conf.dp, i, reasons[i - 1]) - rcv_time = int(time.time()) - db_logger.update(rcv_time, msg) - - with open(self.server.output_file, "r", encoding="utf-8") as log: - output = log.read() - - influx_data = self.parse_influx_output(output)[1] - data = {conf.dp.name, conf.dp.ports[i].name, rcv_time, reasons[i - 1]} - self.assertEqual(data, set(influx_data.values())) - - def test_port_stats(self): - """Check the update method of the GaugePortStatsInfluxDBLogger class""" - conf = self.create_config_obj(create_mock_datapath(2)) - db_logger = gauge_influx.GaugePortStatsInfluxDBLogger( - conf, "__name__", mock.Mock() - ) - db_logger._running = True - - msg = port_stats_msg(conf.dp) - rcv_time = int(time.time()) - - db_logger.update(rcv_time, msg) - with open(self.server.output_file, "r", encoding="utf-8") as log: - output = log.readlines() - - for line in output: - measurement, influx_data = self.parse_influx_output(line) - - # get the number at the end of the port_name - port_num = influx_data["port_name"] # pytype: disable=unsupported-operands - # get the original port stat value - port_stat_val = logger_to_ofp(msg.body[port_num - 1])[ - measurement - ] # pytype: disable=unsupported-operands - - self.assertEqual(port_stat_val, influx_data["value"]) - self.assertEqual(conf.dp.name, influx_data["dp_name"]) - self.assertEqual(rcv_time, influx_data["timestamp"]) - - def test_flow_stats(self): - """Check the update method of the GaugeFlowTableInfluxDBLogger class""" - - conf = self.create_config_obj(create_mock_datapath(0)) - db_logger = gauge_influx.GaugeFlowTableInfluxDBLogger( - conf, "__name__", mock.Mock() - ) - db_logger._running = True - - rcv_time = int(time.time()) - instructions = [parser.OFPInstructionGotoTable(1)] - msg = flow_stats_msg(conf.dp, instructions) - db_logger.update(rcv_time, msg) - - other_fields = { - "dp_name": conf.dp.name, - "dp_id": hex(conf.dp.dp_id), - "timestamp": rcv_time, - "priority": msg.body[0].priority, - "table_id": msg.body[0].table_id, - "inst_count": len(msg.body[0].instructions), - "vlan": msg.body[0].match.get("vlan_vid") ^ ofproto.OFPVID_PRESENT, - "cookie": msg.body[0].cookie, - } - - with open(self.server.output_file, "r", encoding="utf-8") as log: - output = log.readlines() - - for line in output: - measurement, influx_data = self.parse_influx_output(line) - - for stat_name, stat_val in influx_data.items(): - if stat_name == "value": - if measurement == "flow_packet_count": - self.assertEqual(msg.body[0].packet_count, stat_val) - elif measurement == "flow_byte_count": - self.assertEqual(msg.body[0].byte_count, stat_val) - else: - self.fail("Unknown measurement") - - elif stat_name in other_fields: - self.assertEqual(other_fields[stat_name], stat_val) - 
- elif stat_name in msg.body[0].match: - self.assertEqual(msg.body[0].match.get(stat_name), stat_val) - - else: - self.fail( - "Unknown key: {} and value: {}".format(stat_name, stat_val) - ) - - class GaugeThreadPollerTest(unittest.TestCase): # pytype: disable=module-attr """Tests the methods in the GaugeThreadPoller class"""