From 55ca6ec9f863726b266bebffb3e74539beb1b7e6 Mon Sep 17 00:00:00 2001
From: Vince Fleming
Date: Thu, 8 Aug 2024 11:36:29 -0400
Subject: [PATCH 1/8] changes for loki-only mode

---
 export.py  | 34 ++++++++++++++++++++--------------
 export.yml |  5 +++--
 2 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/export.py b/export.py
index 4d6dade..8692fe6 100644
--- a/export.py
+++ b/export.py
@@ -29,7 +29,7 @@ from wekalib.wekacluster import WekaCluster
 import wekalib.exceptions

-VERSION = "1.7.3"
+VERSION = "1.7.4"
 #VERSION = "experimental"

@@ -134,6 +134,7 @@ def prom_client(config):
     # is there a loki server set?
     loki_host = config['exporter'].get('loki_host', None)
     loki_port = config['exporter'].get('loki_port', 3100)
+    loki_only = config['exporter'].get('loki_only', False)

     if loki_host is not None and len(loki_host) != 0:
         log.info(f"loki_host set to {loki_host}")
@@ -144,23 +145,28 @@ def prom_client(config):
     else:
         lokiserver = None

+    if loki_only and lokiserver is None:
+        log.critical("loki_only set, but no Loki server defined in config file")
+        sys.exit(1)
+
     #
     # Start up the server to expose the metrics.
     #
-    log.info(f"starting http server on port {config['exporter']['listen_port']}")
-    try:
-        if config['exporter']['certfile'] is not None and config['exporter']['keyfile'] is not None:
-            prometheus_client.start_http_server(int(config['exporter']['listen_port']),
-                                                certfile=config['exporter']['certfile'],
-                                                keyfile=config['exporter']['keyfile'])
-        else:
-            prometheus_client.start_http_server(int(config['exporter']['listen_port']))
-    except Exception as exc:
-        log.critical(f"Unable to start http server on port {config['exporter']['listen_port']}: {exc}")
-        return 1
+    if not loki_only:
+        log.info(f"starting http server on port {config['exporter']['listen_port']}")
+        try:
+            if config['exporter']['certfile'] is not None and config['exporter']['keyfile'] is not None:
+                prometheus_client.start_http_server(int(config['exporter']['listen_port']),
+                                                    certfile=config['exporter']['certfile'],
+                                                    keyfile=config['exporter']['keyfile'])
+            else:
+                prometheus_client.start_http_server(int(config['exporter']['listen_port']))
+        except Exception as exc:
+            log.critical(f"Unable to start http server on port {config['exporter']['listen_port']}: {exc}")
+            return 1

-    # register our custom collector
-    prometheus_client.REGISTRY.register(collector)
+        # register our custom collector
+        prometheus_client.REGISTRY.register(collector)

     while True:
         time.sleep(30)  # sleep first, just in case we're started at the same time as Loki; give it time
diff --git a/export.yml b/export.yml
index 9cc5e4f..373341d 100644
--- a/export.yml
+++ b/export.yml
@@ -5,8 +5,9 @@
 # exporter section - info about how we're going to run
 exporter:
   listen_port: 8150
-  loki_host: null
+  loki_host: wms
   loki_port: 3100
+  loki_only: True
   timeout: 10.0
   max_procs: 8
   max_threads_per_proc: 100
@@ -19,7 +20,7 @@ exporter:
 cluster:
   auth_token_file: auth-token.json
   hosts:
-    - 172.29.0.63
+    - 172.29.0.64
   force_https: False    # only 3.10+ clusters support https
   verify_cert: False    # default cert cannot be verified
   mgmt_port: 14000

From 24da4fbd6ac7488916b3014cf4f2a42fe7145e98 Mon Sep 17 00:00:00 2001
From: Vince Fleming
Date: Thu, 8 Aug 2024 15:24:32 -0400
Subject: [PATCH 2/8] changes for loki-only mode

---
 collector.py | 150 +++++++++++++++++++++++++++------------------------
 export.py    |   2 +
 lokilogs.py  |  13 ++++-
 maps.py      |  27 +++++++++-
 4 files changed, 120 insertions(+), 72 deletions(-)

diff --git a/collector.py b/collector.py
index 4fa7cad..3628bb7 100644
--- a/collector.py +++ b/collector.py @@ -106,7 +106,7 @@ def __init__(self, config, cluster_obj): # wekaCollector self.collect_time = None self.clusterdata = {} self.threaderror = False - self.api_stats = {} + self.api_stats = {'num_calls': 0} exporter = config['exporter'] self.max_procs = exporter['max_procs'] self.max_threads_per_proc = exporter['max_threads_per_proc'] @@ -347,40 +347,12 @@ def store_results(self, cluster, results): else: self.clusterdata[str(cluster)][stat] += result.result - # - # gather() gets fresh stats from the cluster as they update - # populates all datastructures with fresh data - # - # - def gather(self): - - cluster = self.cluster - self._reset_metrics() - - start_time = time.time() - log.info("gathering weka data from cluster {}".format(str(cluster))) - - # re-initialize wekadata so changes in the cluster don't leave behind strange things (hosts/nodes that no longer exist, etc) - wekadata = {} - self.clusterdata[str(cluster)] = wekadata # clear out old data - - # reset the cluster config to be sure we can talk to all the hosts - try: - cluster.refresh() - except wekalib.exceptions.NameNotResolvable as exc: - log.critical(f"Names are not resolvable - are they in /etc/hosts or DNS? {exc}") - raise - except Exception as exc: - log.error(f"Cluster refresh failed on cluster '{cluster}' - check connectivity ({exc})") - raise - - # set up async api calling subsystem - self.asyncobj = Async(cluster, self.max_procs, self.max_threads_per_proc) - + def create_maps(self): + self.wekadata = dict() # get info from weka cluster - these are quick calls for stat, command in self.WEKAINFO.items(): try: - wekadata[stat] = cluster.call_api(command['method'], command['parms']) + self.wekadata[stat] = self.cluster.call_api(command['method'], command['parms']) self.api_stats['num_calls'] += 1 except Exception as exc: log.error(f"error getting {stat} from cluster {cluster}: {exc}") @@ -393,11 +365,11 @@ def gather(self): # populate maps try: - for node_id, node in wekadata["nodeList"].items(): + for node_id, node in self.wekadata["nodeList"].items(): node_host_map[node_id] = node["hostname"] node_role_map[node_id] = node["roles"] # node["node_id"] = node_id # this used to be inside here... 
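# A note on intent: the maps built here exist so the Loki module can translate
# the node ids that appear in raw events into host/role labels. A minimal
# sketch of that downstream lookup, assuming the "NodeId<N>" key format used
# elsewhere in this exporter (the sample values below are hypothetical):
#
#     node_host_map = {'NodeId<1>': 'weka01', 'NodeId<2>': 'weka02'}
#     node_role_map = {'NodeId<1>': ['COMPUTE'], 'NodeId<2>': ['DRIVES']}
#
#     def labels_for(node_id):
#         # fall back to "None", matching how the alert/drive code handles
#         # missing hostnames elsewhere in this collector
#         return {"host": node_host_map.get(node_id, "None"),
#                 "role": ",".join(node_role_map.get(node_id, []))}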
- for host in wekadata["hostList"].values(): # node is a dict of node attribute + for host in self.wekadata["hostList"].values(): # node is a dict of node attribute if host['hostname'] not in host_role_map: # there may be MCB, so might be there already if host["mode"] == "backend": host_role_map[host["hostname"]] = "server" @@ -406,11 +378,51 @@ def gather(self): # update the maps so they can be used in the loki module self.map_registry.register('node-host', node_host_map) self.map_registry.register('node-role', node_role_map) - self.map_registry.register('node-role', host_role_map) + self.map_registry.register('host-role', host_role_map) except Exception as exc: log.error(f"error building maps: {exc}: Aborting data gather from cluster {str(cluster)}") raise + + # + # gather() gets fresh stats from the cluster as they update + # populates all datastructures with fresh data + # + # + def gather(self): + + cluster = self.cluster + self._reset_metrics() + + start_time = time.time() + log.info("gathering weka data from cluster {}".format(str(cluster))) + + # re-initialize self.wekadata so changes in the cluster don't leave behind strange things (hosts/nodes that no longer exist, etc) + #self.wekadata = {} + self.clusterdata[str(cluster)] = dict() + + # reset the cluster config to be sure we can talk to all the hosts + try: + cluster.refresh() + except wekalib.exceptions.NameNotResolvable as exc: + log.critical(f"Names are not resolvable - are they in /etc/hosts or DNS? {exc}") + raise + except Exception as exc: + log.error(f"Cluster refresh failed on cluster '{cluster}' - check connectivity ({exc})") + raise + + # set up async api calling subsystem + self.asyncobj = Async(cluster, self.max_procs, self.max_threads_per_proc) + + try: + self.create_maps() + except Exception as exc: + log.error(f"error creating maps: {exc}") + return + node_host_map = self.map_registry.lookup('node-host') + node_role_map = self.map_registry.lookup('node-role') + host_role_map = self.map_registry.lookup('host-role') + log.info(f"Cluster {cluster} Using {cluster.sizeof()} hosts") # be simplistic at first... 
let's just gather on a subset of nodes each query @@ -449,7 +461,7 @@ def gather(self): # up_list is a list of all the good hosts (ie: not down) up_list = list() backends_list = list() - for host in wekadata['hostList'].values(): + for host in self.wekadata['hostList'].values(): if host['status'] == 'UP' and host['state'] == 'ACTIVE' and host['hostname'] not in up_list: up_list.append(host['hostname']) if host['mode'] == 'backend': @@ -506,9 +518,9 @@ def gather(self): log.info("populating datastructures for cluster {}".format(str(cluster))) try: # determine Cloud Status - if wekadata["clusterinfo"]["cloud"]["healthy"]: + if self.wekadata["clusterinfo"]["cloud"]["healthy"]: cloudStatus = "Healthy" # must be enabled to be healthy - elif wekadata["clusterinfo"]["cloud"]["enabled"]: + elif self.wekadata["clusterinfo"]["cloud"]["enabled"]: cloudStatus = "Unhealthy" # enabled, but unhealthy else: cloudStatus = "Disabled" # disabled, healthy is meaningless @@ -523,25 +535,25 @@ def gather(self): log.debug(f"weka_info Gauge cluster={cluster.name}") try: # Weka status indicator - if (wekadata["clusterinfo"]["buckets"]["active"] == wekadata["clusterinfo"]["buckets"]["total"] and - wekadata["clusterinfo"]["drives"]["active"] == wekadata["clusterinfo"]["drives"]["total"] and - wekadata["clusterinfo"]["io_nodes"]["active"] == wekadata["clusterinfo"]["io_nodes"]["total"] and - wekadata["clusterinfo"]["hosts"]["backends"]["active"] == - wekadata["clusterinfo"]["hosts"]["backends"]["total"]): + if (self.wekadata["clusterinfo"]["buckets"]["active"] == self.wekadata["clusterinfo"]["buckets"]["total"] and + self.wekadata["clusterinfo"]["drives"]["active"] == self.wekadata["clusterinfo"]["drives"]["total"] and + self.wekadata["clusterinfo"]["io_nodes"]["active"] == self.wekadata["clusterinfo"]["io_nodes"]["total"] and + self.wekadata["clusterinfo"]["hosts"]["backends"]["active"] == + self.wekadata["clusterinfo"]["hosts"]["backends"]["total"]): WekaClusterStatus = "OK" else: WekaClusterStatus = "WARN" # Basic info - cluster.release = wekadata["clusterinfo"]["release"] # keep this up to date - wekacluster = {"cluster": str(cluster), "version": wekadata["clusterinfo"]["release"], - "cloud_status": cloudStatus, "license_status": wekadata["clusterinfo"]["licensing"]["mode"], - "io_status": wekadata["clusterinfo"]["io_status"], - "link_layer": wekadata["clusterinfo"]["net"]["link_layer"], "status": WekaClusterStatus} + cluster.release = self.wekadata["clusterinfo"]["release"] # keep this up to date + wekacluster = {"cluster": str(cluster), "version": self.wekadata["clusterinfo"]["release"], + "cloud_status": cloudStatus, "license_status": self.wekadata["clusterinfo"]["licensing"]["mode"], + "io_status": self.wekadata["clusterinfo"]["io_status"], + "link_layer": self.wekadata["clusterinfo"]["net"]["link_layer"], "status": WekaClusterStatus} metric_objs['wekainfo'].add_metric(labels=wekacluster.keys(), value=wekacluster) - # log.info( "cluster name: " + wekadata["clusterinfo"]["name"] ) + # log.info( "cluster name: " + self.wekadata["clusterinfo"]["name"] ) except Exception as exc: log.error("error cluster info - aborting populate of cluster {}".format(str(cluster))) raise @@ -550,8 +562,8 @@ def gather(self): try: # Uptime # not sure why, but sometimes this would fail... 
trim off the microseconds, because we really don't care - cluster_time = self._trim_time(wekadata["clusterinfo"]["time"]["cluster_time"]) - cluster_start_time = self._trim_time(wekadata["clusterinfo"]["io_status_changed_time"]) + cluster_time = self._trim_time(self.wekadata["clusterinfo"]["time"]["cluster_time"]) + cluster_start_time = self._trim_time(self.wekadata["clusterinfo"]["io_status_changed_time"]) now_obj = datetime.datetime.strptime(cluster_time, "%Y-%m-%dT%H:%M:%S") dt_obj = datetime.datetime.strptime(cluster_start_time, "%Y-%m-%dT%H:%M:%S") uptime = now_obj - dt_obj @@ -568,7 +580,7 @@ def gather(self): # e: weka_overview_activity_num_ops instead of weka_overview_activity_ops for name, parms in self.CLUSTERSTATS.items(): metric_objs["cluster_stat_" + name].add_metric([str(cluster)], - wekadata["clusterinfo"]["activity"][parms[2]]) + self.wekadata["clusterinfo"]["activity"][parms[2]]) except: # track = traceback.format_exc() @@ -577,37 +589,37 @@ def gather(self): log.debug(f"server overview cluster={cluster.name}") try: - metric_objs['weka_host_spares'].add_metric([str(cluster)], wekadata["clusterinfo"]["hot_spare"]) + metric_objs['weka_host_spares'].add_metric([str(cluster)], self.wekadata["clusterinfo"]["hot_spare"]) metric_objs['weka_host_spares_bytes'].add_metric([str(cluster)], - wekadata["clusterinfo"]["capacity"]["hot_spare_bytes"]) + self.wekadata["clusterinfo"]["capacity"]["hot_spare_bytes"]) metric_objs['weka_drive_storage_total_bytes'].add_metric([str(cluster)], - wekadata["clusterinfo"]["capacity"]["total_bytes"]) + self.wekadata["clusterinfo"]["capacity"]["total_bytes"]) metric_objs['weka_drive_storage_unprovisioned_bytes'].add_metric([str(cluster)], - wekadata["clusterinfo"]["capacity"][ + self.wekadata["clusterinfo"]["capacity"][ "unprovisioned_bytes"]) # this changed after WEKAPP-363037 - if 'servers' in wekadata["clusterinfo"]: + if 'servers' in self.wekadata["clusterinfo"]: stanza = "servers" else: stanza = "hosts" metric_objs['weka_num_servers_active'].add_metric([str(cluster)], - wekadata["clusterinfo"][stanza]["backends"]["active"]) + self.wekadata["clusterinfo"][stanza]["backends"]["active"]) metric_objs['weka_num_servers_total'].add_metric([str(cluster)], - wekadata["clusterinfo"][stanza]["backends"]["total"]) + self.wekadata["clusterinfo"][stanza]["backends"]["total"]) metric_objs['weka_num_clients_active'].add_metric([str(cluster)], - wekadata["clusterinfo"][stanza]["clients"]["active"]) + self.wekadata["clusterinfo"][stanza]["clients"]["active"]) metric_objs['weka_num_clients_total'].add_metric([str(cluster)], - wekadata["clusterinfo"][stanza]["clients"]["total"]) + self.wekadata["clusterinfo"][stanza]["clients"]["total"]) metric_objs['weka_num_drives_active'].add_metric([str(cluster)], - wekadata["clusterinfo"]["drives"]["active"]) - metric_objs['weka_num_drives_total'].add_metric([str(cluster)], wekadata["clusterinfo"]["drives"]["total"]) + self.wekadata["clusterinfo"]["drives"]["active"]) + metric_objs['weka_num_drives_total'].add_metric([str(cluster)], self.wekadata["clusterinfo"]["drives"]["total"]) except: log.error("error processing server overview for cluster {}".format(str(cluster))) log.debug(f"protection status cluster={cluster.name}") try: # protection status - rebuildStatus = wekadata["clusterinfo"]["rebuild"] + rebuildStatus = self.wekadata["clusterinfo"]["rebuild"] protectionStateList = rebuildStatus["protectionState"] numStates = len(protectionStateList) # 3 (0,1,2) for 2 parity), or 5 (0,1,2,3,4 for 4 parity) @@ -625,7 +637,7 @@ def 
gather(self): log.debug(f"filesystems cluster={cluster.name}") try: # Filesystem stats - for fs_id, fs in wekadata["fs_stat"].items(): + for fs_id, fs in self.wekadata["fs_stat"].items(): fs['total_percent_used'] = float(fs["used_total"]) / float(fs["available_total"]) * 100 fs['ssd_percent_used'] = float(fs["used_ssd"]) / float(fs["available_ssd"]) * 100 @@ -638,7 +650,7 @@ def gather(self): log.debug(f"alerts cluster={cluster.name}") try: - for alert in wekadata["alerts"]: + for alert in self.wekadata["alerts"]: if not alert["muted"]: log.debug(f"alert detected {alert['type']}") host_name = "None" @@ -662,7 +674,7 @@ def gather(self): log.error(f"error {exc} processing alerts for cluster {str(cluster)}") try: - for disk_id, drive in wekadata["driveList"].items(): + for disk_id, drive in self.wekadata["driveList"].items(): # removed drives can have null values - prom client code hates that! if drive['hostname'] is None or len(drive['hostname']) == 0: drive['hostname'] = "None" @@ -697,7 +709,7 @@ def gather(self): # 'reclaimable_high_thresh', 'reclaimable_low_thresh', # 'reclaimable_thresh', 'total_consumed_cap', 'used_cap']) try: - for fs in wekadata["obs_capacity"]: + for fs in self.wekadata["obs_capacity"]: metric_objs['fs_obs_total_consumed'].add_metric([str(cluster), fs['filesystem_name'], fs['fsId'], fs['obs_bucket_name']], fs['total_consumed_capacity']) metric_objs['fs_obs_cap_used'].add_metric([str(cluster), fs['filesystem_name'], fs['fsId'], @@ -816,7 +828,7 @@ def collect_logs(self, lokiserver): return try: - lokiserver.send_events(events, self.cluster) + lokiserver.send_events(events, self.cluster, self) except Exception as exc: log.critical(f"Error sending events: {exc} for cluster {self.cluster}") # log.critical(f"{traceback.format_exc()}") diff --git a/export.py b/export.py index 8692fe6..5caf3ab 100644 --- a/export.py +++ b/export.py @@ -171,6 +171,8 @@ def prom_client(config): while True: time.sleep(30) # sleep first, just in case we're started at the same time as Loki; give it time if lokiserver is not None: + if loki_only: + log.info(f"running in Loki-only mode") collector.collect_logs(lokiserver) diff --git a/lokilogs.py b/lokilogs.py index dd48ac6..a241a51 100644 --- a/lokilogs.py +++ b/lokilogs.py @@ -92,10 +92,19 @@ def loki_logevent(self, timestamp, event, **labels): # end loki_logevent # format the events and send them up to Loki - def send_events(self, event_dict, cluster): - + def send_events(self, event_dict, cluster, collector): + MINS = 60 num_successful = 0 + if self.registry.lookup('node-host') is None or self.registry.get_age('node-host') > 5 * MINS: + log.info(f"node-host map not populated... 
populating") + collector.create_maps() + log.info(f"node-host map populated.") + node_host_map = self.registry.lookup('node-host') + if node_host_map is None: + log.error(f"Unable to populate node-host map: {exc}") + + log.debug(f"node-host map age: {round(self.registry.get_age('node-host'),1)} seconds") if len(event_dict) == 0: log.debug("No events to send") diff --git a/maps.py b/maps.py index abbbe0c..5d2d26c 100644 --- a/maps.py +++ b/maps.py @@ -2,17 +2,42 @@ # maps - objects that map things (node ids to hostnames, etc) # from threading import Lock +import time +from logging import getLogger +log = getLogger(__name__) class MapRegistry(object): def __init__(self): self._lock = Lock() self.map_registry = dict() + self.timestamp = dict() def register(self, map_name, map_object): with self._lock: self.map_registry[map_name] = map_object + self.timestamp[map_name] = time.time() def lookup(self, map_name): with self._lock: - return self.map_registry[map_name] + try: + return self.map_registry[map_name] + except Exception as exc: + log.error(f"Exception looking up {map_name} map: {exc}") + return None + + def get_timestamp(self, map_name): + with self._lock: + try: + return self.timestamp[map_name] + except Exception as exc: + log.error(f"Exception looking up timestamp for {map_name} map: {exc}") + return None + + def get_age(self, map_name): + with self._lock: + try: + return time.time() - self.timestamp[map_name] + except Exception as exc: + log.error(f"Exception looking up age for {map_name} map: {exc}") + return None \ No newline at end of file From 647c7a7295515c9434e34e67b41b16046d01bace Mon Sep 17 00:00:00 2001 From: Vince Fleming Date: Thu, 8 Aug 2024 15:39:28 -0400 Subject: [PATCH 3/8] changes for loki-only mode --- export.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/export.yml b/export.yml index 373341d..fb9c69e 100644 --- a/export.yml +++ b/export.yml @@ -25,12 +25,12 @@ cluster: verify_cert: False # default cert cannot be verified mgmt_port: 14000 - # auth_token_file can be an absolute path, relative path, or filename. + # auth_token_file must be a filename. # If just a filename it will be searched for in ".", "~/.weka", and "./.weka" # This file can be generated with the 'weka user login' command and copied to where we are running - # hosts is a list of hostnames or ip addresses. Minimum of 1 requred. You do not need to list all hosts in the cluster + # hosts is a list of hostnames or ip addresses. Minimum of 1 required. 
You do not need to list all hosts in the cluster -# This file comes pre-set to pupulate the Grafana Panels that we've provided +# This file comes pre-set to populate the Grafana Panels that we've provided # # File format: # From 27efc54fd371319788dc15403eefac4aee183d4e Mon Sep 17 00:00:00 2001 From: Vince Fleming Date: Fri, 9 Aug 2024 10:48:08 -0400 Subject: [PATCH 4/8] reworked event handling --- collector.py | 4 +- lokilogs.py => events.py | 179 ++++++++++++++----------------------- export.py | 186 ++++++++++++++++++++++++--------------- export.yml | 4 +- 4 files changed, 186 insertions(+), 187 deletions(-) rename lokilogs.py => events.py (61%) diff --git a/collector.py b/collector.py index 3628bb7..eb08cfe 100644 --- a/collector.py +++ b/collector.py @@ -810,7 +810,7 @@ def get_stat(raw_stats_data): # ------------- end of gather() ------------- - def collect_logs(self, lokiserver): + def collect_logs(self, event_processor): with self._access_lock: # make sure we don't conflict with a metrics collection log.info(f"getting events for cluster {self.cluster}") try: @@ -828,7 +828,7 @@ def collect_logs(self, lokiserver): return try: - lokiserver.send_events(events, self.cluster, self) + event_processor.send_events(events, self.cluster, self) except Exception as exc: log.critical(f"Error sending events: {exc} for cluster {self.cluster}") # log.critical(f"{traceback.format_exc()}") diff --git a/lokilogs.py b/events.py similarity index 61% rename from lokilogs.py rename to events.py index a241a51..3c2ae39 100644 --- a/lokilogs.py +++ b/events.py @@ -8,28 +8,32 @@ # example of usage grafana/loki api when you need push any log/message from your python scipt import argparse -import datetime import json -# import syslog import time import socket -import sys from logging import getLogger, INFO -import dateutil -import dateutil.parser import requests import urllib3 from wekalib.wekatime import lokitime_to_wekatime, wekatime_to_datetime, lokitime_to_datetime, datetime_to_lokitime, datetime_to_wekatime log = getLogger(__name__) +syslog = getLogger("event_syslog") -class LokiServer(object): - def __init__(self, lokihost, lokiport, map_registry): + +class WekaEventProcessor(object): + def __init__(self, map_registry): + self.loki = False + self.host = "" + self.port = 0 + self.registry = map_registry + + + def configure_loki(self, lokihost, lokiport): + self.loki = True self.host = lokihost self.port = lokiport - self.registry = map_registry # save some trouble, and make sure names are resolvable try: socket.gethostbyname(lokihost) @@ -42,52 +46,54 @@ def __init__(self, lokihost, lokiport, map_registry): # push msg log into grafana-loki def loki_logevent(self, timestamp, event, **labels): - url = 'http://' + self.host + ':' + str(self.port) + '/loki/api/v1/push' # set the URL - - # set the headers - headers = { - 'Content-type': 'application/json' - } - - log.debug(f"{labels}") - # set the payload - payload = { - 'streams': [ - { - 'stream': labels["labels"], - 'values': [ - [timestamp, event] - ] - } - ] - } - - # encode payload to a string - payload_str = json.dumps(payload) - # log.debug( json.dumps(payload, indent=4, sort_keys=True) ) - - # this is where we actually send it - try: - answer = requests.post(url, data=payload_str, headers=headers) - except requests.exceptions.ConnectionError as exc: - log.critical(f"Unable to send Events to Loki: unable to establish connection: FATAL") - raise + if self.loki: + url = 'http://' + self.host + ':' + str(self.port) + '/loki/api/v1/push' # set the URL - 
except Exception as exc: - log.critical(f"Unable to send Events to Loki") - raise + # set the headers + headers = { + 'Content-type': 'application/json' + } - log.debug(f"status code: {answer.status_code}") - # check the return code - if answer.status_code == 400: - # I've only seen code 400 for duplicate entries; but I could be wrong. ;) - log.error(f"Error posting event; possible duplicate entry: {answer.text}") - return False - elif answer.status_code != 204: # 204 is ok - log.error("loki_logevent(): bad http status code: " + str(answer.status_code) + " " + answer.text) - return False - - return True + log.debug(f"{labels}") + # set the payload + payload = { + 'streams': [ + { + 'stream': labels["labels"], + 'values': [ + [timestamp, event] + ] + } + ] + } + + # encode payload to a string + payload_str = json.dumps(payload) + # log.debug( json.dumps(payload, indent=4, sort_keys=True) ) + + # this is where we actually send it + try: + answer = requests.post(url, data=payload_str, headers=headers) + except requests.exceptions.ConnectionError as exc: + log.critical(f"Unable to send Events to Loki: unable to establish connection: FATAL") + raise + except Exception as exc: + log.critical(f"Unable to send Events to Loki: {exc}") + raise + + log.debug(f"status code: {answer.status_code}") + # check the return code + if answer.status_code == 400: + # I've only seen code 400 for duplicate entries; but I could be wrong. ;) + log.error(f"Error posting event; possible duplicate entry: {answer.text}") + return False + elif answer.status_code != 204: # 204 is ok + log.error("loki_logevent(): bad http status code: " + str(answer.status_code) + " " + answer.text) + return False + + return True + else: + return False # ? not sure what to return yet # end loki_logevent @@ -151,72 +157,21 @@ def send_events(self, event_dict, cluster, collector): description = f"cluster:{cluster.name} :{orig_sev}: {event['type']}: {event['description']}" log.debug(f"sending event: timestamp={timestamp}, labels={labels}, desc={description}") - log.log(INFO, f"WekaEvent: {description}") # send to syslog - - try: - if self.loki_logevent(timestamp, description, labels=labels): - # only update time if upload successful, so we don't drop events (they should retry upload next time) - cluster.last_event_timestamp = event['timestamp'] - num_successful += 1 - except: - continue # if it has an exception, abort + syslog.info(f"WekaEvent: {description}") # send to syslog, if configured + if self.loki: + try: + if self.loki_logevent(timestamp, description, labels=labels): + cluster.last_event_timestamp = event['timestamp'] + num_successful += 1 + except: + # error messages are already logged in loki_logevent + continue # just move on... log.info(f"Total events={len(event_dict)}; successfully sent {num_successful} events") if num_successful != 0: cluster.last_event_timestamp = cluster.last_get_events_time - # end send_events - - -# Not used anymore... 
but might come in handy -# get the time of the last event that Loki has for this cluster so we know where we left off -def last_lokievent_time(lokihost, port, cluster): - http_pool = urllib3.PoolManager() - log.debug("getting last event from Loki") - # url = 'http://' + lokihost + ':' + str(port) + '/loki/api/v1/query' # set the URL - url = 'http://' + lokihost + ':' + str(port) + '/loki/api/v1/query_range' # set the URL - - # set the headers - headers = { - 'Content-type': 'application/json' - } - - clusternamequery = "{cluster=\"" + f"{cluster.name}" + "\"}" - fields = { - # 'direction': "BACKWARDS", - 'query': clusternamequery - } - try: - latest = http_pool.request('GET', url, fields=fields) - except Exception as exc: - log.debug(f"{exc} caught") - return "0" - - if latest.status != 200: - return "0" - - log.debug(f"{latest.status} {latest.data}") - latest_data = json.loads(latest.data) - - newest = 0 - # log.debug(f"latest_data={json.dumps(latest_data, indent=4)}") - results = latest_data["data"]["result"] - for result in results: - values = result["values"] - for value in values: - if int(value[0]) > newest: - newest = int(value[0]) - log.debug(f"timeval={lokitime_to_wekatime(value[0])}") - - first_result = str(newest) - - log.debug(f"first_result={first_result}, {lokitime_to_wekatime(first_result)}") - - return first_result - - # end last_lokievent_time - if __name__ == '__main__': diff --git a/export.py b/export.py index 5caf3ab..bbdfcd7 100644 --- a/export.py +++ b/export.py @@ -20,16 +20,16 @@ import prometheus_client +from events import WekaEventProcessor # local imports #from maps import Map, MapRegistry from maps import MapRegistry import wekalib.signals as signals from collector import WekaCollector -from lokilogs import LokiServer from wekalib.wekacluster import WekaCluster import wekalib.exceptions -VERSION = "1.7.4" +VERSION = "1.7.5" #VERSION = "experimental" @@ -76,39 +76,59 @@ def prom_client(config): log.error(f"'exporter:' stanza missing from .yml file - version mismatch between .yml and exporter version?") sys.exit(1) - if 'force_https' not in config['cluster']: # allow defaults for these - config['cluster']['force_https'] = False + # Parse config file, set defaults for missing values + cluster = config['cluster'] + exporter = config['exporter'] - if 'verify_cert' not in config['cluster']: - config['cluster']['verify_cert'] = True + # cluster options + cluster['force_https'] = cluster.get('force_https', False) + cluster['verify_cert'] = cluster.get('verify_cert', True) + cluster['mgmt_port'] = cluster.get('mgmt_port', 14000) - if 'mgmt_port' not in config['cluster']: - config['cluster']['mgmt_port'] = 14000 + # exporter options + exporter['timeout'] = exporter.get('timeout', 10) + exporter['backends_only'] = exporter.get('backends_only', False) + exporter['datapoints_per_collect'] = exporter.get('datapoints_per_collect', 1) + exporter['certfile'] = exporter.get('certfile', None) + exporter['keyfile'] = exporter.get('keyfile', None) - if 'timeout' not in config['exporter']: - config['exporter']['timeout'] = 10 + # logging options + events_to_loki = exporter.get('events_to_loki', True) + events_to_syslog = exporter.get('events_to_syslog', True) - if 'backends_only' not in config['exporter']: - config['exporter']['backends_only'] = False - - if 'datapoints_per_collect' not in config['exporter']: - config['exporter']['datapoints_per_collect'] = 1 - - if 'certfile' not in config['exporter']: - config['exporter']['certfile'] = None - - if 'keyfile' not in 
config['exporter']: - config['exporter']['keyfile'] = None - - log.info(f"Timeout set to {config['exporter']['timeout']} secs") + # is there a loki server set? + loki_host = exporter.get('loki_host', None) + loki_port = exporter.get('loki_port', 3100) + events_only = exporter.get('events_only', False) + + # log the timeout + log.info(f"Prometheus Exporter for Weka version {VERSION}") + log.info("Configuration:") + #log.info(f"config file: {config['configfile']}") + log.info(f"cluster hosts: {cluster['hosts']}") + if len(cluster['hosts']) <= 1: + log.warning(f"Only one host defined - consider adding more for HA") + log.info(f"force_https: {cluster['force_https']}") + log.info(f"verify_cert: {cluster['verify_cert']}") + log.info(f"mgmt_port: {cluster['mgmt_port']}") + log.info(f"timeout: {exporter['timeout']} secs") + log.info(f"backends_only: {exporter['backends_only']}") + log.info(f"datapoints_per_collect: {exporter['datapoints_per_collect']}") + log.info(f"certfile: {exporter['certfile']}") + log.info(f"keyfile: {exporter['keyfile']}") + log.info(f"loki_host: {loki_host}") + log.info(f"loki_port: {loki_port}") + log.info(f"events_only: {events_only}") + log.info(f"events_to_loki: {events_to_loki}") + log.info(f"events_to_syslog: {events_to_syslog}") try: - cluster_obj = WekaCluster(config['cluster']['hosts'], config['cluster']['auth_token_file'], - force_https=config['cluster']['force_https'], - verify_cert=config['cluster']['verify_cert'], - backends_only=config['exporter']['backends_only'], - timeout=config['exporter']['timeout'], - mgmt_port=config['cluster']['mgmt_port']) + cluster_obj = WekaCluster(cluster['hosts'], cluster['auth_token_file'], + force_https=cluster['force_https'], + verify_cert=cluster['verify_cert'], + backends_only=exporter['backends_only'], + timeout=exporter['timeout'], + mgmt_port=cluster['mgmt_port']) except wekalib.exceptions.HTTPError as exc: if exc.code == 403: log.critical(f"Cluster returned permission error - is the userid level ReadOnly or above?") @@ -131,28 +151,24 @@ def prom_client(config): # create the WekaCollector object collector = WekaCollector(config, cluster_obj) - # is there a loki server set? - loki_host = config['exporter'].get('loki_host', None) - loki_port = config['exporter'].get('loki_port', 3100) - loki_only = config['exporter'].get('loki_only', False) + # create the event processor + event_processor = WekaEventProcessor(maps) if events_to_loki or events_to_syslog else None - if loki_host is not None and len(loki_host) != 0: - log.info(f"loki_host set to {loki_host}") - try: - lokiserver = LokiServer(loki_host, loki_port, maps) - except: - sys.exit(1) - else: - lokiserver = None + if event_processor is not None: + if events_to_loki: + log.info("Events to Loki enabled") + event_processor.configure_loki(loki_host, loki_port) - if loki_only and lokiserver is None: - log.critical("loki_only set, but no Loki server defined in config file") - sys.exit(1) + configure_event_syslog(events_to_syslog) + else: + if events_only: + log.critical("events_only set, but not configured to send them anywhere") + sys.exit(1) # # Start up the server to expose the metrics. 
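# The event-destination gating above reduces to a small decision: an event
# processor exists iff at least one of the two destinations is enabled, and
# events_only without any destination is a fatal misconfiguration. A sketch of
# just that logic, assuming the defaults set earlier in this patch
# (events_to_loki/events_to_syslog default True, events_only default False):

def wants_event_processor(exporter):
    # True when at least one event destination is enabled
    return exporter.get('events_to_loki', True) or exporter.get('events_to_syslog', True)

sample = {'events_to_loki': False, 'events_to_syslog': False, 'events_only': True}
if sample.get('events_only', False) and not wants_event_processor(sample):
    raise SystemExit("events_only set, but not configured to send them anywhere")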
# - if not loki_only: + if not events_only: log.info(f"starting http server on port {config['exporter']['listen_port']}") try: if config['exporter']['certfile'] is not None and config['exporter']['keyfile'] is not None: @@ -170,35 +186,27 @@ def prom_client(config): while True: time.sleep(30) # sleep first, just in case we're started at the same time as Loki; give it time - if lokiserver is not None: - if loki_only: - log.info(f"running in Loki-only mode") - collector.collect_logs(lokiserver) - + if event_processor is not None: + collector.collect_logs(event_processor) -def configure_logging(logger, verbosity, disable_syslog=False): +def configure_logging(logger, verbosity): loglevel = logging.INFO # default logging level libloglevel = logging.ERROR # default message formats console_format = "%(message)s" - syslog_format = "%(levelname)s:%(message)s" - - syslog_format = "%(process)s:%(filename)s:%(lineno)s:%(funcName)s():%(levelname)s:%(message)s" + #syslog_format = "%(levelname)s:%(message)s" if verbosity == 1: loglevel = logging.INFO console_format = "%(levelname)s:%(message)s" - syslog_format = "%(process)s:%(filename)s:%(lineno)s:%(funcName)s():%(levelname)s:%(message)s" libloglevel = logging.INFO elif verbosity == 2: loglevel = logging.DEBUG console_format = "%(filename)s:%(lineno)s:%(funcName)s():%(levelname)s:%(message)s" - syslog_format = "%(process)s:%(filename)s:%(lineno)s:%(funcName)s():%(levelname)s:%(message)s" elif verbosity > 2: loglevel = logging.DEBUG console_format = "%(filename)s:%(lineno)s:%(funcName)s():%(levelname)s:%(message)s" - syslog_format = "%(process)s:%(filename)s:%(lineno)s:%(funcName)s():%(levelname)s:%(message)s" libloglevel = logging.DEBUG @@ -207,20 +215,6 @@ def configure_logging(logger, verbosity, disable_syslog=False): console_handler.setFormatter(logging.Formatter(console_format)) logger.addHandler(console_handler) - if not disable_syslog: - # create handler to log to syslog - logger.info(f"setting syslog on {platform.platform()}") - if platform.platform()[:5] == "macOS": - syslogaddr = "/var/run/syslog" - else: - syslogaddr = "/dev/log" - syslog_handler = logging.handlers.SysLogHandler(syslogaddr) - syslog_handler.setFormatter(logging.Formatter(syslog_format)) - - # add syslog handler to root logger - if syslog_handler is not None: - logger.addHandler(syslog_handler) - # set default loglevel logger.setLevel(loglevel) @@ -236,6 +230,52 @@ def configure_logging(logger, verbosity, disable_syslog=False): logging.getLogger("async_api").setLevel(loglevel) +def configure_syslog(logger, enabled=False): + # see if there is a handler already + syslog_handler = get_handler(logger, logging.handlers.SysLogHandler) + + if enabled: + if syslog_handler is None: + syslogaddr = "/var/run/syslog" if platform.platform()[:5] == "macOS" else "/dev/log" + log.info(f"enabling syslog program logging to {syslogaddr}") + syslog_handler = logging.handlers.SysLogHandler(syslogaddr) + logger.addHandler(syslog_handler) + + syslog_handler.setFormatter( + logging.Formatter("%(process)s:%(filename)s:%(lineno)s:%(funcName)s():%(levelname)s:%(message)s")) + log.info("syslog enabled") + else: + if syslog_handler is not None: + logger.removeHandler(syslog_handler) + log.info("syslog program logging disabled") + + +def configure_event_syslog(enabled=False): + syslog = logging.getLogger("event_syslog") + if enabled: + syslog.setLevel(logging.INFO) + # see if there is a handler already + syslog_handler = get_handler(syslog, logging.handlers.SysLogHandler) + if syslog_handler is None: + 
syslogaddr = "/var/run/syslog" if platform.platform()[:5] == "macOS" else "/dev/log" + syslog_handler = logging.handlers.SysLogHandler(syslogaddr) + syslog.addHandler(syslog_handler) + syslog_handler.setFormatter(logging.Formatter("%(message)s")) + log.info("event syslog enabled") + else: + handler = get_handler(syslog, logging.handlers.SysLogHandler) + if handler is not None: + syslog.removeHandler(handler) + log.info("event syslog disabled") + + +def get_handler(logger, handler_type): + for handler in logger.handlers: + if isinstance(handler, handler_type): + return handler + return None + + def main(): # handle signals (ie: ^C and such) signals.signal_handling() @@ -252,7 +292,8 @@ def main(): print(f"{sys.argv[0]} version {VERSION}") sys.exit(0) - configure_logging(log, args.verbosity, disable_syslog=args.no_syslog) + configure_logging(log, args.verbosity) + configure_syslog(log, not args.no_syslog) # program syslog logging if not os.path.exists(args.configfile): log.critical(f"Required configfile '{args.configfile}' does not exist") @@ -266,6 +307,7 @@ def main(): return log.debug("config file loaded") + # main loop prom_client(config) diff --git a/export.yml b/export.yml index fb9c69e..6fcc27e 100644 --- a/export.yml +++ b/export.yml @@ -7,7 +7,9 @@ exporter: listen_port: 8150 loki_host: wms loki_port: 3100 - loki_only: True + events_only: True + events_to_loki: True + events_to_syslog: True timeout: 10.0 max_procs: 8 max_threads_per_proc: 100 From c7bd7cf57e0194bcfb32f36e9d0fbc32b53ed23c Mon Sep 17 00:00:00 2001 From: Vince Fleming Date: Fri, 9 Aug 2024 11:40:35 -0400 Subject: [PATCH 5/8] reworked event handling --- events.py | 114 ++++++++++++++++++++++++++---------------------------- 1 file changed, 55 insertions(+), 59 deletions(-) diff --git a/events.py b/events.py index 3c2ae39..c95d49a 100644 --- a/events.py +++ b/events.py @@ -46,61 +46,57 @@ def configure_loki(self, lokihost, lokiport): # push msg log into grafana-loki def loki_logevent(self, timestamp, event, **labels): - if self.loki: - url = 'http://' + self.host + ':' + str(self.port) + '/loki/api/v1/push' # set the URL - - # set the headers - headers = { - 'Content-type': 'application/json' - } + url = 'http://' + self.host + ':' + str(self.port) + '/loki/api/v1/push' # set the URL + + # set the headers + headers = { + 'Content-type': 'application/json' + } + + log.debug(f"{labels}") + # set the payload + payload = { + 'streams': [ + { + 'stream': labels["labels"], + 'values': [ + [timestamp, event] + ] + } + ] + } + + # encode payload to a string + payload_str = json.dumps(payload) + # log.debug( json.dumps(payload, indent=4, sort_keys=True) ) + + # this is where we actually send it + try: + answer = requests.post(url, data=payload_str, headers=headers) + except requests.exceptions.ConnectionError as exc: + log.critical(f"Unable to send Events to Loki: unable to establish connection: FATAL") + return False + except Exception as exc: + log.critical(f"Unable to send Events to Loki: {exc}") + return False - log.debug(f"{labels}") - # set the payload - payload = { - 'streams': [ - { - 'stream': labels["labels"], - 'values': [ - [timestamp, event] - ] - } - ] - } + log.debug(f"status code: {answer.status_code}") + # check the return code + if answer.status_code == 400: + # I've only seen code 400 for duplicate entries; but I could be wrong. 
;) + log.error(f"Error posting event; possible duplicate entry: {answer.text}") + return False + elif answer.status_code != 204: # 204 is ok + log.error("loki_logevent(): bad http status code: " + str(answer.status_code) + " " + answer.text) + return False - # encode payload to a string - payload_str = json.dumps(payload) - # log.debug( json.dumps(payload, indent=4, sort_keys=True) ) - - # this is where we actually send it - try: - answer = requests.post(url, data=payload_str, headers=headers) - except requests.exceptions.ConnectionError as exc: - log.critical(f"Unable to send Events to Loki: unable to establish connection: FATAL") - raise - except Exception as exc: - log.critical(f"Unable to send Events to Loki: {exc}") - raise - - log.debug(f"status code: {answer.status_code}") - # check the return code - if answer.status_code == 400: - # I've only seen code 400 for duplicate entries; but I could be wrong. ;) - log.error(f"Error posting event; possible duplicate entry: {answer.text}") - return False - elif answer.status_code != 204: # 204 is ok - log.error("loki_logevent(): bad http status code: " + str(answer.status_code) + " " + answer.text) - return False - - return True - else: - return False # ? not sure what to return yet + return True # end loki_logevent # format the events and send them up to Loki def send_events(self, event_dict, cluster, collector): MINS = 60 - num_successful = 0 if self.registry.lookup('node-host') is None or self.registry.get_age('node-host') > 5 * MINS: log.info(f"node-host map not populated... populating") collector.create_maps() @@ -117,7 +113,7 @@ def send_events(self, event_dict, cluster, collector): return # must be sorted by timestamp or Loki will reject them - #last_eventtime = "0" + num_successful = 0 for timestamp, event in sorted(event_dict.items()): # oldest first labels = { "source": "weka", @@ -159,18 +155,18 @@ def send_events(self, event_dict, cluster, collector): syslog.info(f"WekaEvent: {description}") # send to syslog, if configured + success = False if self.loki: - try: - if self.loki_logevent(timestamp, description, labels=labels): - cluster.last_event_timestamp = event['timestamp'] - num_successful += 1 - except: - # error messages are already logged in loki_logevent - continue # just move on... 
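# As reworked below, "did the Loki post succeed" and "advance the event
# cursor" are decoupled: last_event_timestamp now moves forward per event
# regardless of the post result, so a syslog-only configuration doesn't resend
# the same events forever. A sketch of the resulting control flow (send() is a
# hypothetical stand-in for loki_logevent(), which already logs its own errors):
#
#     num_successful = 0
#     for timestamp, event in sorted(event_dict.items()):  # oldest first; Loki rejects out-of-order pushes
#         success = send(timestamp, event) if loki_enabled else False
#         if success:
#             num_successful += 1
#         cluster.last_event_timestamp = event['timestamp']  # advance unconditionally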
- - log.info(f"Total events={len(event_dict)}; successfully sent {num_successful} events") - if num_successful != 0: - cluster.last_event_timestamp = cluster.last_get_events_time + success = self.loki_logevent(timestamp, description, labels=labels) + + if success: + num_successful += 1 + + cluster.last_event_timestamp = event['timestamp'] + + log.info(f"Total events={len(event_dict)}; successfully sent {num_successful} events to Loki") + #if self.loki and num_successful != 0: + # cluster.last_event_timestamp = cluster.last_get_events_time if __name__ == '__main__': From d7f9afa10585dad8f59df54095b76d7838dfb225 Mon Sep 17 00:00:00 2001 From: Vince Fleming Date: Fri, 9 Aug 2024 14:07:39 -0400 Subject: [PATCH 6/8] reworked event handling --- collector.py | 74 ++++++++++++++++++++++-------------------- events.py | 6 ++-- export.py | 7 ++-- export.yml | 2 +- maps.py => register.py | 7 ++-- 5 files changed, 51 insertions(+), 45 deletions(-) rename maps.py => register.py (80%) diff --git a/collector.py b/collector.py index eb08cfe..d8218ab 100644 --- a/collector.py +++ b/collector.py @@ -115,9 +115,10 @@ def __init__(self, config, cluster_obj): # wekaCollector self.datapoints_per_collect = exporter['datapoints_per_collect'] else: self.datapoints_per_collect = 1 - self.map_registry = config["map_registry"] + self.registry = config["registry"] self.cluster = cluster_obj + self.wekadata = dict() global weka_stat_list @@ -348,40 +349,42 @@ def store_results(self, cluster, results): self.clusterdata[str(cluster)][stat] += result.result def create_maps(self): - self.wekadata = dict() # get info from weka cluster - these are quick calls - for stat, command in self.WEKAINFO.items(): - try: - self.wekadata[stat] = self.cluster.call_api(command['method'], command['parms']) - self.api_stats['num_calls'] += 1 - except Exception as exc: - log.error(f"error getting {stat} from cluster {cluster}: {exc}") - # decision - if we can't get the basic info, we can't get anything else, so abort? + with self._access_lock: # make it re-entrant + if self.registry.lookup('node-host') is not None and self.registry.get_age('node-host') < 50: # 50 seconds + return # already populated by another thread recently + for stat, command in self.WEKAINFO.items(): + try: + self.wekadata[stat] = self.cluster.call_api(command['method'], command['parms']) + self.api_stats['num_calls'] += 1 + except Exception as exc: + log.error(f"error getting {stat} from cluster {self.cluster}: {exc}") + # decision - if we can't get the basic info, we can't get anything else, so abort? - # clear old maps, if any - if nodes come/go this can get funky with old data, so re-create it every time - node_host_map = dict() - node_role_map = dict() - host_role_map = dict() + # clear old maps, if any - if nodes come/go this can get funky with old data, so re-create it every time + node_host_map = dict() + node_role_map = dict() + host_role_map = dict() - # populate maps - try: - for node_id, node in self.wekadata["nodeList"].items(): - node_host_map[node_id] = node["hostname"] - node_role_map[node_id] = node["roles"] - # node["node_id"] = node_id # this used to be inside here... 
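# The guard at the top of create_maps() is a refresh-if-stale pattern: skip
# the API calls when another thread registered a fresh map within the last 50
# seconds. One caveat: the "make it re-entrant" comment only holds if
# _access_lock is a threading.RLock, since collect_logs() can hold the same
# lock while events.py calls back into this method; a plain Lock would
# deadlock there. A minimal sketch of the pattern in isolation (the class and
# names are hypothetical):
#
#     import time
#     from threading import RLock
#
#     class FreshnessGuard:
#         def __init__(self, builder, max_age=50):
#             self._lock = RLock()      # re-entrant: safe if the holder calls back in
#             self._builder = builder   # callable that rebuilds the map
#             self._value, self._stamp = None, 0.0
#             self._max_age = max_age
#
#         def get(self):
#             with self._lock:
#                 if self._value is None or time.time() - self._stamp > self._max_age:
#                     self._value = self._builder()
#                     self._stamp = time.time()
#                 return self._value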
- for host in self.wekadata["hostList"].values(): # node is a dict of node attribute - if host['hostname'] not in host_role_map: # there may be MCB, so might be there already - if host["mode"] == "backend": - host_role_map[host["hostname"]] = "server" - else: - host_role_map[host["hostname"]] = "client" - # update the maps so they can be used in the loki module - self.map_registry.register('node-host', node_host_map) - self.map_registry.register('node-role', node_role_map) - self.map_registry.register('host-role', host_role_map) - except Exception as exc: - log.error(f"error building maps: {exc}: Aborting data gather from cluster {str(cluster)}") - raise + # populate maps + try: + for node_id, node in self.wekadata["nodeList"].items(): + node_host_map[node_id] = node["hostname"] + node_role_map[node_id] = node["roles"] + # node["node_id"] = node_id # this used to be inside here... + for host in self.wekadata["hostList"].values(): # node is a dict of node attribute + if host['hostname'] not in host_role_map: # there may be MCB, so might be there already + if host["mode"] == "backend": + host_role_map[host["hostname"]] = "server" + else: + host_role_map[host["hostname"]] = "client" + # update the maps so they can be used in the loki module + self.registry.register('node-host', node_host_map) + self.registry.register('node-role', node_role_map) + self.registry.register('host-role', host_role_map) + except Exception as exc: + log.error(f"error building maps: {exc}: Aborting data gather from cluster {str(cluster)}") + raise # @@ -414,14 +417,15 @@ def gather(self): # set up async api calling subsystem self.asyncobj = Async(cluster, self.max_procs, self.max_threads_per_proc) + self.wekadata = dict() try: self.create_maps() except Exception as exc: log.error(f"error creating maps: {exc}") return - node_host_map = self.map_registry.lookup('node-host') - node_role_map = self.map_registry.lookup('node-role') - host_role_map = self.map_registry.lookup('host-role') + node_host_map = self.registry.lookup('node-host') + node_role_map = self.registry.lookup('node-role') + host_role_map = self.registry.lookup('host-role') log.info(f"Cluster {cluster} Using {cluster.sizeof()} hosts") diff --git a/events.py b/events.py index c95d49a..77348ca 100644 --- a/events.py +++ b/events.py @@ -23,11 +23,11 @@ class WekaEventProcessor(object): - def __init__(self, map_registry): + def __init__(self, registry): self.loki = False self.host = "" self.port = 0 - self.registry = map_registry + self.registry = registry def configure_loki(self, lokihost, lokiport): @@ -104,7 +104,7 @@ def send_events(self, event_dict, cluster, collector): node_host_map = self.registry.lookup('node-host') if node_host_map is None: - log.error(f"Unable to populate node-host map: {exc}") + log.error(f"Unable to populate node-host map: map creation failed") log.debug(f"node-host map age: {round(self.registry.get_age('node-host'),1)} seconds") diff --git a/export.py b/export.py index bbdfcd7..24a9616 100644 --- a/export.py +++ b/export.py @@ -23,7 +23,7 @@ from events import WekaEventProcessor # local imports #from maps import Map, MapRegistry -from maps import MapRegistry +from register import Registry import wekalib.signals as signals from collector import WekaCollector from wekalib.wekacluster import WekaCluster @@ -145,14 +145,13 @@ def prom_client(config): #log.critical(traceback.format_exc()) return - maps = MapRegistry() - config["map_registry"] = maps + config["registry"] = Registry() # create the WekaCollector object collector = 
WekaCollector(config, cluster_obj) # create the event processor - event_processor = WekaEventProcessor(maps) if events_to_loki or events_to_syslog else None + event_processor = WekaEventProcessor(config["registry"]) if events_to_loki or events_to_syslog else None if event_processor is not None: if events_to_loki: diff --git a/export.yml b/export.yml index 6fcc27e..29561a4 100644 --- a/export.yml +++ b/export.yml @@ -7,7 +7,7 @@ exporter: listen_port: 8150 loki_host: wms loki_port: 3100 - events_only: True + events_only: False events_to_loki: True events_to_syslog: True timeout: 10.0 diff --git a/maps.py b/register.py similarity index 80% rename from maps.py rename to register.py index 5d2d26c..06ab987 100644 --- a/maps.py +++ b/register.py @@ -1,5 +1,6 @@ # -# maps - objects that map things (node ids to hostnames, etc) +# register.py - This module provides a class to register and lookup objects by name. +# It also keeps track of the time each object was registered. # from threading import Lock import time @@ -7,7 +8,7 @@ from logging import getLogger log = getLogger(__name__) -class MapRegistry(object): +class Registry(object): def __init__(self): self._lock = Lock() self.map_registry = dict() @@ -36,6 +37,8 @@ def get_timestamp(self, map_name): def get_age(self, map_name): with self._lock: + if map_name not in self.timestamp: + return -1 # no age, as it was never registered try: return time.time() - self.timestamp[map_name] except Exception as exc: From 72d416ae0d5ea08166a146a6bcd528918f76024c Mon Sep 17 00:00:00 2001 From: Vince Fleming Date: Fri, 9 Aug 2024 15:19:52 -0400 Subject: [PATCH 7/8] reworked event handling --- collector.py | 101 ++++++++++++++++++++++++++++----------------------- events.py | 9 +++-- export.py | 3 ++ 3 files changed, 63 insertions(+), 50 deletions(-) diff --git a/collector.py b/collector.py index d8218ab..60fb15b 100644 --- a/collector.py +++ b/collector.py @@ -348,43 +348,54 @@ def store_results(self, cluster, results): else: self.clusterdata[str(cluster)][stat] += result.result - def create_maps(self): - # get info from weka cluster - these are quick calls - with self._access_lock: # make it re-entrant - if self.registry.lookup('node-host') is not None and self.registry.get_age('node-host') < 50: # 50 seconds - return # already populated by another thread recently - for stat, command in self.WEKAINFO.items(): - try: - self.wekadata[stat] = self.cluster.call_api(command['method'], command['parms']) - self.api_stats['num_calls'] += 1 - except Exception as exc: - log.error(f"error getting {stat} from cluster {self.cluster}: {exc}") - # decision - if we can't get the basic info, we can't get anything else, so abort? - # clear old maps, if any - if nodes come/go this can get funky with old data, so re-create it every time - node_host_map = dict() - node_role_map = dict() - host_role_map = dict() + # refresh_maps() is called by the events.py routine to ensure the node-host map is up to date + def refresh_maps(self): + with self._access_lock: + if self.registry.lookup('node-host') is None or self.registry.get_age('node-host') > 5 * 60: + log.info(f"node-host map not populated... 
populating") + self.create_maps() + log.info(f"node-host map populated.") - # populate maps + def create_maps(self): + # get info from weka cluster - these are quick calls + self.cluster.refresh() + if self.registry.lookup('node-host') is not None and self.registry.get_age('node-host') < 50: # 50 seconds + return # already populated by another thread recently + # re-initialize self.wekadata so changes in the cluster don't leave behind strange things (hosts/nodes that no longer exist, etc) + self.wekadata = dict() + for stat, command in self.WEKAINFO.items(): try: - for node_id, node in self.wekadata["nodeList"].items(): - node_host_map[node_id] = node["hostname"] - node_role_map[node_id] = node["roles"] - # node["node_id"] = node_id # this used to be inside here... - for host in self.wekadata["hostList"].values(): # node is a dict of node attribute - if host['hostname'] not in host_role_map: # there may be MCB, so might be there already - if host["mode"] == "backend": - host_role_map[host["hostname"]] = "server" - else: - host_role_map[host["hostname"]] = "client" - # update the maps so they can be used in the loki module - self.registry.register('node-host', node_host_map) - self.registry.register('node-role', node_role_map) - self.registry.register('host-role', host_role_map) + self.wekadata[stat] = self.cluster.call_api(command['method'], command['parms']) + self.api_stats['num_calls'] += 1 except Exception as exc: - log.error(f"error building maps: {exc}: Aborting data gather from cluster {str(cluster)}") - raise + log.error(f"error getting {stat} from cluster {self.cluster}: {exc}") + # decision - if we can't get the basic info, we can't get anything else, so abort? + + # clear old maps, if any - if nodes come/go this can get funky with old data, so re-create it every time + node_host_map = dict() + node_role_map = dict() + host_role_map = dict() + + # populate maps + try: + for node_id, node in self.wekadata["nodeList"].items(): + node_host_map[node_id] = node["hostname"] + node_role_map[node_id] = node["roles"] + # node["node_id"] = node_id # this used to be inside here... + for host in self.wekadata["hostList"].values(): # node is a dict of node attribute + if host['hostname'] not in host_role_map: # there may be MCB, so might be there already + if host["mode"] == "backend": + host_role_map[host["hostname"]] = "server" + else: + host_role_map[host["hostname"]] = "client" + # update the maps so they can be used in the loki module + self.registry.register('node-host', node_host_map) + self.registry.register('node-role', node_role_map) + self.registry.register('host-role', host_role_map) + except Exception as exc: + log.error(f"error building maps: {exc}: Aborting data gather from cluster {str(cluster)}") + raise # @@ -400,24 +411,22 @@ def gather(self): start_time = time.time() log.info("gathering weka data from cluster {}".format(str(cluster))) - # re-initialize self.wekadata so changes in the cluster don't leave behind strange things (hosts/nodes that no longer exist, etc) - #self.wekadata = {} + # clear the metrics from clusterdata self.clusterdata[str(cluster)] = dict() - # reset the cluster config to be sure we can talk to all the hosts - try: - cluster.refresh() - except wekalib.exceptions.NameNotResolvable as exc: - log.critical(f"Names are not resolvable - are they in /etc/hosts or DNS? 
{exc}") - raise - except Exception as exc: - log.error(f"Cluster refresh failed on cluster '{cluster}' - check connectivity ({exc})") - raise - # set up async api calling subsystem self.asyncobj = Async(cluster, self.max_procs, self.max_threads_per_proc) - self.wekadata = dict() + # reset the cluster config to be sure we can talk to all the hosts + #try: + # cluster.refresh() + #except wekalib.exceptions.NameNotResolvable as exc: + # log.critical(f"Names are not resolvable - are they in /etc/hosts or DNS? {exc}") + # raise + #except Exception as exc: + # log.error(f"Cluster refresh failed on cluster '{cluster}' - check connectivity ({exc})") + # raise + try: self.create_maps() except Exception as exc: diff --git a/events.py b/events.py index 77348ca..2540ddc 100644 --- a/events.py +++ b/events.py @@ -97,10 +97,11 @@ def loki_logevent(self, timestamp, event, **labels): # format the events and send them up to Loki def send_events(self, event_dict, cluster, collector): MINS = 60 - if self.registry.lookup('node-host') is None or self.registry.get_age('node-host') > 5 * MINS: - log.info(f"node-host map not populated... populating") - collector.create_maps() - log.info(f"node-host map populated.") + collector.refresh_maps() + #if self.registry.lookup('node-host') is None or self.registry.get_age('node-host') > 5 * MINS: + # log.info(f"node-host map not populated... populating") + # collector.create_maps() + # log.info(f"node-host map populated.") node_host_map = self.registry.lookup('node-host') if node_host_map is None: diff --git a/export.py b/export.py index 24a9616..8782031 100644 --- a/export.py +++ b/export.py @@ -145,6 +145,7 @@ def prom_client(config): #log.critical(traceback.format_exc()) return + config["registry"] = Registry() # create the WekaCollector object @@ -183,6 +184,8 @@ def prom_client(config): # register our custom collector prometheus_client.REGISTRY.register(collector) + time.sleep(30) # sleep first, just in case we're started at the same time as Loki; give it time + log.info("Starting Event gathering loop") while True: time.sleep(30) # sleep first, just in case we're started at the same time as Loki; give it time if event_processor is not None: From 2ac36d11f9ecad7dfe842daa14f11f74c2b526cb Mon Sep 17 00:00:00 2001 From: Vince Fleming Date: Fri, 9 Aug 2024 15:35:49 -0400 Subject: [PATCH 8/8] version bump --- export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/export.py b/export.py index 8782031..ec149b5 100644 --- a/export.py +++ b/export.py @@ -29,7 +29,7 @@ from wekalib.wekacluster import WekaCluster import wekalib.exceptions -VERSION = "1.7.5" +VERSION = "1.8.0" #VERSION = "experimental"