Merge pull request #65 from weka/loki-only_mode
Loki only mode
vince-weka authored Aug 9, 2024
2 parents 3573bff + 2ac36d1 commit 91a865b
Showing 6 changed files with 321 additions and 252 deletions.
169 changes: 97 additions & 72 deletions collector.py
@@ -106,7 +106,7 @@ def __init__(self, config, cluster_obj): # wekaCollector
self.collect_time = None
self.clusterdata = {}
self.threaderror = False
self.api_stats = {}
self.api_stats = {'num_calls': 0}
exporter = config['exporter']
self.max_procs = exporter['max_procs']
self.max_threads_per_proc = exporter['max_threads_per_proc']
@@ -115,9 +115,10 @@ def __init__(self, config, cluster_obj): # wekaCollector
self.datapoints_per_collect = exporter['datapoints_per_collect']
else:
self.datapoints_per_collect = 1
self.map_registry = config["map_registry"]
self.registry = config["registry"]

self.cluster = cluster_obj
self.wekadata = dict()

global weka_stat_list
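
One detail worth calling out from the two hunks above: `api_stats` is now seeded with `'num_calls': 0` because `create_maps()` below increments that key unconditionally; a bare `{}` would raise `KeyError` on the first API call. A minimal illustration:

```python
api_stats = {}
# api_stats['num_calls'] += 1   # KeyError: 'num_calls'

api_stats = {'num_calls': 0}    # seeded, as in this commit
api_stats['num_calls'] += 1     # safe on first use
```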

@@ -347,43 +348,28 @@ def store_results(self, cluster, results):
else:
self.clusterdata[str(cluster)][stat] += result.result

#
# gather() gets fresh stats from the cluster as they update
# populates all datastructures with fresh data
#
#
def gather(self):

cluster = self.cluster
self._reset_metrics()

start_time = time.time()
log.info("gathering weka data from cluster {}".format(str(cluster)))

# re-initialize wekadata so changes in the cluster don't leave behind strange things (hosts/nodes that no longer exist, etc)
wekadata = {}
self.clusterdata[str(cluster)] = wekadata # clear out old data

# reset the cluster config to be sure we can talk to all the hosts
try:
cluster.refresh()
except wekalib.exceptions.NameNotResolvable as exc:
log.critical(f"Names are not resolvable - are they in /etc/hosts or DNS? {exc}")
raise
except Exception as exc:
log.error(f"Cluster refresh failed on cluster '{cluster}' - check connectivity ({exc})")
raise

# set up async api calling subsystem
self.asyncobj = Async(cluster, self.max_procs, self.max_threads_per_proc)
# refresh_maps() is called by the events.py routine to ensure the node-host map is up to date
def refresh_maps(self):
with self._access_lock:
if self.registry.lookup('node-host') is None or self.registry.get_age('node-host') > 5 * 60:
log.info(f"node-host map not populated... populating")
self.create_maps()
log.info(f"node-host map populated.")

def create_maps(self):
# get info from weka cluster - these are quick calls
self.cluster.refresh()
if self.registry.lookup('node-host') is not None and self.registry.get_age('node-host') < 50: # 50 seconds
return # already populated by another thread recently
# re-initialize self.wekadata so changes in the cluster don't leave behind strange things (hosts/nodes that no longer exist, etc)
self.wekadata = dict()
for stat, command in self.WEKAINFO.items():
try:
wekadata[stat] = cluster.call_api(command['method'], command['parms'])
self.wekadata[stat] = self.cluster.call_api(command['method'], command['parms'])
self.api_stats['num_calls'] += 1
except Exception as exc:
log.error(f"error getting {stat} from cluster {cluster}: {exc}")
log.error(f"error getting {stat} from cluster {self.cluster}: {exc}")
# decision - if we can't get the basic info, we can't get anything else, so abort?

# clear old maps, if any - if nodes come/go this can get funky with old data, so re-create it every time
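
The `registry` object (`config["registry"]`) is not defined in this file; the calls above imply a small name-to-map store with registration timestamps. A minimal sketch of that interface, with the class name and internals as assumptions drawn from the `register()`, `lookup()`, and `get_age()` call sites:

```python
import time

class MapRegistry:
    """Sketch of the interface used by refresh_maps()/create_maps();
    the real class lives outside collector.py."""

    def __init__(self):
        self._maps = {}  # name -> (value, time registered)

    def register(self, name, value):
        self._maps[name] = (value, time.time())

    def lookup(self, name):
        entry = self._maps.get(name)
        return entry[0] if entry is not None else None

    def get_age(self, name):
        # seconds since registration; "infinitely old" if never registered
        entry = self._maps.get(name)
        return time.time() - entry[1] if entry is not None else float('inf')
```

With something like this, `refresh_maps()` rebuilds only when the `node-host` map is missing or older than five minutes, while `create_maps()` itself skips a rebuild that another thread completed within the last 50 seconds.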
@@ -393,24 +379,63 @@ def gather(self):

# populate maps
try:
for node_id, node in wekadata["nodeList"].items():
for node_id, node in self.wekadata["nodeList"].items():
node_host_map[node_id] = node["hostname"]
node_role_map[node_id] = node["roles"]
# node["node_id"] = node_id # this used to be inside here...
for host in wekadata["hostList"].values(): # node is a dict of node attribute
for host in self.wekadata["hostList"].values(): # node is a dict of node attribute
if host['hostname'] not in host_role_map: # there may be MCB, so might be there already
if host["mode"] == "backend":
host_role_map[host["hostname"]] = "server"
else:
host_role_map[host["hostname"]] = "client"
# update the maps so they can be used in the loki module
self.map_registry.register('node-host', node_host_map)
self.map_registry.register('node-role', node_role_map)
self.map_registry.register('node-role', host_role_map)
self.registry.register('node-host', node_host_map)
self.registry.register('node-role', node_role_map)
self.registry.register('host-role', host_role_map)
except Exception as exc:
log.error(f"error building maps: {exc}: Aborting data gather from cluster {str(cluster)}")
raise


#
# gather() gets fresh stats from the cluster as they update
# populates all datastructures with fresh data
#
#
def gather(self):

cluster = self.cluster
self._reset_metrics()

start_time = time.time()
log.info("gathering weka data from cluster {}".format(str(cluster)))

# clear the metrics from clusterdata
self.clusterdata[str(cluster)] = dict()

# set up async api calling subsystem
self.asyncobj = Async(cluster, self.max_procs, self.max_threads_per_proc)

# reset the cluster config to be sure we can talk to all the hosts
#try:
# cluster.refresh()
#except wekalib.exceptions.NameNotResolvable as exc:
# log.critical(f"Names are not resolvable - are they in /etc/hosts or DNS? {exc}")
# raise
#except Exception as exc:
# log.error(f"Cluster refresh failed on cluster '{cluster}' - check connectivity ({exc})")
# raise

try:
self.create_maps()
except Exception as exc:
log.error(f"error creating maps: {exc}")
return
node_host_map = self.registry.lookup('node-host')
node_role_map = self.registry.lookup('node-role')
host_role_map = self.registry.lookup('host-role')

log.info(f"Cluster {cluster} Using {cluster.sizeof()} hosts")

# be simplistic at first... let's just gather on a subset of nodes each query
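
Worth noting the shape of the refactor at this point: `gather()` no longer builds the maps inline; it calls `create_maps()` and then reads the results back from the registry, the same place the events path (`events.py` via `refresh_maps()`) finds them. A sketch of that shared flow, with the driving code assumed:

```python
# driver code is assumed; method and map names come from the diff above
collector.gather()         # metrics pass: create_maps() repopulates the registry
collector.refresh_maps()   # events pass: rebuilds only if the map is stale
node_host_map = collector.registry.lookup('node-host')  # shared by both paths
```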
@@ -449,7 +474,7 @@ def gather(self):
# up_list is a list of all the good hosts (ie: not down)
up_list = list()
backends_list = list()
for host in wekadata['hostList'].values():
for host in self.wekadata['hostList'].values():
if host['status'] == 'UP' and host['state'] == 'ACTIVE' and host['hostname'] not in up_list:
up_list.append(host['hostname'])
if host['mode'] == 'backend':
@@ -506,9 +531,9 @@ def gather(self):
log.info("populating datastructures for cluster {}".format(str(cluster)))
try:
# determine Cloud Status
if wekadata["clusterinfo"]["cloud"]["healthy"]:
if self.wekadata["clusterinfo"]["cloud"]["healthy"]:
cloudStatus = "Healthy" # must be enabled to be healthy
elif wekadata["clusterinfo"]["cloud"]["enabled"]:
elif self.wekadata["clusterinfo"]["cloud"]["enabled"]:
cloudStatus = "Unhealthy" # enabled, but unhealthy
else:
cloudStatus = "Disabled" # disabled, healthy is meaningless
@@ -523,25 +548,25 @@ def gather(self):
log.debug(f"weka_info Gauge cluster={cluster.name}")
try:
# Weka status indicator
if (wekadata["clusterinfo"]["buckets"]["active"] == wekadata["clusterinfo"]["buckets"]["total"] and
wekadata["clusterinfo"]["drives"]["active"] == wekadata["clusterinfo"]["drives"]["total"] and
wekadata["clusterinfo"]["io_nodes"]["active"] == wekadata["clusterinfo"]["io_nodes"]["total"] and
wekadata["clusterinfo"]["hosts"]["backends"]["active"] ==
wekadata["clusterinfo"]["hosts"]["backends"]["total"]):
if (self.wekadata["clusterinfo"]["buckets"]["active"] == self.wekadata["clusterinfo"]["buckets"]["total"] and
self.wekadata["clusterinfo"]["drives"]["active"] == self.wekadata["clusterinfo"]["drives"]["total"] and
self.wekadata["clusterinfo"]["io_nodes"]["active"] == self.wekadata["clusterinfo"]["io_nodes"]["total"] and
self.wekadata["clusterinfo"]["hosts"]["backends"]["active"] ==
self.wekadata["clusterinfo"]["hosts"]["backends"]["total"]):
WekaClusterStatus = "OK"
else:
WekaClusterStatus = "WARN"

# Basic info
cluster.release = wekadata["clusterinfo"]["release"] # keep this up to date
wekacluster = {"cluster": str(cluster), "version": wekadata["clusterinfo"]["release"],
"cloud_status": cloudStatus, "license_status": wekadata["clusterinfo"]["licensing"]["mode"],
"io_status": wekadata["clusterinfo"]["io_status"],
"link_layer": wekadata["clusterinfo"]["net"]["link_layer"], "status": WekaClusterStatus}
cluster.release = self.wekadata["clusterinfo"]["release"] # keep this up to date
wekacluster = {"cluster": str(cluster), "version": self.wekadata["clusterinfo"]["release"],
"cloud_status": cloudStatus, "license_status": self.wekadata["clusterinfo"]["licensing"]["mode"],
"io_status": self.wekadata["clusterinfo"]["io_status"],
"link_layer": self.wekadata["clusterinfo"]["net"]["link_layer"], "status": WekaClusterStatus}

metric_objs['wekainfo'].add_metric(labels=wekacluster.keys(), value=wekacluster)

# log.info( "cluster name: " + wekadata["clusterinfo"]["name"] )
# log.info( "cluster name: " + self.wekadata["clusterinfo"]["name"] )
except Exception as exc:
log.error("error cluster info - aborting populate of cluster {}".format(str(cluster)))
raise
@@ -550,8 +575,8 @@ def gather(self):
try:
# Uptime
# not sure why, but sometimes this would fail... trim off the microseconds, because we really don't care
cluster_time = self._trim_time(wekadata["clusterinfo"]["time"]["cluster_time"])
cluster_start_time = self._trim_time(wekadata["clusterinfo"]["io_status_changed_time"])
cluster_time = self._trim_time(self.wekadata["clusterinfo"]["time"]["cluster_time"])
cluster_start_time = self._trim_time(self.wekadata["clusterinfo"]["io_status_changed_time"])
now_obj = datetime.datetime.strptime(cluster_time, "%Y-%m-%dT%H:%M:%S")
dt_obj = datetime.datetime.strptime(cluster_start_time, "%Y-%m-%dT%H:%M:%S")
uptime = now_obj - dt_obj
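
`_trim_time()` sits outside this hunk; judging by the comment above, it strips the fractional seconds so the `"%Y-%m-%dT%H:%M:%S"` format parses cleanly. A sketch of that assumed behavior:

```python
def trim_time(ts: str) -> str:
    # assumed: "2024-08-09T12:34:56.789012" -> "2024-08-09T12:34:56"
    return ts.split('.')[0]
```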
@@ -568,7 +593,7 @@ def gather(self):
# e: weka_overview_activity_num_ops instead of weka_overview_activity_ops
for name, parms in self.CLUSTERSTATS.items():
metric_objs["cluster_stat_" + name].add_metric([str(cluster)],
wekadata["clusterinfo"]["activity"][parms[2]])
self.wekadata["clusterinfo"]["activity"][parms[2]])

except:
# track = traceback.format_exc()
@@ -577,37 +602,37 @@ def gather(self):

log.debug(f"server overview cluster={cluster.name}")
try:
metric_objs['weka_host_spares'].add_metric([str(cluster)], wekadata["clusterinfo"]["hot_spare"])
metric_objs['weka_host_spares'].add_metric([str(cluster)], self.wekadata["clusterinfo"]["hot_spare"])
metric_objs['weka_host_spares_bytes'].add_metric([str(cluster)],
wekadata["clusterinfo"]["capacity"]["hot_spare_bytes"])
self.wekadata["clusterinfo"]["capacity"]["hot_spare_bytes"])
metric_objs['weka_drive_storage_total_bytes'].add_metric([str(cluster)],
wekadata["clusterinfo"]["capacity"]["total_bytes"])
self.wekadata["clusterinfo"]["capacity"]["total_bytes"])
metric_objs['weka_drive_storage_unprovisioned_bytes'].add_metric([str(cluster)],
wekadata["clusterinfo"]["capacity"][
self.wekadata["clusterinfo"]["capacity"][
"unprovisioned_bytes"])
# this changed after WEKAPP-363037
if 'servers' in wekadata["clusterinfo"]:
if 'servers' in self.wekadata["clusterinfo"]:
stanza = "servers"
else:
stanza = "hosts"
metric_objs['weka_num_servers_active'].add_metric([str(cluster)],
wekadata["clusterinfo"][stanza]["backends"]["active"])
self.wekadata["clusterinfo"][stanza]["backends"]["active"])
metric_objs['weka_num_servers_total'].add_metric([str(cluster)],
wekadata["clusterinfo"][stanza]["backends"]["total"])
self.wekadata["clusterinfo"][stanza]["backends"]["total"])
metric_objs['weka_num_clients_active'].add_metric([str(cluster)],
wekadata["clusterinfo"][stanza]["clients"]["active"])
self.wekadata["clusterinfo"][stanza]["clients"]["active"])
metric_objs['weka_num_clients_total'].add_metric([str(cluster)],
wekadata["clusterinfo"][stanza]["clients"]["total"])
self.wekadata["clusterinfo"][stanza]["clients"]["total"])
metric_objs['weka_num_drives_active'].add_metric([str(cluster)],
wekadata["clusterinfo"]["drives"]["active"])
metric_objs['weka_num_drives_total'].add_metric([str(cluster)], wekadata["clusterinfo"]["drives"]["total"])
self.wekadata["clusterinfo"]["drives"]["active"])
metric_objs['weka_num_drives_total'].add_metric([str(cluster)], self.wekadata["clusterinfo"]["drives"]["total"])
except:
log.error("error processing server overview for cluster {}".format(str(cluster)))

log.debug(f"protection status cluster={cluster.name}")
try:
# protection status
rebuildStatus = wekadata["clusterinfo"]["rebuild"]
rebuildStatus = self.wekadata["clusterinfo"]["rebuild"]
protectionStateList = rebuildStatus["protectionState"]
numStates = len(protectionStateList) # 3 (0,1,2) for 2 parity), or 5 (0,1,2,3,4 for 4 parity)

@@ -625,7 +650,7 @@ def gather(self):
log.debug(f"filesystems cluster={cluster.name}")
try:
# Filesystem stats
for fs_id, fs in wekadata["fs_stat"].items():
for fs_id, fs in self.wekadata["fs_stat"].items():
fs['total_percent_used'] = float(fs["used_total"]) / float(fs["available_total"]) * 100
fs['ssd_percent_used'] = float(fs["used_ssd"]) / float(fs["available_ssd"]) * 100

@@ -638,7 +663,7 @@ def gather(self):

log.debug(f"alerts cluster={cluster.name}")
try:
for alert in wekadata["alerts"]:
for alert in self.wekadata["alerts"]:
if not alert["muted"]:
log.debug(f"alert detected {alert['type']}")
host_name = "None"
@@ -662,7 +687,7 @@ def gather(self):
log.error(f"error {exc} processing alerts for cluster {str(cluster)}")

try:
for disk_id, drive in wekadata["driveList"].items():
for disk_id, drive in self.wekadata["driveList"].items():
# removed drives can have null values - prom client code hates that!
if drive['hostname'] is None or len(drive['hostname']) == 0:
drive['hostname'] = "None"
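
The `None`-to-`"None"` substitution is there because prometheus_client expects every label value to be a string; passing `None` blows up later when the metric is rendered. A minimal reproduction of the pattern being guarded against (the metric name is illustrative):

```python
from prometheus_client.core import GaugeMetricFamily

g = GaugeMetricFamily('weka_drive', 'per-drive stats', labels=['hostname'])
# g.add_metric([None], 1.0)   # a None label breaks exposition later
g.add_metric(['None'], 1.0)   # safe: always pass strings
```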
@@ -697,7 +722,7 @@ def gather(self):
# 'reclaimable_high_thresh', 'reclaimable_low_thresh',
# 'reclaimable_thresh', 'total_consumed_cap', 'used_cap'])
try:
for fs in wekadata["obs_capacity"]:
for fs in self.wekadata["obs_capacity"]:
metric_objs['fs_obs_total_consumed'].add_metric([str(cluster), fs['filesystem_name'], fs['fsId'],
fs['obs_bucket_name']], fs['total_consumed_capacity'])
metric_objs['fs_obs_cap_used'].add_metric([str(cluster), fs['filesystem_name'], fs['fsId'],
@@ -798,7 +823,7 @@ def get_stat(raw_stats_data):

# ------------- end of gather() -------------

def collect_logs(self, lokiserver):
def collect_logs(self, event_processor):
with self._access_lock: # make sure we don't conflict with a metrics collection
log.info(f"getting events for cluster {self.cluster}")
try:
@@ -816,7 +841,7 @@
return

try:
lokiserver.send_events(events, self.cluster)
event_processor.send_events(events, self.cluster, self)
except Exception as exc:
log.critical(f"Error sending events: {exc} for cluster {self.cluster}")
# log.critical(f"{traceback.format_exc()}")
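
`collect_logs()` now takes a generic `event_processor` rather than a `lokiserver`, and `send_events()` gains the collector itself as a third argument, presumably so the processor can resolve node-to-host labels through the registry. The processor class is in one of the other changed files; a sketch of the duck-typed interface implied by this call site:

```python
class EventProcessor:
    """Sketch only: the concrete processor (Loki or otherwise) lives
    elsewhere; the one requirement visible here is this signature."""

    def send_events(self, events, cluster, collector):
        node_host_map = collector.registry.lookup('node-host')  # assumption
        for event in events:
            ...  # label with cluster/host and forward to the backend
```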