Skip to content

Commit

Permalink
Agents forwarding (#21)
Browse files Browse the repository at this point in the history
This pull request adds the logic that enables forwarding of requests
from sunfish to the Hardware agents
  • Loading branch information
christian-pinto authored Mar 11, 2024
2 parents 963736e + 4a75253 commit adf05f8
Show file tree
Hide file tree
Showing 12 changed files with 1,026 additions and 433 deletions.
3 changes: 3 additions & 0 deletions sunfish/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Copyright IBM Corp. 2024
# This software is available to you under a BSD 3-Clause License.
# The full license terms are available here: https://github.com/OpenFabrics/sunfish_library_reference/blob/main/LICENSE
189 changes: 112 additions & 77 deletions sunfish/events/redfish_event_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright IBM Corp. 2023
# This software is available to you under a BSD 3-Clause License.
# The full license terms are available here: https://github.com/OpenFabrics/sunfish_library_reference/blob/main/LICENSE
import json
import logging
import os
from uuid import uuid4
Expand All @@ -11,7 +12,99 @@
from sunfish.events.redfish_subscription_handler import subscribtions
from sunfish.lib.exceptions import *

logger = logging.getLogger("RedfishEventHandler")


class RedfishEventHandlersTable:
@classmethod
def AggregationSourceDiscovered(cls, event_handler: EventHandlerInterface, event: dict, context: str):
###
# Fabric Agents are modelled as AggregationSource objects (RedFish v2023.1 at the time of writing this comment)
# Registration will happen with the OFMF receiving a and event with MessageId: AggregationSourceDiscovered
# The arguments of the event message are:
# - Arg1: "Redfish"
# - Arg2: "agent_ip:port"
# I am also assuming that the agent name to be used is contained in the OriginOfCondifiton field of the event as in the below example:
# {
# "OriginOfCondition: [
# "@odata.id" : "/redfish/v1/AggregationService/AggregationSource/AgentName"
# ]"
# }
logger.info("AggregationSourceDiscovered method called")

connectionMethodId = event['OriginOfCondition']['@odata.id']
hostname = event['MessageArgs'][1] # Agent address

response = requests.get(f"{hostname}/{connectionMethodId}")
if response.status_code != 200:
raise Exception("Cannot find ConnectionMethod")
response = response.json()

### Save agent registration
connection_method_name = connectionMethodId.split('/')[-1]
connection_method_name = connectionMethodId[:-len(connection_method_name)]
event_handler.core.create_object(connection_method_name, response)

connection_method_template = {
"@odata.type": "#AggregationSource.v1_2_.AggregationSource",
"HostName": hostname,
"Links": {
"ConnectionMethod": {
"@odata.id": connectionMethodId
},
"ResourcesAccessed": []
}
}

try:
resp_post = event_handler.core.create_object(
os.path.join(event_handler.core.conf["redfish_root"], "AggregationService/AggregationSources"),
connection_method_template)
except Exception:
raise Exception()

aggregation_source_id = resp_post['@odata.id']
agent_subscription_context = {"Context": aggregation_source_id.split('/')[-1]}

resp_patch = requests.patch(f"{hostname}/redfish/v1/EventService/Subscriptions/SunfishServer",
json=agent_subscription_context)

return resp_patch

@classmethod
def ResourceCreated(cls, event_handler: EventHandlerInterface, event: dict, context: str):
if context == "":
raise PropertyNotFound("Missing agent context in ResourceCreated event")

logger.info("New resource created")

id = event['OriginOfCondition']['@odata.id'] # /redfish/v1/Fabrics/CXL
aggregation_source = event_handler.core.storage_backend.read(
os.path.join(event_handler.core.conf["redfish_root"],
"AggregationService", "AggregationSources", context)
)
hostname = aggregation_source["HostName"]

response = requests.get(f"{hostname}/{id}")
if response.status_code != 200:
raise Exception("Cannot find ConnectionMethod")
response = response.json()

add_aggregation_source_reference(response, aggregation_source)

event_handler.core.create_object(id, response)

RedfishEventHandler.bfsInspection(event_handler.core, response, aggregation_source)

event_handler.core.storage_backend.patch(id, aggregation_source)


class RedfishEventHandler(EventHandlerInterface):
dispatch_table = {
"AggregationSourceDiscovered": RedfishEventHandlersTable.AggregationSourceDiscovered,
"ResourceCreated": RedfishEventHandlersTable.ResourceCreated
}

def __init__(self, core):
"""init that sets the conf and calls the load subcriptions method
Expand All @@ -23,6 +116,12 @@ def __init__(self, core):
self.fs_root = core.conf["backend_conf"]["fs_root"]
self.subscribers_root = core.conf["backend_conf"]["subscribers_root"]
self.backend = core.storage_backend
@classmethod
def dispatch(cls, message_id: str, event_handler: EventHandlerInterface, event: dict, context: str):
if message_id in cls.dispatch_table:
return cls.dispatch_table[message_id](event_handler, event, context)
else:
logger.debug(f"Message id '{message_id}' does not have a custom handler")

def new_event(self, payload):
"""Compares event's information with the subsribtions data structure to find the Ids of the subscribers for that event.
Expand Down Expand Up @@ -118,8 +217,11 @@ def forward_event(self, list, payload):
try:
data = self.core.get_object(path)
# print('send to: ', data["Id"])
requests.post(data['Destination'], json=payload)
except requests.exceptions.ConnectionError as e:
resp = requests.post(data['Destination'], json=payload)
resp.raise_for_status()
except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
logger.warning(f"Unable to contact event destination {id} for event , skipping.")
logger.warning(f"Event log: \n{json.dumps(payload, indent=2)}")
list.remove(id)
except ResourceNotFound:
raise ResourceNotFound(path)
Expand All @@ -138,76 +240,6 @@ def check_subdirs(self, origin):
to_forward.append(id)

return to_forward

def AggregationSourceDiscovered(self, event, context):
###
# Fabric Agents are modelled as AggregationSource objects (RedFish v2023.1 at the time of writing this comment)
# Registration will happen with the OFMF receiving a and event with MessageId: AggregationSourceDiscovered
# The arguments of the event message are:
# - Arg1: "Redfish"
# - Arg2: "agent_ip:port"
# I am also assuming that the agent name to be used is contained in the OriginOfCondifiton field of the event as in the below example:
# {
# "OriginOfCondition: [
# "@odata.id" : "/redfish/v1/AggregationService/AggregationSource/AgentName"
# ]"
# }
logging.info("AggregationSourceDiscovered method called")

connectionMethodId = event['OriginOfCondition']['@odata.id']
hostname = event['MessageArgs'][1] # Agent address

response = requests.get(f"{hostname}/{connectionMethodId}")
if response.status_code != 200:
raise Exception("Cannot find ConnectionMethod")
response = response.json()

### Save agent registration
connection_method_name = connectionMethodId.split('/')[-1]
connection_method_name = connectionMethodId[:-len(connection_method_name)]
self.create_object(connection_method_name, response)

connection_method_template = {
"@Redfish.Copyright": "Copyright 2014-2021 SNIA. All rights reserved.",
"@odata.type": "#AggregationSource.v1_2_.AggregationSource",
"HostName": hostname,
"Links": {
"ConnectionMethod": {
"@odata.id": connectionMethodId
},
"ResourcesAccessed": [ ]
}
}

resp_post = self.create_object(os.path.join(self.conf["redfish_root"], "AggregationService/AggregationSources"), connection_method_template)

aggregation_source_id = resp_post['@odata.id']
agent_subscription_context = {"Context": aggregation_source_id.split('/')[-1]}

resp_patch = requests.patch(f"{hostname}/redfish/v1/EventService/Subscriptions/SunfishServer",
json=agent_subscription_context)

return resp_patch

def ResourceCreated(self, event, context):
logging.info("New resource created")

id = event['OriginOfCondition']['@odata.id'] # /redfish/v1/Fabrics/CXL
aggregation_source = self.get_object(
os.path.join(self.conf["redfish_root"], "AggregationService", "AggregationSources", context))
hostname = aggregation_source["HostName"]

response = requests.get(f"{hostname}/{id}")
if response.status_code != 200:
raise Exception("Cannot find ConnectionMethod")
object = response.json()
add_aggregation_source_reference(object, aggregation_source)

self.create_object(id, object)

RedfishEventHandler.bfsInspection(self, object, aggregation_source)

self.patch_object(id, aggregation_source)

def bfsInspection(self, node, aggregation_source):
queue = []
Expand All @@ -234,7 +266,7 @@ def handleNestedObject(self, obj):
redfish_obj = RedfishEventHandler.fetchResourceAndTree(self, id, aggregation_source, visited, queue, fetched)

if redfish_obj is None or type(redfish_obj) != dict:
logging.info(f"Resource - {id} - not available")
logger.info(f"Resource - {id} - not available")
continue

for key, val in redfish_obj.items():
Expand Down Expand Up @@ -274,10 +306,10 @@ def fetchResourceAndTree(self, id, aggregation_source, visited, queue, fetched):
need_parent_prefetch = False
for node_position in range(4, len(path_nodes) - 1):
redfish_path = f'/redfish/v1/{"/".join(path_nodes[3:node_position + 1])}'
logging.info(f"Checking redfish path: {redfish_path}")
logger.info(f"Checking redfish path: {redfish_path}")
if redfish_path not in visited:
need_parent_prefetch = True
logging.info(f"Inspect redfish path: {redfish_path}")
logger.info(f"Inspect redfish path: {redfish_path}")
queue.append(redfish_path)
visited.append(redfish_path)
if need_parent_prefetch: # requeue
Expand All @@ -289,7 +321,7 @@ def fetchResourceAndTree(self, id, aggregation_source, visited, queue, fetched):

def fetchResource(self, obj_id, aggregation_source):
resource_endpoint = aggregation_source["HostName"] + obj_id
logging.info(f"fetch: {resource_endpoint}")
logger.info(f"fetch: {resource_endpoint}")
response = requests.get(resource_endpoint)

if response.status_code == 200:
Expand All @@ -303,6 +335,8 @@ def fetchResource(self, obj_id, aggregation_source):
def createInspectedObject(self,redfish_obj, aggregation_source):
if '@odata.id' in redfish_obj:
obj_path = os.path.relpath(redfish_obj['@odata.id'], self.conf['redfish_root'])
else:
raise PropertyNotFound(f"missing @odata.id in \n {json.dumps(redfish_obj, indent=2)}")

file_path = os.path.join(self.conf['redfish_root'], obj_path)
#file_path = create_path(constants.PATHS['Root'], obj_path)
Expand All @@ -316,7 +350,8 @@ def createInspectedObject(self,redfish_obj, aggregation_source):
except ResourceNotFound:
add_aggregation_source_reference(redfish_obj, aggregation_source)
self.create_object(file_path, redfish_obj)

else:
logger.debug("This is a collection")

def add_aggregation_source_reference(redfish_obj, aggregation_source):
if "Oem" not in redfish_obj:
Expand Down
Loading

0 comments on commit adf05f8

Please sign in to comment.