From af8baf09200746d08744100fa152fe2f0d83c146 Mon Sep 17 00:00:00 2001 From: jarwils Date: Thu, 7 Sep 2023 08:28:10 -0600 Subject: [PATCH 01/35] adding alicanto co-provider feature --- .../pybennu/providers/alicanto/__init__.py | 9 + .../pybennu/providers/alicanto/alicanto.py | 79 ++++ .../providers/alicanto/alicanto_helper.py | 346 ++++++++++++++++++ .../pybennu/providers/power/power_daemon.py | 35 ++ 4 files changed, 469 insertions(+) create mode 100644 src/pybennu/pybennu/providers/alicanto/__init__.py create mode 100644 src/pybennu/pybennu/providers/alicanto/alicanto.py create mode 100644 src/pybennu/pybennu/providers/alicanto/alicanto_helper.py diff --git a/src/pybennu/pybennu/providers/alicanto/__init__.py b/src/pybennu/pybennu/providers/alicanto/__init__.py new file mode 100644 index 00000000..705fabd3 --- /dev/null +++ b/src/pybennu/pybennu/providers/alicanto/__init__.py @@ -0,0 +1,9 @@ +###################### UNCLASSIFIED // OFFICIAL USE ONLY ###################### +# +# Copyright 2018 National Technology & Engineering Solutions of Sandia, +# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, +# there is a non-exclusive license for use of this work by or on behalf +# of the U.S. Government. Export of this data may require a license from +# the United States Government. +# +############################################################################### diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto.py b/src/pybennu/pybennu/providers/alicanto/alicanto.py new file mode 100644 index 00000000..8a0209a8 --- /dev/null +++ b/src/pybennu/pybennu/providers/alicanto/alicanto.py @@ -0,0 +1,79 @@ +###################### UNCLASSIFIED // OFFICIAL USE ONLY ###################### +# +# Copyright 2018 National Technology & Engineering Solutions of Sandia, +# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, +# there is a non-exclusive license for use of this work by or on behalf +# of the U.S. Government. Export of this data may require a license from +# the United States Government. +# +############################################################################### +"""bennu provider / alicanto handler + +Provides an interface between bennu field devices and a alicanto federation. +""" +import logging +import threading + +from distutils.util import strtobool + +from pybennu.distributed.client import Client +import pybennu.distributed.swig._Endpoint as E +from pybennu.providers.alicanto.alicanto_helper import alicantoFederate + +logger = logging.getLogger('alicanto') +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +class Alicanto(alicantoFederate): + """ bennu provider / alicanto federate """ + + def __init__(self, server_endpoint, publish_endpoint, config, debug=False): + self.__lock = threading.Lock() + if debug: + logger.setLevel(logging.DEBUG) + + + + # Initialize alicantoFederate and start run() method in a separate thread + ## Need to write a way for alicanto to publish updates + alicantoFederate.__init__(self, config) + self.__publish_thread = threading.Thread(target=alicantoFederate.run, + args=(self,)) + self.__publish_thread.daemon = True + self.__publish_thread.start() + + # Initialize system state + self.state = {} + for tag in self.tags: + self.state[tag] = False if self.get_type(tag) == 'bool' else 0 + + + def tag(self, tag, value=None): + with self.__lock: + if value is not None: + self.state[tag] = value + else: + if tag in self.state: + return self.state[tag] + else: + return False if self.get_type(tag) == 'bool' else 0 + + + # CLIENT write helper + def write(self, tag, value): + with self.__lock: + print("### PROCESSING UPDATE ###") + try: + for tag in self.tags: + print(f"### TAG: {tag} ###") + if tag not in self.state: + raise NameError(f"{tag} is not a known tag name") + value = self.tags[tag] + self.state[tag] = value + self.send_msg_to_endpoint(tag, value) + print(f"### UPDATED: {tag}:{value} ###") + except Exception as err: + print(f"ERROR: failed to process update: {err}") + return f'ERR={err}' + return 'ACK=Success processing alicanto write command' diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py new file mode 100644 index 00000000..8447e65b --- /dev/null +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -0,0 +1,346 @@ +###################### UNCLASSIFIED // OFFICIAL USE ONLY ###################### +# +# Copyright 2018 National Technology & Engineering Solutions of Sandia, +# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, +# there is a non-exclusive license for use of this work by or on behalf +# of the U.S. Government. Export of this data may require a license from +# the United States Government. +# +############################################################################### +"""alicanto helper class +""" +import json +import logging +import signal +import sys +import time +import threading +import math + +from distutils.util import strtobool +from py_expression_eval import Parser + +from pybennu.distributed.subscriber import Subscriber +from pybennu.distributed.client import Client +import pybennu.distributed.swig._Endpoint as E + +logger = logging.getLogger('alicanto') +logger.setLevel(logging.INFO) + +class TestSubscriber(Subscriber): + def __init__(self, sub_source): + new_publish_endpoint = E.new_Endpoint() + E.Endpoint_str_set(new_publish_endpoint, 'udp://'+str(sub_source)) + Subscriber.__init__(self, new_publish_endpoint) + +class TestClient(Client): + def __init__(self, end_dest): + new_endpoint_dest = E.new_Endpoint() + E.Endpoint_str_set(new_endpoint_dest, 'tcp://'+str(end_dest)) + Client.__init__(self, new_endpoint_dest) + + def send(self, message): + """ Send message to Provider + """ + # send update + self._Client__socket.send_string(message+'\0') # must include null byte + # get response + msg = self._Client__socket.recv_string() + reply = msg.split('=') + status = reply[0] + data = reply[1] + + if status == self._Client__kACK: + print("I: ACK: "+data) + #self.reply_handler(data) + else: + print("I: ERR -- %s" % msg) + + return reply + +class alicantoFederate(): + def __init__(self, config, exit_handler=None): + exit = exit_handler if exit_handler else self.ctrl_exit_handler + signal.signal(signal.SIGINT, exit) # ctlr + c + # Tag=>destination map + self.dests = {} + # Tag=>type map + self.types = {} + # Tag=>logic map + self.logic = {} + # Expression parser for logic + self.parser = Parser() + # Set of all tags (for printing) + self.tags = {} + + ############## Get counts from json ###################### + cfg = None + self.end_count = 0 + self.sub_count = 0 + self.pub_count = 0 + + with open(config, 'r') as file_: + cfg = json.loads(file_.read()) + try: + self.end_count = len(cfg["endpoints"]) + logger.info(f"\tNumber of endpoints: {self.end_count}") + except: + logger.info(f"\tNumber of endpoints: {self.end_count}") + try: + self.sub_count = len(cfg["subscriptions"]) + logger.info(f"\tNumber of subscriptions: {self.sub_count}") + except: + logger.info(f"\tNumber of subscriptions: {self.sub_count}") + try: + self.pub_count = len(cfg["publications"]) + logger.info(f"\tNumber of publications: {self.pub_count}") + except: + logger.info(f"\tNumber of publications: {self.pub_count}") + + # Diagnostics to confirm JSON config correctly added the required + # endpoints, publications, and subscriptions + self.endid = {} + self.end_dests = [] + for i in range(0, self.end_count): + self.endid[i] = cfg["endpoints"][i] + end_name = self.endid[i]["name"] + end_destination = self.endid[i]["destination"] + end_type = self.endid[i]["type"] + logger.debug(f"\tRegistered endpoint ---> end_name: {end_name} ---> end_destination: {end_destination}") + #end_name = end_name.split('/')[1] if '/' in end_name else end_name + self.tags.update({end_destination : 0}) + self.end_dests.append(end_destination) + self.dests[end_name] = end_destination + self.types[end_name] = end_type + self.types[end_destination] = end_type + # make end_dests elements unique + self.end_dests = list(set(self.end_dests)) + + self.subid = {} + self.sub_sources = [] + for i in range(0, self.sub_count): + self.subid[i] = cfg["subscriptions"][i] + sub_name = self.subid[i]["key"] + sub_type = self.subid[i]["type"] + sub_source = sub_name.split('/')[0] + self.sub_sources.append(sub_source) + try: + sub_info = self.subid[i]["info"] # stores logic for interdependencies + except: + sub_info = None + logger.debug(f"\tRegistered subscription ---> sub_name: {sub_name} ---> sub_type: {sub_type} ---> sub_info: {sub_info}") + #sub_name = sub_name.split('/')[1] if '/' in sub_name else sub_name + self.tags.update({sub_name : 0 }) + self.types[sub_name] = sub_type + if sub_info: + logger.debug(f"\t\t********** LOGIC **********") + for exp in sub_info.split(';'): + lhs, rhs = exp.split('=') + self.logic[lhs.strip()] = rhs.strip() + logger.debug(f'\t\t{exp.strip()}') + #make sub_sources elements unique + self.sub_sources = list(set(self.sub_sources)) + + self.__lock = threading.Lock() + + + for sub_source in self.sub_sources: + logger.debug(f"\tLaunching Subscriber Thread ---> subscription: udp://{sub_source}") + subber = TestSubscriber(sub_source) + #Subscriber.__init__(self, new_publish_endpoint) + #subber = Subscriber(new_publish_endpoint) + subber.subscription_handler = self._subscription_handler + self.__sub_thread = threading.Thread(target=subber.run) + self.__sub_thread.setName(sub_source) + self.__sub_thread.daemon = True + self.__sub_thread.start() + #subber.run() + + #new_server_endpoint = E.new_Endpoint() + #E.Endpoint_str_set(new_server_endpoint, 'tcp://127.0.0.1:5556') + #self.client = Client(new_server_endpoint) + #self.client = TestClient("127.0.0.1:5556") + + + self.end_clients = {} + for end_dest in self.end_dests: + # Initialize bennu Client + end_dest = end_dest.split('/')[0] + self.end_clients[end_dest] = TestClient(end_dest) + logger.debug(f"\tEnd_clients: {self.end_clients}") + + def run(self): + ############## Entering Execution Mode ############################## + #h.alicantoFederateEnterExecutingMode(self.fed) + logger.info("Entered alicanto execution mode") + ''' + grantedtime = 0 + + # Blocking call for a time request at simulation time 0 + time = h.alicanto_TIME_ZERO + logger.debug(f"Requesting initial time {time}") + grantedtime = h.alicantoFederateRequestTime(self.fed, time) + logger.debug(f"Granted time {grantedtime}") + ''' + + # Endpoint initial values to alicanto + for i in range(self.end_count): + end_name = self.endid[i]["name"] + end_name = (end_name.split('/')[1] + if '/' in end_name + else end_name) + full_end_dest = self.endid[i]["destination"] + end_dest = (full_end_dest.split('/')[0] + if '/' in full_end_dest + else full_end_dest) + #value = self.tag(end_name) + self.end_clients[end_dest] = TestClient(end_dest) + reply = self.end_clients[end_dest].send("READ="+end_name) + value = reply[1].rstrip('\x00') + self.endid[i]["value"] = value + self.tag(full_end_dest, value) + logger.debug(f"Initial Endpoints {end_name} / {end_dest}:{value} ") + + + + + ########## Main co-simulation loop #################################### + # As long as granted time is in the time range to be simulated... + while True: + self.print_state() + time.sleep(1) + for i in range(self.end_count): + full_end_name = self.endid[i]["name"] + end_name = (full_end_name.split('/')[1] + if '/' in full_end_name + else full_end_name) + full_end_dest = self.endid[i]["destination"] + end_dest = (full_end_dest.split('/')[0] + if '/' in full_end_dest + else full_end_dest) + end_dest_tag = (full_end_dest.split('/')[1] + if '/' in full_end_dest + else full_end_dest) + + # !!need to add something to handle binary points + if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double': + if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))): + self.end_clients[end_dest] = TestClient(end_dest) + self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) + time.sleep(0.5) + reply = self.end_clients[end_dest].send("READ="+end_name) + value = reply[1].rstrip('\x00') + self.tag(full_end_dest, value) + elif self.types[full_end_name] == 'bool': + if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower(): + self.end_clients[end_dest] = TestClient(end_dest) + self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) + time.sleep(0.5) + reply = self.end_clients[end_dest].send("READ="+end_name) + value = reply[1].rstrip('\x00') + self.tag(full_end_dest, value) + + + + def print_state(self): + logger.debug("=================== DATA ===================") + for tag in self.tags: + logger.debug(f"{tag:<30} --- {self.tag(tag):}") + logger.debug("============================================") + + def action_post_request_time(self): + pass + + def send_msg_to_endpoint(self, name, value): + endpoint = self.fed.get_endpoint_by_name(name) + endpoint.send_data(value.encode(), endpoint.default_destination) + + def get_type(self, tag): + return self.types[tag] + + def _tag(self, tag, val): + if self.get_type(tag) == 'bool': + # Make sure any val is 'true'|'false' + val = bool(strtobool(str(val))) + val = str(val).lower() + else: + val = float(val) + return self.tag(tag, val) + + def tag(self, tag, val=None): + raise NotImplementedError("Subclass and implement this method") + + def _client_handler(reply): + split = reply.split(',') + print("Reply:") + if len(split) > 1: + for tag in split: + print("\t", tag) + else: + print("\t", split[0]) + return split[0] + + def _subscription_handler(self, message): + """Receive Subscription message + This method gets called by the Subscriber base class in its run() + method when a message from a publisher is received. + + Ex message: "load-1_bus-101.mw:999.000,load-1_bus-101.active:true," + + Args: + message (str): published zmq message as a string + """ + points = message.split(',') + points = points[:-1] # remove last element since it might be empty + #print(message) + sub_source = threading.current_thread().name + + for point in points: + if point == "": + continue + ''' + split = point.split('/') + tag_source = split[0] + tag = split[1].split(':')[0] + value = split[1].split(':')[0] + ''' + + tag = point.split(':')[0] + full_tag = sub_source + '/' + tag + value = point.split(':')[1] + + if full_tag not in self.tags: + continue + if value.lower() == 'false': + value = False + field = 'status' + elif value.lower() == 'true': + value = True + field = 'status' + else: + value = float(value) + field = 'value' + + if field == 'value': + if not math.isclose(float(self.tag(full_tag)), value): + with self.__lock: + self.tag(full_tag, value) + #update + print("UPDATE NOW: ",full_tag) + print("New value: ",value) + else: + continue + elif field == 'status': + self.tag(full_tag, value) + #update + print("UPDATE NOW: ",full_tag) + print("New value: ",value) + else: + continue + + + + + def ctrl_exit_handler(self, signal, frame): + print("SIGINT or CTRL-C detected. Exiting gracefully") + sys.exit() diff --git a/src/pybennu/pybennu/providers/power/power_daemon.py b/src/pybennu/pybennu/providers/power/power_daemon.py index 32998f9f..65230a2e 100644 --- a/src/pybennu/pybennu/providers/power/power_daemon.py +++ b/src/pybennu/pybennu/providers/power/power_daemon.py @@ -1,3 +1,12 @@ +###################### UNCLASSIFIED // OFFICIAL USE ONLY ###################### +# +# Copyright 2018 National Technology & Engineering Solutions of Sandia, +# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, +# there is a non-exclusive license for use of this work by or on behalf +# of the U.S. Government. Export of this data may require a license from +# the United States Government. +# +############################################################################### import argparse import os import platform @@ -266,6 +275,32 @@ def run(self): config_file, debug) ######################################### + ############### ALICANTO ################ + ######################################### + elif solver == 'Alicanto': + try: + E.Endpoint_str_set(server_endpoint, + config.get('power-solver-service', + 'server-endpoint').strip()) + E.Endpoint_str_set(publish_endpoint, + config.get('power-solver-service', + 'publish-endpoint').strip()) + + config_file = config.get('power-solver-service', + 'config-file').strip() + except NoOptionError: + print("\nERROR: The following must be defined in the " + "configuration file: 'server-endpoint', " + "'publish-endpoint', 'config-file'\n") + sys.exit(-1) + + from pybennu.providers.alicanto.alicanto \ + import Alicanto + self.solver = Alicanto(server_endpoint, + publish_endpoint, + config_file, + debug) + ######################################### ############### Default ################# ######################################### else: From 401b22cfa82dda326024176ed0d315030ab17014 Mon Sep 17 00:00:00 2001 From: jarwils Date: Thu, 7 Sep 2023 08:47:58 -0600 Subject: [PATCH 02/35] initial documentation, will need more --- .../pybennu/providers/alicanto/README.md | 12 +++++++++ .../pybennu/providers/alicanto/alicanto.json | 25 +++++++++++++++++++ .../pybennu/providers/alicanto/config.ini | 6 +++++ 3 files changed, 43 insertions(+) create mode 100644 src/pybennu/pybennu/providers/alicanto/README.md create mode 100644 src/pybennu/pybennu/providers/alicanto/alicanto.json create mode 100644 src/pybennu/pybennu/providers/alicanto/config.ini diff --git a/src/pybennu/pybennu/providers/alicanto/README.md b/src/pybennu/pybennu/providers/alicanto/README.md new file mode 100644 index 00000000..2a9e2647 --- /dev/null +++ b/src/pybennu/pybennu/providers/alicanto/README.md @@ -0,0 +1,12 @@ +Alicanto is a new feature made to be a more simple co-simulation tool than HELICS. + +The code is similar to the bennu HELICS code but stripped down. +Alicanto runs as a Subscriber and Client object. It takes in a configuration file (which points to a json) which defines which points Alicanto cares about +JSON format + - Subscriptions tell Alicanto which publish point (udp) to subscrie to and which point to keep track of + - Endpoints tell Alicanto where to corelate that subscribed point to a server-endpoint + +Usage: +`pybennu-power-solver -c config.ini -v start` + +Please update this README as Alicanto is used more \ No newline at end of file diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto.json b/src/pybennu/pybennu/providers/alicanto/alicanto.json new file mode 100644 index 00000000..d4f5931b --- /dev/null +++ b/src/pybennu/pybennu/providers/alicanto/alicanto.json @@ -0,0 +1,25 @@ +{ + "log_level": 3, + "subscriptions": [ + { + "key": "239.0.0.1:40000/bus-1.voltage", + "type": "double" + }, + { + "key": "239.0.0.1:40001/bus-2.voltage", + "type": "double" + } + ], + "endpoints" : [ + { + "name": "239.0.0.1:40000/bus-1.voltage", + "destination": "127.0.0.1:5557/bus-1.voltage", + "type": "double" + }, + { + "name": "239.0.0.1:40001/bus-2.voltage", + "destination": "127.0.0.1:5556/bus-2.voltage", + "type": "double" + } + ] +} diff --git a/src/pybennu/pybennu/providers/alicanto/config.ini b/src/pybennu/pybennu/providers/alicanto/config.ini new file mode 100644 index 00000000..60a185ac --- /dev/null +++ b/src/pybennu/pybennu/providers/alicanto/config.ini @@ -0,0 +1,6 @@ +[power-solver-service] +solver-type = Alicanto +config-file = /root/alicanto.json +server-endpoint = +publish-endpoint = +debug = true From 7a57706505beba034a5145f5f5e01aaa5ecaca90 Mon Sep 17 00:00:00 2001 From: jarwils Date: Tue, 12 Sep 2023 14:49:28 -0600 Subject: [PATCH 03/35] fix headers --- src/pybennu/pybennu/providers/alicanto/__init__.py | 10 +--------- src/pybennu/pybennu/providers/alicanto/alicanto.py | 9 --------- .../pybennu/providers/alicanto/alicanto_helper.py | 9 --------- 3 files changed, 1 insertion(+), 27 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/__init__.py b/src/pybennu/pybennu/providers/alicanto/__init__.py index 705fabd3..8b137891 100644 --- a/src/pybennu/pybennu/providers/alicanto/__init__.py +++ b/src/pybennu/pybennu/providers/alicanto/__init__.py @@ -1,9 +1 @@ -###################### UNCLASSIFIED // OFFICIAL USE ONLY ###################### -# -# Copyright 2018 National Technology & Engineering Solutions of Sandia, -# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, -# there is a non-exclusive license for use of this work by or on behalf -# of the U.S. Government. Export of this data may require a license from -# the United States Government. -# -############################################################################### + diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto.py b/src/pybennu/pybennu/providers/alicanto/alicanto.py index 8a0209a8..df6b57d2 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto.py @@ -1,12 +1,3 @@ -###################### UNCLASSIFIED // OFFICIAL USE ONLY ###################### -# -# Copyright 2018 National Technology & Engineering Solutions of Sandia, -# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, -# there is a non-exclusive license for use of this work by or on behalf -# of the U.S. Government. Export of this data may require a license from -# the United States Government. -# -############################################################################### """bennu provider / alicanto handler Provides an interface between bennu field devices and a alicanto federation. diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index 8447e65b..ec7a1b64 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -1,12 +1,3 @@ -###################### UNCLASSIFIED // OFFICIAL USE ONLY ###################### -# -# Copyright 2018 National Technology & Engineering Solutions of Sandia, -# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, -# there is a non-exclusive license for use of this work by or on behalf -# of the U.S. Government. Export of this data may require a license from -# the United States Government. -# -############################################################################### """alicanto helper class """ import json From 10d99fca51959bb3b032ae4d52365745e89cd1a2 Mon Sep 17 00:00:00 2001 From: jarwils Date: Wed, 1 Nov 2023 11:06:47 -0600 Subject: [PATCH 04/35] working on bug --- .../providers/alicanto/alicanto_helper.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index ec7a1b64..e30a6989 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -18,12 +18,14 @@ logger = logging.getLogger('alicanto') logger.setLevel(logging.INFO) +# TestSubscriber used for easy multiple subscribers class TestSubscriber(Subscriber): def __init__(self, sub_source): new_publish_endpoint = E.new_Endpoint() E.Endpoint_str_set(new_publish_endpoint, 'udp://'+str(sub_source)) Subscriber.__init__(self, new_publish_endpoint) +#TestClient used with special send function which returns the reply class TestClient(Client): def __init__(self, end_dest): new_endpoint_dest = E.new_Endpoint() @@ -176,17 +178,20 @@ def run(self): # Endpoint initial values to alicanto for i in range(self.end_count): - end_name = self.endid[i]["name"] - end_name = (end_name.split('/')[1] - if '/' in end_name - else end_name) + full_end_name = self.endid[i]["name"] + end_name = (full_end_name.split('/')[1] + if '/' in full_end_name + else full_end_name) full_end_dest = self.endid[i]["destination"] end_dest = (full_end_dest.split('/')[0] if '/' in full_end_dest else full_end_dest) + end_dest_tag = (full_end_dest.split('/')[1] + if '/' in full_end_dest + else full_end_dest) #value = self.tag(end_name) self.end_clients[end_dest] = TestClient(end_dest) - reply = self.end_clients[end_dest].send("READ="+end_name) + reply = self.end_clients[end_dest].send("READ="+end_dest_tag) value = reply[1].rstrip('\x00') self.endid[i]["value"] = value self.tag(full_end_dest, value) @@ -213,13 +218,13 @@ def run(self): if '/' in full_end_dest else full_end_dest) - # !!need to add something to handle binary points + if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double': if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))): self.end_clients[end_dest] = TestClient(end_dest) self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_name) + reply = self.end_clients[end_dest].send("READ="+end_dest_tag) value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) elif self.types[full_end_name] == 'bool': @@ -227,7 +232,7 @@ def run(self): self.end_clients[end_dest] = TestClient(end_dest) self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_name) + reply = self.end_clients[end_dest].send("READ="+end_dest_tag) value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) From 0c65709af70a6b11341c66ae468a96f374c6a1b3 Mon Sep 17 00:00:00 2001 From: jarwils Date: Mon, 13 Nov 2023 16:07:45 -0700 Subject: [PATCH 05/35] first shot add logic --- .../providers/alicanto/alicanto_helper.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index e30a6989..462f54ac 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -221,6 +221,24 @@ def run(self): if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double': if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))): + #Handle Logic + if self.logic[end_dest_tag] is not None: + expr = self.parser.parse(self.logic[end_dest_tag]) + # Assign variables + vars = {} + for var in expr.variables(): + vars[var] = self.tag(var) + # Evaluate expression + value = expr.evaluate(vars) + value = str(value).lower() + if value != self.tag(end_dest_tag): + logger.debug(f"\tLOGIC: {end_dest_tag.strip()}={self.logic[end_dest_tag]} ----> {value}") + # Assign new tag value + self._tag(end_dest_tag.strip(), value) + # Skip if value is unchanged + elif value == self.tag(end_dest_tag): + continue + self.end_clients[end_dest] = TestClient(end_dest) self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) time.sleep(0.5) From 11d9d7f11e697efb4e8dc91e348fdb6cff4d03f8 Mon Sep 17 00:00:00 2001 From: jarwils Date: Wed, 22 Nov 2023 15:00:32 -0700 Subject: [PATCH 06/35] add logic --- .../providers/alicanto/alicanto_helper.py | 52 +++++++++++++++++-- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index 462f54ac..ed5d3c5e 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -224,31 +224,73 @@ def run(self): #Handle Logic if self.logic[end_dest_tag] is not None: expr = self.parser.parse(self.logic[end_dest_tag]) + ''' # Assign variables vars = {} for var in expr.variables(): vars[var] = self.tag(var) + ''' + i = 0 + # Assign vars not working, so assign token manually + for token in expr.tokens: + for search_tag in self.tags: + if token.toString() == search_tag: + expr.tokens[i].number_ = self.tag(token.toString()) + i += 1 # Evaluate expression value = expr.evaluate(vars) value = str(value).lower() - if value != self.tag(end_dest_tag): + if value != self.tag(full_end_dest): logger.debug(f"\tLOGIC: {end_dest_tag.strip()}={self.logic[end_dest_tag]} ----> {value}") # Assign new tag value - self._tag(end_dest_tag.strip(), value) + self._tag(full_end_dest, value) # Skip if value is unchanged - elif value == self.tag(end_dest_tag): + elif value == self.tag(full_end_dest): continue self.end_clients[end_dest] = TestClient(end_dest) - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) + if self.logic[end_dest_tag] is not None: + self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest)) + else: + self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) elif self.types[full_end_name] == 'bool': if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower(): + #Handle Logic + if self.logic[end_dest_tag] is not None: + expr = self.parser.parse(self.logic[end_dest_tag]) + ''' + # Assign variables + vars = {} + for var in expr.variables(): + vars[var] = self.tag(var) + ''' + i = 0 + # Assign vars not working, so assign token manually + for token in expr.tokens: + for search_tag in self.tags: + if token.toString() == search_tag: + expr.tokens[i].number_ = bool(self.tag(token.toString())) + i += 1 + # Evaluate expression + value = expr.evaluate(vars) + value = str(value) + if value != self.tag(full_end_dest): + logger.debug(f"\tLOGIC: {end_dest_tag.strip()}={self.logic[end_dest_tag]} ----> {value}") + # Assign new tag value + self._tag(full_end_dest, value) + # Skip if value is unchanged + elif value == self.tag(full_end_dest): + continue + self.end_clients[end_dest] = TestClient(end_dest) - self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) + if self.logic[end_dest_tag] is not None: + self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_dest)) + else: + self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) value = reply[1].rstrip('\x00') From 8f3214bed383eb84c94a0b06d16eb5fd986cf1d0 Mon Sep 17 00:00:00 2001 From: jarwils Date: Wed, 22 Nov 2023 15:54:23 -0700 Subject: [PATCH 07/35] fix tag --- .../providers/alicanto/alicanto_helper.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index ed5d3c5e..dae15e27 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -222,8 +222,8 @@ def run(self): if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double': if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))): #Handle Logic - if self.logic[end_dest_tag] is not None: - expr = self.parser.parse(self.logic[end_dest_tag]) + if self.logic[full_end_dest] is not None: + expr = self.parser.parse(self.logic[full_end_dest]) ''' # Assign variables vars = {} @@ -241,7 +241,7 @@ def run(self): value = expr.evaluate(vars) value = str(value).lower() if value != self.tag(full_end_dest): - logger.debug(f"\tLOGIC: {end_dest_tag.strip()}={self.logic[end_dest_tag]} ----> {value}") + logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") # Assign new tag value self._tag(full_end_dest, value) # Skip if value is unchanged @@ -249,7 +249,7 @@ def run(self): continue self.end_clients[end_dest] = TestClient(end_dest) - if self.logic[end_dest_tag] is not None: + if self.logic[full_end_dest] is not None: self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest)) else: self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) @@ -260,8 +260,8 @@ def run(self): elif self.types[full_end_name] == 'bool': if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower(): #Handle Logic - if self.logic[end_dest_tag] is not None: - expr = self.parser.parse(self.logic[end_dest_tag]) + if self.logic[full_end_dest] is not None: + expr = self.parser.parse(self.logic[full_end_dest]) ''' # Assign variables vars = {} @@ -279,7 +279,7 @@ def run(self): value = expr.evaluate(vars) value = str(value) if value != self.tag(full_end_dest): - logger.debug(f"\tLOGIC: {end_dest_tag.strip()}={self.logic[end_dest_tag]} ----> {value}") + logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") # Assign new tag value self._tag(full_end_dest, value) # Skip if value is unchanged @@ -287,7 +287,7 @@ def run(self): continue self.end_clients[end_dest] = TestClient(end_dest) - if self.logic[end_dest_tag] is not None: + if self.logic[full_end_dest] is not None: self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_dest)) else: self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) From fedb82c1a547fc51f74a4784833ecaf5ab69770b Mon Sep 17 00:00:00 2001 From: jarwils Date: Mon, 27 Nov 2023 09:53:29 -0700 Subject: [PATCH 08/35] see if threads can be cleaned --- src/pybennu/pybennu/providers/alicanto/alicanto_helper.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index dae15e27..27ea3d83 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -192,6 +192,8 @@ def run(self): #value = self.tag(end_name) self.end_clients[end_dest] = TestClient(end_dest) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + #Try to keep up with threads + self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.endid[i]["value"] = value self.tag(full_end_dest, value) @@ -255,6 +257,8 @@ def run(self): self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + #Try to help thread craziness + self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) elif self.types[full_end_name] == 'bool': @@ -293,6 +297,8 @@ def run(self): self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + #Try to help thread craziness + self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) From 0e3b886bc6d11a0ae9cc571f66597e6bd5c83f63 Mon Sep 17 00:00:00 2001 From: jarwils Date: Mon, 27 Nov 2023 12:02:48 -0700 Subject: [PATCH 09/35] close sockets --- .../pybennu/providers/alicanto/alicanto_helper.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index 27ea3d83..bb94cca1 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -159,7 +159,8 @@ def __init__(self, config, exit_handler=None): for end_dest in self.end_dests: # Initialize bennu Client end_dest = end_dest.split('/')[0] - self.end_clients[end_dest] = TestClient(end_dest) + #self.end_clients[end_dest] = TestClient(end_dest) + self.end_clients[end_dest] = None logger.debug(f"\tEnd_clients: {self.end_clients}") def run(self): @@ -193,6 +194,7 @@ def run(self): self.end_clients[end_dest] = TestClient(end_dest) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to keep up with threads + self.end_clients[end_dest].__socket.close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.endid[i]["value"] = value @@ -258,6 +260,7 @@ def run(self): time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness + self.end_clients[end_dest].__socket.close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) @@ -298,6 +301,7 @@ def run(self): time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness + self.end_clients[end_dest].__socket.close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) @@ -310,13 +314,6 @@ def print_state(self): logger.debug(f"{tag:<30} --- {self.tag(tag):}") logger.debug("============================================") - def action_post_request_time(self): - pass - - def send_msg_to_endpoint(self, name, value): - endpoint = self.fed.get_endpoint_by_name(name) - endpoint.send_data(value.encode(), endpoint.default_destination) - def get_type(self, tag): return self.types[tag] From 1d9aa6ded7495e01b8774b479922d9edb6bfc987 Mon Sep 17 00:00:00 2001 From: jarwils Date: Mon, 27 Nov 2023 14:01:17 -0700 Subject: [PATCH 10/35] looks like context term did the trick --- .../pybennu/providers/alicanto/alicanto_helper.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index bb94cca1..7d83bfed 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -51,6 +51,10 @@ def send(self, message): return reply + def close(self): + self._Client__socket.close() + self._Client__context.term() + class alicantoFederate(): def __init__(self, config, exit_handler=None): exit = exit_handler if exit_handler else self.ctrl_exit_handler @@ -194,7 +198,7 @@ def run(self): self.end_clients[end_dest] = TestClient(end_dest) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to keep up with threads - self.end_clients[end_dest].__socket.close() + self.end_clients[end_dest].close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.endid[i]["value"] = value @@ -260,7 +264,7 @@ def run(self): time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness - self.end_clients[end_dest].__socket.close() + self.end_clients[end_dest].close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) @@ -301,7 +305,7 @@ def run(self): time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness - self.end_clients[end_dest].__socket.close() + self.end_clients[end_dest].close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) From 67af096510f3e40e5dc00e36816cc551c97c3651 Mon Sep 17 00:00:00 2001 From: jarwils Date: Mon, 27 Nov 2023 14:11:27 -0700 Subject: [PATCH 11/35] wait a sec --- src/pybennu/pybennu/providers/alicanto/alicanto_helper.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index 7d83bfed..c0fd3416 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -198,6 +198,7 @@ def run(self): self.end_clients[end_dest] = TestClient(end_dest) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to keep up with threads + sleep(1) self.end_clients[end_dest].close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') @@ -264,6 +265,7 @@ def run(self): time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness + sleep(1) self.end_clients[end_dest].close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') @@ -305,6 +307,7 @@ def run(self): time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness + sleep(1) self.end_clients[end_dest].close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') From aa9460bc9b0a29066a38c5739a92421d36bf012a Mon Sep 17 00:00:00 2001 From: jarwils Date: Mon, 27 Nov 2023 14:56:24 -0700 Subject: [PATCH 12/35] typo fix --- src/pybennu/pybennu/providers/alicanto/alicanto_helper.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index c0fd3416..e6f48be8 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -198,7 +198,7 @@ def run(self): self.end_clients[end_dest] = TestClient(end_dest) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to keep up with threads - sleep(1) + time.sleep(1) self.end_clients[end_dest].close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') @@ -265,7 +265,7 @@ def run(self): time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness - sleep(1) + time.sleep(1) self.end_clients[end_dest].close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') @@ -307,7 +307,7 @@ def run(self): time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness - sleep(1) + time.sleep(1) self.end_clients[end_dest].close() self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') From 0fbfec256f0379d00f4277a6944df0c54165f2b9 Mon Sep 17 00:00:00 2001 From: jarwils Date: Tue, 28 Nov 2023 10:25:26 -0700 Subject: [PATCH 13/35] working on closing zmq context --- src/pybennu/pybennu/providers/alicanto/alicanto_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index e6f48be8..fc8d9dc0 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -52,7 +52,7 @@ def send(self, message): return reply def close(self): - self._Client__socket.close() + #self._Client__socket.close() self._Client__context.term() class alicantoFederate(): From c55a654724b4b40c49b9e8ca7c481c7ed689bb50 Mon Sep 17 00:00:00 2001 From: jarwils Date: Tue, 28 Nov 2023 15:44:56 -0700 Subject: [PATCH 14/35] maybe working --- .../pybennu/providers/alicanto/alicanto_helper.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index fc8d9dc0..7bf13326 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -7,6 +7,7 @@ import time import threading import math +import zmq from distutils.util import strtobool from py_expression_eval import Parser @@ -35,6 +36,7 @@ def __init__(self, end_dest): def send(self, message): """ Send message to Provider """ + self.connect() # send update self._Client__socket.send_string(message+'\0') # must include null byte # get response @@ -49,12 +51,11 @@ def send(self, message): else: print("I: ERR -- %s" % msg) - return reply - - def close(self): - #self._Client__socket.close() + self._Client__socket.close() self._Client__context.term() + return reply + class alicantoFederate(): def __init__(self, config, exit_handler=None): exit = exit_handler if exit_handler else self.ctrl_exit_handler @@ -199,8 +200,6 @@ def run(self): reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to keep up with threads time.sleep(1) - self.end_clients[end_dest].close() - self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.endid[i]["value"] = value self.tag(full_end_dest, value) @@ -266,8 +265,6 @@ def run(self): reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness time.sleep(1) - self.end_clients[end_dest].close() - self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) elif self.types[full_end_name] == 'bool': @@ -308,8 +305,6 @@ def run(self): reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness time.sleep(1) - self.end_clients[end_dest].close() - self.end_clients[end_dest] = None value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) From 275c264855d0b8df9f065eb631b453b88eef08ea Mon Sep 17 00:00:00 2001 From: jarwils Date: Wed, 29 Nov 2023 15:26:39 -0700 Subject: [PATCH 15/35] add some error handling --- .../providers/alicanto/alicanto_helper.py | 114 ++++++++++++------ 1 file changed, 77 insertions(+), 37 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index 7bf13326..1e8de1f4 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -19,6 +19,31 @@ logger = logging.getLogger('alicanto') logger.setLevel(logging.INFO) +#Try adding a timeout helper +import signal +from contextlib import contextmanager + + +@contextmanager +def timeout(time): + # Register a function to raise a TimeoutError on the signal. + signal.signal(signal.SIGALRM, raise_timeout) + # Schedule the signal to be sent after ``time`` + signal.alarm(time) + + try: + yield + except TimeoutError: + pass + finally: + # Unregister the signal so it won't be triggered + # if the timeout is not reached. + signal.signal(signal.SIGALRM, signal.SIG_IGN) + + +def raise_timeout(signum, frame): + raise TimeoutError + # TestSubscriber used for easy multiple subscribers class TestSubscriber(Subscriber): def __init__(self, sub_source): @@ -36,25 +61,26 @@ def __init__(self, end_dest): def send(self, message): """ Send message to Provider """ - self.connect() - # send update - self._Client__socket.send_string(message+'\0') # must include null byte - # get response - msg = self._Client__socket.recv_string() - reply = msg.split('=') - status = reply[0] - data = reply[1] - - if status == self._Client__kACK: - print("I: ACK: "+data) - #self.reply_handler(data) - else: - print("I: ERR -- %s" % msg) + with timeout(10): + self.connect() + # send update + self._Client__socket.send_string(message+'\0') # must include null byte + # get response + msg = self._Client__socket.recv_string() + reply = msg.split('=') + status = reply[0] + data = reply[1] + + if status == self._Client__kACK: + print("I: ACK: "+data) + #self.reply_handler(data) + else: + print("I: ERR -- %s" % msg) - self._Client__socket.close() - self._Client__context.term() + self._Client__socket.close() + self._Client__context.term() - return reply + return reply class alicantoFederate(): def __init__(self, config, exit_handler=None): @@ -164,8 +190,11 @@ def __init__(self, config, exit_handler=None): for end_dest in self.end_dests: # Initialize bennu Client end_dest = end_dest.split('/')[0] - #self.end_clients[end_dest] = TestClient(end_dest) - self.end_clients[end_dest] = None + try: + self.end_clients[end_dest] = TestClient(end_dest) + except: + logger.error(f"\tError Initializing Client: {self.end_clients}") + #self.end_clients[end_dest] = None logger.debug(f"\tEnd_clients: {self.end_clients}") def run(self): @@ -196,8 +225,12 @@ def run(self): if '/' in full_end_dest else full_end_dest) #value = self.tag(end_name) - self.end_clients[end_dest] = TestClient(end_dest) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + try: + self.end_clients[end_dest] = TestClient(end_dest) + reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + except: + logger.error(f"\tError Initializing Client: {self.end_clients}") + continue #Try to keep up with threads time.sleep(1) value = reply[1].rstrip('\x00') @@ -255,14 +288,18 @@ def run(self): # Skip if value is unchanged elif value == self.tag(full_end_dest): continue - - self.end_clients[end_dest] = TestClient(end_dest) - if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest)) - else: - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) - time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + + try: + self.end_clients[end_dest] = TestClient(end_dest) + if self.logic[full_end_dest] is not None: + self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest)) + else: + self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) + time.sleep(0.5) + reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + except: + logger.error(f"\tError Initializing Client: {self.end_clients}") + continue #Try to help thread craziness time.sleep(1) value = reply[1].rstrip('\x00') @@ -295,14 +332,17 @@ def run(self): # Skip if value is unchanged elif value == self.tag(full_end_dest): continue - - self.end_clients[end_dest] = TestClient(end_dest) - if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_dest)) - else: - self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) - time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + try: + self.end_clients[end_dest] = TestClient(end_dest) + if self.logic[full_end_dest] is not None: + self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_dest)) + else: + self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) + time.sleep(0.5) + reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + except: + logger.error(f"\tError Initializing Client: {self.end_clients}") + continue #Try to help thread craziness time.sleep(1) value = reply[1].rstrip('\x00') From b81b757b64cc4f8828f3b1d258b8e7b531573927 Mon Sep 17 00:00:00 2001 From: jarwils Date: Wed, 29 Nov 2023 16:34:34 -0700 Subject: [PATCH 16/35] add a sleep --- src/pybennu/pybennu/providers/alicanto/alicanto_helper.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index 1e8de1f4..4b4de820 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -243,6 +243,8 @@ def run(self): ########## Main co-simulation loop #################################### # As long as granted time is in the time range to be simulated... + #Wait for other sims to start + time.sleep(60) while True: self.print_state() time.sleep(1) From 44f882c027d077b458143d45c598f30f35bcb97c Mon Sep 17 00:00:00 2001 From: jarwils Date: Wed, 29 Nov 2023 18:07:00 -0700 Subject: [PATCH 17/35] move split into try --- .../providers/alicanto/alicanto_helper.py | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index 4b4de820..2ea66ef1 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -228,22 +228,22 @@ def run(self): try: self.end_clients[end_dest] = TestClient(end_dest) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + #Try to keep up with threads + time.sleep(1) + value = reply[1].rstrip('\x00') + self.endid[i]["value"] = value + self.tag(full_end_dest, value) + logger.debug(f"Initial Endpoints {end_name} / {end_dest}:{value} ") + except: logger.error(f"\tError Initializing Client: {self.end_clients}") continue - #Try to keep up with threads - time.sleep(1) - value = reply[1].rstrip('\x00') - self.endid[i]["value"] = value - self.tag(full_end_dest, value) - logger.debug(f"Initial Endpoints {end_name} / {end_dest}:{value} ") - ########## Main co-simulation loop #################################### # As long as granted time is in the time range to be simulated... - #Wait for other sims to start + #Wait for other sims to time.sleep(60) while True: self.print_state() @@ -299,13 +299,13 @@ def run(self): self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + #Try to help thread craziness + time.sleep(1) + value = reply[1].rstrip('\x00') + self.tag(full_end_dest, value) except: logger.error(f"\tError Initializing Client: {self.end_clients}") continue - #Try to help thread craziness - time.sleep(1) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) elif self.types[full_end_name] == 'bool': if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower(): #Handle Logic @@ -342,13 +342,14 @@ def run(self): self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + + #Try to help thread craziness + time.sleep(1) + value = reply[1].rstrip('\x00') + self.tag(full_end_dest, value) except: logger.error(f"\tError Initializing Client: {self.end_clients}") continue - #Try to help thread craziness - time.sleep(1) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) From 6c37b0bc9e13982eefab4872fd8545b27ac58c76 Mon Sep 17 00:00:00 2001 From: jarwils Date: Thu, 30 Nov 2023 08:08:53 -0700 Subject: [PATCH 18/35] get rid of sleeps --- src/pybennu/pybennu/providers/alicanto/alicanto_helper.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py index 2ea66ef1..5ab721a5 100644 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py @@ -229,7 +229,7 @@ def run(self): self.end_clients[end_dest] = TestClient(end_dest) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to keep up with threads - time.sleep(1) + #time.sleep(1) value = reply[1].rstrip('\x00') self.endid[i]["value"] = value self.tag(full_end_dest, value) @@ -247,7 +247,7 @@ def run(self): time.sleep(60) while True: self.print_state() - time.sleep(1) + time.sleep(0.1) for i in range(self.end_count): full_end_name = self.endid[i]["name"] end_name = (full_end_name.split('/')[1] @@ -300,7 +300,7 @@ def run(self): time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness - time.sleep(1) + #time.sleep(1) value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) except: @@ -344,7 +344,7 @@ def run(self): reply = self.end_clients[end_dest].send("READ="+end_dest_tag) #Try to help thread craziness - time.sleep(1) + #time.sleep(1) value = reply[1].rstrip('\x00') self.tag(full_end_dest, value) except: From 9d7b297ce452b1cdc69dd4d7693ec6ecdddc422d Mon Sep 17 00:00:00 2001 From: jarwils Date: Thu, 30 Nov 2023 08:57:41 -0700 Subject: [PATCH 19/35] work on getting seperate exec good --- .../pybennu/executables/pybennu_alicanto.py | 204 +++++++++++++++--- 1 file changed, 170 insertions(+), 34 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 43682ec5..5da00198 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -19,6 +19,31 @@ from pybennu.distributed.client import Client import pybennu.distributed.swig._Endpoint as E +#Adding a timeout helper to cause client objects not to feeze program +import signal +from contextlib import contextmanager + + +@contextmanager +def timeout(time): + # Register a function to raise a TimeoutError on the signal. + signal.signal(signal.SIGALRM, raise_timeout) + # Schedule the signal to be sent after ``time`` + signal.alarm(time) + + try: + yield + except TimeoutError: + pass + finally: + # Unregister the signal so it won't be triggered + # if the timeout is not reached. + signal.signal(signal.SIGALRM, signal.SIG_IGN) + + +def raise_timeout(signum, frame): + raise TimeoutError + logging.basicConfig(level=logging.DEBUG,format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger('alicanto') #logger.addHandler(logging.StreamHandler()) @@ -34,24 +59,31 @@ class alicantoClient(Client): def __init__(self, end_dest): new_endpoint_dest = E.new_Endpoint() E.Endpoint_str_set(new_endpoint_dest, 'tcp://'+str(end_dest)) - self.endpointName = 'tcp://'+str(end_dest) Client.__init__(self, new_endpoint_dest) - + def send(self, message): """ Send message to Provider """ - # send update - self._Client__socket.send_string(message+'\0') # must include null byte - # get response - msg = self._Client__socket.recv_string() - reply = msg.split('=') - status = reply[0] - data = reply[1] + with timeout(10): + self.connect() + # send update + self._Client__socket.send_string(message+'\0') # must include null byte + # get response + msg = self._Client__socket.recv_string() + reply = msg.split('=') + status = reply[0] + data = reply[1] + + if status == self._Client__kACK: + print("I: ACK: "+data) + #self.reply_handler(data) + else: + print("I: ERR -- %s" % msg) - if status != self._Client__kACK: - logger.error(msg) + self._Client__socket.close() + self._Client__context.term() - return reply + return reply class alicanto(): def __init__(self, config, debug=False, exit_handler=None): @@ -103,7 +135,7 @@ def __init__(self, config, debug=False, exit_handler=None): end_name = endpoint["name"] end_destination = endpoint["destination"] end_type = endpoint["type"] - logger.debug(f"Registered endpoint ---> end_name: {end_name} ---> end_destination: {end_destination}") + logger.info(f"Registered endpoint ---> end_name: {end_name} ---> end_destination: {end_destination}") self.tags.update({end_destination : 0}) self.end_dests.append(end_destination) self.dests[end_name] = end_destination @@ -124,16 +156,15 @@ def __init__(self, config, debug=False, exit_handler=None): sub_info = self.subid[i]["info"] # stores logic for interdependencies except: sub_info = None - logger.debug(f"Registered subscription ---> sub_name: {sub_name} ---> sub_type: {sub_type} ---> sub_info: {sub_info}") - #sub_name = sub_name.split('/')[1] if '/' in sub_name else sub_name + logger.info(f"Registered subscription ---> sub_name: {sub_name} ---> sub_type: {sub_type} ---> sub_info: {sub_info}") self.tags.update({sub_name : 0 }) self.types[sub_name] = sub_type if sub_info: - logger.debug(f"********** LOGIC **********") + logger.info(f"********** LOGIC **********") for exp in sub_info.split(';'): lhs, rhs = exp.split('=') self.logic[lhs.strip()] = rhs.strip() - logger.debug(f'{exp.strip()}') + logger.info(f'{exp.strip()}') #make sub_sources elements unique self.sub_sources = list(set(self.sub_sources)) @@ -141,7 +172,7 @@ def __init__(self, config, debug=False, exit_handler=None): self.state[tag] = False if self.get_type(tag) == 'bool' else 0 for sub_source in self.sub_sources: - logger.debug(f"Launching Subscriber Thread ---> subscription: udp://{sub_source}") + logger.info(f"Launching Subscriber Thread ---> subscription: udp://{sub_source}") subber = alicantoSubscriber(sub_source) subber.subscription_handler = self._subscription_handler self.__sub_thread = threading.Thread(target=subber.run) @@ -153,17 +184,51 @@ def __init__(self, config, debug=False, exit_handler=None): for end_dest in self.end_dests: # Initialize bennu Client end_dest = end_dest.split('/')[0] - self.end_clients[end_dest] = alicantoClient(end_dest) + try: + self.end_clients[end_dest] = alicantoClient(end_dest) + except: + logger.error(f"\tError Initializing Client: {self.end_clients}") for key in list(self.end_clients.keys()): - logger.debug(f"End_client: {key}") + logger.info(f"End_client: {key}") def run(self): ############## Entering Execution Mode ############################## logger.info("Entered alicanto execution mode") + # Endpoint initial values to alicanto + for i in range(self.end_count): + full_end_name = self.endid[i]["name"] + end_name = (full_end_name.split('/')[1] + if '/' in full_end_name + else full_end_name) + full_end_dest = self.endid[i]["destination"] + end_dest = (full_end_dest.split('/')[0] + if '/' in full_end_dest + else full_end_dest) + end_dest_tag = (full_end_dest.split('/')[1] + if '/' in full_end_dest + else full_end_dest) + #value = self.tag(end_name) + try: + self.end_clients[end_dest] = alicantoClient(end_dest) + reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + #Try to keep up with threads + #time.sleep(1) + value = reply[1].rstrip('\x00') + self.endid[i]["value"] = value + self.tag(full_end_dest, value) + logger.debug(f"Initial Endpoints {end_name} / {end_dest}:{value} ") + + except: + logger.error(f"\tError Initializing Client: {self.end_clients}") + continue + + # Wait for other sims to start + time.sleep(60) ########## Main co-simulation loop #################################### while True: self.publish_state() + time.sleep(0.1) for key, value in self.endid.items(): full_end_name = value["name"] end_name = (full_end_name.split('/')[1] @@ -180,22 +245,93 @@ def run(self): # !!need to add something to handle binary points if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double': if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))): - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) - reply = self.end_clients[end_dest].send("READ="+end_name) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) + #Handle Logic + if self.logic[full_end_dest] is not None: + expr = self.parser.parse(self.logic[full_end_dest]) + ''' + # Assign variables + vars = {} + for var in expr.variables(): + vars[var] = self.tag(var) + ''' + i = 0 + # Assign vars not working, so assign token manually + for token in expr.tokens: + for search_tag in self.tags: + if token.toString() == search_tag: + expr.tokens[i].number_ = self.tag(token.toString()) + i += 1 + # Evaluate expression + value = expr.evaluate(vars) + value = str(value).lower() + if value != self.tag(full_end_dest): + logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") + # Assign new tag value + self._tag(full_end_dest, value) + # Skip if value is unchanged + elif value == self.tag(full_end_dest): + continue + + try: + self.end_clients[end_dest] = alicantoClient(end_dest) + if self.logic[full_end_dest] is not None: + self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest)) + else: + self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) + time.sleep(0.5) + reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + value = reply[1].rstrip('\x00') + self.tag(full_end_dest, value) + except: + logger.error(f"\tError Initializing Client: {self.end_clients}") + continue elif self.types[full_end_name] == 'bool': if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower(): - self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) - reply = self.end_clients[end_dest].send("READ="+end_name) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) + #Handle Logic + if self.logic[full_end_dest] is not None: + expr = self.parser.parse(self.logic[full_end_dest]) + ''' + # Assign variables + vars = {} + for var in expr.variables(): + vars[var] = self.tag(var) + ''' + i = 0 + # Assign vars not working, so assign token manually + for token in expr.tokens: + for search_tag in self.tags: + if token.toString() == search_tag: + expr.tokens[i].number_ = bool(self.tag(token.toString())) + i += 1 + # Evaluate expression + value = expr.evaluate(vars) + value = str(value) + if value != self.tag(full_end_dest): + logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") + # Assign new tag value + self._tag(full_end_dest, value) + # Skip if value is unchanged + elif value == self.tag(full_end_dest): + continue + try: + self.end_clients[end_dest] = alicantoClient(end_dest) + if self.logic[full_end_dest] is not None: + self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_dest)) + else: + self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) + time.sleep(0.5) + reply = self.end_clients[end_dest].send("READ="+end_dest_tag) + value = reply[1].rstrip('\x00') + self.tag(full_end_dest, value) + except: + logger.error(f"\tError Initializing Client: {self.end_clients}") + continue def publish_state(self): - logger.debug("=================== DATA ===================") + logger.info("=================== DATA ===================") for tag in self.tags: - logger.debug(f"{tag:<30} --- {self.tag(tag):}") - logger.debug("============================================") + logger.info(f"{tag:<30} --- {self.tag(tag):}") + logger.info("============================================") def get_type(self, tag): return self.types[tag] @@ -249,12 +385,12 @@ def _subscription_handler(self, message): if field == 'value': if not math.isclose(float(self.tag(full_tag)), value): self.tag(full_tag, value) - logger.info("UPDATE NOW: "+full_tag) - logger.info("New value: "+str(value)) + logger.debug("UPDATE NOW: "+full_tag) + logger.debug("New value: "+str(value)) else: continue elif field == 'status': - logger.info("Cannot handle binary points") + logger.debug("Cannot handle binary points") continue else: continue From 8f49f6f0bc3bd5d4dee6c1d91269a51503107cf4 Mon Sep 17 00:00:00 2001 From: jarwils Date: Thu, 30 Nov 2023 09:32:49 -0700 Subject: [PATCH 20/35] fix logic import --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 5da00198..2b3ac056 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -104,6 +104,9 @@ def __init__(self, config, debug=False, exit_handler=None): self.dests = {} # Tag=>type map self.types = {} + self.logic = {} + # Expression parser for logic + self.parser = Parser() # Set of all tags self.tags = {} From ce37292ca946988af7d01d17e6581bf3777b2ccd Mon Sep 17 00:00:00 2001 From: jarwils Date: Thu, 30 Nov 2023 09:45:02 -0700 Subject: [PATCH 21/35] add parser --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 2b3ac056..2f83496f 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -14,6 +14,8 @@ import sys import time import math +from distutils.util import strtobool +from py_expression_eval import Parser from pybennu.distributed.subscriber import Subscriber from pybennu.distributed.client import Client From e371c1052a6baffc514d31d1101c9b0a35522fd4 Mon Sep 17 00:00:00 2001 From: jarwils Date: Thu, 30 Nov 2023 10:55:41 -0700 Subject: [PATCH 22/35] change tag --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 2f83496f..66d61bed 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -272,7 +272,7 @@ def run(self): if value != self.tag(full_end_dest): logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") # Assign new tag value - self._tag(full_end_dest, value) + self.tag(full_end_dest, value) # Skip if value is unchanged elif value == self.tag(full_end_dest): continue @@ -314,7 +314,7 @@ def run(self): if value != self.tag(full_end_dest): logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") # Assign new tag value - self._tag(full_end_dest, value) + self.tag(full_end_dest, value) # Skip if value is unchanged elif value == self.tag(full_end_dest): continue From 8dad9ee7cdd491e5463176ef914d3dcfc01a7c75 Mon Sep 17 00:00:00 2001 From: jarwils Date: Thu, 30 Nov 2023 15:15:36 -0700 Subject: [PATCH 23/35] maybe a fix --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 66d61bed..18da98af 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -362,6 +362,7 @@ def _subscription_handler(self, message): message (str): published zmq message as a string """ points = message.split(',') + points = points[:-1] # remove last element since it might be empty sub_source = threading.current_thread().name for point in points: From 0c023d5802ce1343199c08aa5f221a0f904ca9f9 Mon Sep 17 00:00:00 2001 From: jarwils Date: Wed, 20 Dec 2023 10:20:25 -0700 Subject: [PATCH 24/35] fix point split not working at start --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 18da98af..8cb6055c 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -371,9 +371,12 @@ def _subscription_handler(self, message): if point == "": continue - tag = point.split(':')[0] - full_tag = sub_source + '/' + tag - value = point.split(':')[1] + try: + tag = point.split(':')[0] + full_tag = sub_source + '/' + tag + value = point.split(':')[1] + except: + continue if full_tag not in self.tags: continue From 157b97b9f24a23b02607f9198b9ef0c79face966 Mon Sep 17 00:00:00 2001 From: jarwils Date: Tue, 16 Jan 2024 14:08:52 -0700 Subject: [PATCH 25/35] fix binary bug --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 8cb6055c..27731187 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -399,8 +399,12 @@ def _subscription_handler(self, message): else: continue elif field == 'status': - logger.debug("Cannot handle binary points") - continue + if self.tag(full_tag) != value: + self.tag(full_tag, value) + logger.debug("UPDATE NOW: "+full_tag) + logger.debug("New value: "+str(value)) + else: + continue else: continue From aaf711a0f553a03b40d3be7b9909b520a627baf5 Mon Sep 17 00:00:00 2001 From: jarwils Date: Tue, 16 Jan 2024 16:45:35 -0700 Subject: [PATCH 26/35] fix client.py weirdness --- src/pybennu/pybennu/distributed/client.py | 5 ++++- src/pybennu/pybennu/executables/pybennu_alicanto.py | 13 +++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/pybennu/pybennu/distributed/client.py b/src/pybennu/pybennu/distributed/client.py index 3b586b47..938c98e0 100644 --- a/src/pybennu/pybennu/distributed/client.py +++ b/src/pybennu/pybennu/distributed/client.py @@ -73,6 +73,9 @@ def write_digital_point(self, tag, value): tag: String name of tag to update. value: Value that the tag will be updated to. """ - val = "true" if value else "false" + if value.lower() == "false" or 0: + val = "false" + else: + val = "true" update = "WRITE=" + tag + ":" + val self.send(update) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 27731187..04acba45 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -381,12 +381,13 @@ def _subscription_handler(self, message): if full_tag not in self.tags: continue - if value.lower() == 'false': - value = False - field = 'status' - elif value.lower() == 'true': - value = True - field = 'status' + if self.types[full_tag] == 'bool': + if value.lower() == 'false' or value == '0': + value = False + field = 'status' + elif value.lower() == 'true' or value == '1': + value = True + field = 'status' else: value = float(value) field = 'value' From d0de9d89ab731ded5fb17d363858cda61ab187bc Mon Sep 17 00:00:00 2001 From: jarwils Date: Tue, 16 Jan 2024 16:46:27 -0700 Subject: [PATCH 27/35] allow user to provide false to client.py --- src/pybennu/pybennu/distributed/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pybennu/pybennu/distributed/client.py b/src/pybennu/pybennu/distributed/client.py index 938c98e0..9042d02d 100644 --- a/src/pybennu/pybennu/distributed/client.py +++ b/src/pybennu/pybennu/distributed/client.py @@ -73,7 +73,7 @@ def write_digital_point(self, tag, value): tag: String name of tag to update. value: Value that the tag will be updated to. """ - if value.lower() == "false" or 0: + if value.lower() == "false" or value == "0": val = "false" else: val = "true" From 82fa4c6b8a677444a1f981e3822946a42f67dfa7 Mon Sep 17 00:00:00 2001 From: jarwils Date: Wed, 17 Jan 2024 09:57:31 -0700 Subject: [PATCH 28/35] comment out startup sleep --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 04acba45..84319fc0 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -227,8 +227,8 @@ def run(self): logger.error(f"\tError Initializing Client: {self.end_clients}") continue - # Wait for other sims to start - time.sleep(60) + # Wait for other sims to start, not sure if needed + #time.sleep(5) ########## Main co-simulation loop #################################### while True: From a2ff1bb3c19d7b9f4fef438bf9dfd44b6672efbd Mon Sep 17 00:00:00 2001 From: jarwils Date: Fri, 19 Jan 2024 15:35:39 -0700 Subject: [PATCH 29/35] undo client change --- src/pybennu/pybennu/distributed/client.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/pybennu/pybennu/distributed/client.py b/src/pybennu/pybennu/distributed/client.py index 9042d02d..3b586b47 100644 --- a/src/pybennu/pybennu/distributed/client.py +++ b/src/pybennu/pybennu/distributed/client.py @@ -73,9 +73,6 @@ def write_digital_point(self, tag, value): tag: String name of tag to update. value: Value that the tag will be updated to. """ - if value.lower() == "false" or value == "0": - val = "false" - else: - val = "true" + val = "true" if value else "false" update = "WRITE=" + tag + ":" + val self.send(update) From 9424fec29228341fe0fff6b04452736a74bbedcb Mon Sep 17 00:00:00 2001 From: jarwils Date: Fri, 19 Jan 2024 15:38:53 -0700 Subject: [PATCH 30/35] try out new digiital write --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 84319fc0..7e1848d2 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -321,9 +321,9 @@ def run(self): try: self.end_clients[end_dest] = alicantoClient(end_dest) if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_dest)) + self.end_clients[end_dest].write_digital_point(end_dest_tag, bool(self.tag(full_end_dest))) else: - self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) + self.end_clients[end_dest].write_digital_point(end_dest_tag, bool(self.tag(full_end_name))) time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) value = reply[1].rstrip('\x00') From b8f063a38e691f9cbcebf36c728de0db57650b95 Mon Sep 17 00:00:00 2001 From: jarwils Date: Fri, 19 Jan 2024 16:04:10 -0700 Subject: [PATCH 31/35] actual fix with eval --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 7e1848d2..83fa71d1 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -321,9 +321,9 @@ def run(self): try: self.end_clients[end_dest] = alicantoClient(end_dest) if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_digital_point(end_dest_tag, bool(self.tag(full_end_dest))) + self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_dest))) else: - self.end_clients[end_dest].write_digital_point(end_dest_tag, bool(self.tag(full_end_name))) + self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_name))) time.sleep(0.5) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) value = reply[1].rstrip('\x00') From a4cc80722067b11681b2622f5023086eb80d14ce Mon Sep 17 00:00:00 2001 From: jarwils Date: Tue, 23 Jan 2024 08:47:57 -0700 Subject: [PATCH 32/35] remove unneeded provider files --- .../pybennu/providers/alicanto/README.md | 5 +- .../pybennu/providers/alicanto/__init__.py | 1 - .../pybennu/providers/alicanto/alicanto.py | 70 --- .../providers/alicanto/alicanto_helper.py | 450 ------------------ .../pybennu/providers/power/power_daemon.py | 9 - 5 files changed, 2 insertions(+), 533 deletions(-) delete mode 100644 src/pybennu/pybennu/providers/alicanto/__init__.py delete mode 100644 src/pybennu/pybennu/providers/alicanto/alicanto.py delete mode 100644 src/pybennu/pybennu/providers/alicanto/alicanto_helper.py diff --git a/src/pybennu/pybennu/providers/alicanto/README.md b/src/pybennu/pybennu/providers/alicanto/README.md index 2a9e2647..31931fa1 100644 --- a/src/pybennu/pybennu/providers/alicanto/README.md +++ b/src/pybennu/pybennu/providers/alicanto/README.md @@ -1,12 +1,11 @@ Alicanto is a new feature made to be a more simple co-simulation tool than HELICS. The code is similar to the bennu HELICS code but stripped down. -Alicanto runs as a Subscriber and Client object. It takes in a configuration file (which points to a json) which defines which points Alicanto cares about -JSON format +Alicanto runs as a Subscriber and Client object. It takes in a json file which defines which points Alicanto cares about. - Subscriptions tell Alicanto which publish point (udp) to subscrie to and which point to keep track of - Endpoints tell Alicanto where to corelate that subscribed point to a server-endpoint Usage: -`pybennu-power-solver -c config.ini -v start` +`pybennu-power-solver -c alicanto.json -d DEBUG` Please update this README as Alicanto is used more \ No newline at end of file diff --git a/src/pybennu/pybennu/providers/alicanto/__init__.py b/src/pybennu/pybennu/providers/alicanto/__init__.py deleted file mode 100644 index 8b137891..00000000 --- a/src/pybennu/pybennu/providers/alicanto/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto.py b/src/pybennu/pybennu/providers/alicanto/alicanto.py deleted file mode 100644 index df6b57d2..00000000 --- a/src/pybennu/pybennu/providers/alicanto/alicanto.py +++ /dev/null @@ -1,70 +0,0 @@ -"""bennu provider / alicanto handler - -Provides an interface between bennu field devices and a alicanto federation. -""" -import logging -import threading - -from distutils.util import strtobool - -from pybennu.distributed.client import Client -import pybennu.distributed.swig._Endpoint as E -from pybennu.providers.alicanto.alicanto_helper import alicantoFederate - -logger = logging.getLogger('alicanto') -logger.addHandler(logging.StreamHandler()) -logger.setLevel(logging.INFO) - - -class Alicanto(alicantoFederate): - """ bennu provider / alicanto federate """ - - def __init__(self, server_endpoint, publish_endpoint, config, debug=False): - self.__lock = threading.Lock() - if debug: - logger.setLevel(logging.DEBUG) - - - - # Initialize alicantoFederate and start run() method in a separate thread - ## Need to write a way for alicanto to publish updates - alicantoFederate.__init__(self, config) - self.__publish_thread = threading.Thread(target=alicantoFederate.run, - args=(self,)) - self.__publish_thread.daemon = True - self.__publish_thread.start() - - # Initialize system state - self.state = {} - for tag in self.tags: - self.state[tag] = False if self.get_type(tag) == 'bool' else 0 - - - def tag(self, tag, value=None): - with self.__lock: - if value is not None: - self.state[tag] = value - else: - if tag in self.state: - return self.state[tag] - else: - return False if self.get_type(tag) == 'bool' else 0 - - - # CLIENT write helper - def write(self, tag, value): - with self.__lock: - print("### PROCESSING UPDATE ###") - try: - for tag in self.tags: - print(f"### TAG: {tag} ###") - if tag not in self.state: - raise NameError(f"{tag} is not a known tag name") - value = self.tags[tag] - self.state[tag] = value - self.send_msg_to_endpoint(tag, value) - print(f"### UPDATED: {tag}:{value} ###") - except Exception as err: - print(f"ERROR: failed to process update: {err}") - return f'ERR={err}' - return 'ACK=Success processing alicanto write command' diff --git a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py b/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py deleted file mode 100644 index 5ab721a5..00000000 --- a/src/pybennu/pybennu/providers/alicanto/alicanto_helper.py +++ /dev/null @@ -1,450 +0,0 @@ -"""alicanto helper class -""" -import json -import logging -import signal -import sys -import time -import threading -import math -import zmq - -from distutils.util import strtobool -from py_expression_eval import Parser - -from pybennu.distributed.subscriber import Subscriber -from pybennu.distributed.client import Client -import pybennu.distributed.swig._Endpoint as E - -logger = logging.getLogger('alicanto') -logger.setLevel(logging.INFO) - -#Try adding a timeout helper -import signal -from contextlib import contextmanager - - -@contextmanager -def timeout(time): - # Register a function to raise a TimeoutError on the signal. - signal.signal(signal.SIGALRM, raise_timeout) - # Schedule the signal to be sent after ``time`` - signal.alarm(time) - - try: - yield - except TimeoutError: - pass - finally: - # Unregister the signal so it won't be triggered - # if the timeout is not reached. - signal.signal(signal.SIGALRM, signal.SIG_IGN) - - -def raise_timeout(signum, frame): - raise TimeoutError - -# TestSubscriber used for easy multiple subscribers -class TestSubscriber(Subscriber): - def __init__(self, sub_source): - new_publish_endpoint = E.new_Endpoint() - E.Endpoint_str_set(new_publish_endpoint, 'udp://'+str(sub_source)) - Subscriber.__init__(self, new_publish_endpoint) - -#TestClient used with special send function which returns the reply -class TestClient(Client): - def __init__(self, end_dest): - new_endpoint_dest = E.new_Endpoint() - E.Endpoint_str_set(new_endpoint_dest, 'tcp://'+str(end_dest)) - Client.__init__(self, new_endpoint_dest) - - def send(self, message): - """ Send message to Provider - """ - with timeout(10): - self.connect() - # send update - self._Client__socket.send_string(message+'\0') # must include null byte - # get response - msg = self._Client__socket.recv_string() - reply = msg.split('=') - status = reply[0] - data = reply[1] - - if status == self._Client__kACK: - print("I: ACK: "+data) - #self.reply_handler(data) - else: - print("I: ERR -- %s" % msg) - - self._Client__socket.close() - self._Client__context.term() - - return reply - -class alicantoFederate(): - def __init__(self, config, exit_handler=None): - exit = exit_handler if exit_handler else self.ctrl_exit_handler - signal.signal(signal.SIGINT, exit) # ctlr + c - # Tag=>destination map - self.dests = {} - # Tag=>type map - self.types = {} - # Tag=>logic map - self.logic = {} - # Expression parser for logic - self.parser = Parser() - # Set of all tags (for printing) - self.tags = {} - - ############## Get counts from json ###################### - cfg = None - self.end_count = 0 - self.sub_count = 0 - self.pub_count = 0 - - with open(config, 'r') as file_: - cfg = json.loads(file_.read()) - try: - self.end_count = len(cfg["endpoints"]) - logger.info(f"\tNumber of endpoints: {self.end_count}") - except: - logger.info(f"\tNumber of endpoints: {self.end_count}") - try: - self.sub_count = len(cfg["subscriptions"]) - logger.info(f"\tNumber of subscriptions: {self.sub_count}") - except: - logger.info(f"\tNumber of subscriptions: {self.sub_count}") - try: - self.pub_count = len(cfg["publications"]) - logger.info(f"\tNumber of publications: {self.pub_count}") - except: - logger.info(f"\tNumber of publications: {self.pub_count}") - - # Diagnostics to confirm JSON config correctly added the required - # endpoints, publications, and subscriptions - self.endid = {} - self.end_dests = [] - for i in range(0, self.end_count): - self.endid[i] = cfg["endpoints"][i] - end_name = self.endid[i]["name"] - end_destination = self.endid[i]["destination"] - end_type = self.endid[i]["type"] - logger.debug(f"\tRegistered endpoint ---> end_name: {end_name} ---> end_destination: {end_destination}") - #end_name = end_name.split('/')[1] if '/' in end_name else end_name - self.tags.update({end_destination : 0}) - self.end_dests.append(end_destination) - self.dests[end_name] = end_destination - self.types[end_name] = end_type - self.types[end_destination] = end_type - # make end_dests elements unique - self.end_dests = list(set(self.end_dests)) - - self.subid = {} - self.sub_sources = [] - for i in range(0, self.sub_count): - self.subid[i] = cfg["subscriptions"][i] - sub_name = self.subid[i]["key"] - sub_type = self.subid[i]["type"] - sub_source = sub_name.split('/')[0] - self.sub_sources.append(sub_source) - try: - sub_info = self.subid[i]["info"] # stores logic for interdependencies - except: - sub_info = None - logger.debug(f"\tRegistered subscription ---> sub_name: {sub_name} ---> sub_type: {sub_type} ---> sub_info: {sub_info}") - #sub_name = sub_name.split('/')[1] if '/' in sub_name else sub_name - self.tags.update({sub_name : 0 }) - self.types[sub_name] = sub_type - if sub_info: - logger.debug(f"\t\t********** LOGIC **********") - for exp in sub_info.split(';'): - lhs, rhs = exp.split('=') - self.logic[lhs.strip()] = rhs.strip() - logger.debug(f'\t\t{exp.strip()}') - #make sub_sources elements unique - self.sub_sources = list(set(self.sub_sources)) - - self.__lock = threading.Lock() - - - for sub_source in self.sub_sources: - logger.debug(f"\tLaunching Subscriber Thread ---> subscription: udp://{sub_source}") - subber = TestSubscriber(sub_source) - #Subscriber.__init__(self, new_publish_endpoint) - #subber = Subscriber(new_publish_endpoint) - subber.subscription_handler = self._subscription_handler - self.__sub_thread = threading.Thread(target=subber.run) - self.__sub_thread.setName(sub_source) - self.__sub_thread.daemon = True - self.__sub_thread.start() - #subber.run() - - #new_server_endpoint = E.new_Endpoint() - #E.Endpoint_str_set(new_server_endpoint, 'tcp://127.0.0.1:5556') - #self.client = Client(new_server_endpoint) - #self.client = TestClient("127.0.0.1:5556") - - - self.end_clients = {} - for end_dest in self.end_dests: - # Initialize bennu Client - end_dest = end_dest.split('/')[0] - try: - self.end_clients[end_dest] = TestClient(end_dest) - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - #self.end_clients[end_dest] = None - logger.debug(f"\tEnd_clients: {self.end_clients}") - - def run(self): - ############## Entering Execution Mode ############################## - #h.alicantoFederateEnterExecutingMode(self.fed) - logger.info("Entered alicanto execution mode") - ''' - grantedtime = 0 - - # Blocking call for a time request at simulation time 0 - time = h.alicanto_TIME_ZERO - logger.debug(f"Requesting initial time {time}") - grantedtime = h.alicantoFederateRequestTime(self.fed, time) - logger.debug(f"Granted time {grantedtime}") - ''' - - # Endpoint initial values to alicanto - for i in range(self.end_count): - full_end_name = self.endid[i]["name"] - end_name = (full_end_name.split('/')[1] - if '/' in full_end_name - else full_end_name) - full_end_dest = self.endid[i]["destination"] - end_dest = (full_end_dest.split('/')[0] - if '/' in full_end_dest - else full_end_dest) - end_dest_tag = (full_end_dest.split('/')[1] - if '/' in full_end_dest - else full_end_dest) - #value = self.tag(end_name) - try: - self.end_clients[end_dest] = TestClient(end_dest) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) - #Try to keep up with threads - #time.sleep(1) - value = reply[1].rstrip('\x00') - self.endid[i]["value"] = value - self.tag(full_end_dest, value) - logger.debug(f"Initial Endpoints {end_name} / {end_dest}:{value} ") - - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - continue - - - - ########## Main co-simulation loop #################################### - # As long as granted time is in the time range to be simulated... - #Wait for other sims to - time.sleep(60) - while True: - self.print_state() - time.sleep(0.1) - for i in range(self.end_count): - full_end_name = self.endid[i]["name"] - end_name = (full_end_name.split('/')[1] - if '/' in full_end_name - else full_end_name) - full_end_dest = self.endid[i]["destination"] - end_dest = (full_end_dest.split('/')[0] - if '/' in full_end_dest - else full_end_dest) - end_dest_tag = (full_end_dest.split('/')[1] - if '/' in full_end_dest - else full_end_dest) - - - if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double': - if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))): - #Handle Logic - if self.logic[full_end_dest] is not None: - expr = self.parser.parse(self.logic[full_end_dest]) - ''' - # Assign variables - vars = {} - for var in expr.variables(): - vars[var] = self.tag(var) - ''' - i = 0 - # Assign vars not working, so assign token manually - for token in expr.tokens: - for search_tag in self.tags: - if token.toString() == search_tag: - expr.tokens[i].number_ = self.tag(token.toString()) - i += 1 - # Evaluate expression - value = expr.evaluate(vars) - value = str(value).lower() - if value != self.tag(full_end_dest): - logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") - # Assign new tag value - self._tag(full_end_dest, value) - # Skip if value is unchanged - elif value == self.tag(full_end_dest): - continue - - try: - self.end_clients[end_dest] = TestClient(end_dest) - if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest)) - else: - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) - time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) - #Try to help thread craziness - #time.sleep(1) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - continue - elif self.types[full_end_name] == 'bool': - if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower(): - #Handle Logic - if self.logic[full_end_dest] is not None: - expr = self.parser.parse(self.logic[full_end_dest]) - ''' - # Assign variables - vars = {} - for var in expr.variables(): - vars[var] = self.tag(var) - ''' - i = 0 - # Assign vars not working, so assign token manually - for token in expr.tokens: - for search_tag in self.tags: - if token.toString() == search_tag: - expr.tokens[i].number_ = bool(self.tag(token.toString())) - i += 1 - # Evaluate expression - value = expr.evaluate(vars) - value = str(value) - if value != self.tag(full_end_dest): - logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") - # Assign new tag value - self._tag(full_end_dest, value) - # Skip if value is unchanged - elif value == self.tag(full_end_dest): - continue - try: - self.end_clients[end_dest] = TestClient(end_dest) - if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_dest)) - else: - self.end_clients[end_dest].write_digital_point(end_dest_tag, self.tag(full_end_name)) - time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) - - #Try to help thread craziness - #time.sleep(1) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - continue - - - - def print_state(self): - logger.debug("=================== DATA ===================") - for tag in self.tags: - logger.debug(f"{tag:<30} --- {self.tag(tag):}") - logger.debug("============================================") - - def get_type(self, tag): - return self.types[tag] - - def _tag(self, tag, val): - if self.get_type(tag) == 'bool': - # Make sure any val is 'true'|'false' - val = bool(strtobool(str(val))) - val = str(val).lower() - else: - val = float(val) - return self.tag(tag, val) - - def tag(self, tag, val=None): - raise NotImplementedError("Subclass and implement this method") - - def _client_handler(reply): - split = reply.split(',') - print("Reply:") - if len(split) > 1: - for tag in split: - print("\t", tag) - else: - print("\t", split[0]) - return split[0] - - def _subscription_handler(self, message): - """Receive Subscription message - This method gets called by the Subscriber base class in its run() - method when a message from a publisher is received. - - Ex message: "load-1_bus-101.mw:999.000,load-1_bus-101.active:true," - - Args: - message (str): published zmq message as a string - """ - points = message.split(',') - points = points[:-1] # remove last element since it might be empty - #print(message) - sub_source = threading.current_thread().name - - for point in points: - if point == "": - continue - ''' - split = point.split('/') - tag_source = split[0] - tag = split[1].split(':')[0] - value = split[1].split(':')[0] - ''' - - tag = point.split(':')[0] - full_tag = sub_source + '/' + tag - value = point.split(':')[1] - - if full_tag not in self.tags: - continue - if value.lower() == 'false': - value = False - field = 'status' - elif value.lower() == 'true': - value = True - field = 'status' - else: - value = float(value) - field = 'value' - - if field == 'value': - if not math.isclose(float(self.tag(full_tag)), value): - with self.__lock: - self.tag(full_tag, value) - #update - print("UPDATE NOW: ",full_tag) - print("New value: ",value) - else: - continue - elif field == 'status': - self.tag(full_tag, value) - #update - print("UPDATE NOW: ",full_tag) - print("New value: ",value) - else: - continue - - - - - def ctrl_exit_handler(self, signal, frame): - print("SIGINT or CTRL-C detected. Exiting gracefully") - sys.exit() diff --git a/src/pybennu/pybennu/providers/power/power_daemon.py b/src/pybennu/pybennu/providers/power/power_daemon.py index 65230a2e..95869bd9 100644 --- a/src/pybennu/pybennu/providers/power/power_daemon.py +++ b/src/pybennu/pybennu/providers/power/power_daemon.py @@ -1,12 +1,3 @@ -###################### UNCLASSIFIED // OFFICIAL USE ONLY ###################### -# -# Copyright 2018 National Technology & Engineering Solutions of Sandia, -# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, -# there is a non-exclusive license for use of this work by or on behalf -# of the U.S. Government. Export of this data may require a license from -# the United States Government. -# -############################################################################### import argparse import os import platform From 152faa9ac373b44b2f325d4acee8a884d79a56b3 Mon Sep 17 00:00:00 2001 From: jarwils Date: Mon, 29 Jan 2024 14:58:25 -0700 Subject: [PATCH 33/35] fix readme --- src/pybennu/pybennu/providers/alicanto/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pybennu/pybennu/providers/alicanto/README.md b/src/pybennu/pybennu/providers/alicanto/README.md index 31931fa1..07df8f62 100644 --- a/src/pybennu/pybennu/providers/alicanto/README.md +++ b/src/pybennu/pybennu/providers/alicanto/README.md @@ -6,6 +6,6 @@ Alicanto runs as a Subscriber and Client object. It takes in a json file which d - Endpoints tell Alicanto where to corelate that subscribed point to a server-endpoint Usage: -`pybennu-power-solver -c alicanto.json -d DEBUG` +`pybennu-alicanto -c alicanto.json -d DEBUG` Please update this README as Alicanto is used more \ No newline at end of file From 022446cc3f242df9a8a49aba254459f0927e21a4 Mon Sep 17 00:00:00 2001 From: jarwils Date: Mon, 29 Jan 2024 15:29:35 -0700 Subject: [PATCH 34/35] clean up comments --- src/pybennu/pybennu/executables/pybennu_alicanto.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index 83fa71d1..a224bba0 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -212,12 +212,9 @@ def run(self): end_dest_tag = (full_end_dest.split('/')[1] if '/' in full_end_dest else full_end_dest) - #value = self.tag(end_name) try: self.end_clients[end_dest] = alicantoClient(end_dest) reply = self.end_clients[end_dest].send("READ="+end_dest_tag) - #Try to keep up with threads - #time.sleep(1) value = reply[1].rstrip('\x00') self.endid[i]["value"] = value self.tag(full_end_dest, value) @@ -226,9 +223,6 @@ def run(self): except: logger.error(f"\tError Initializing Client: {self.end_clients}") continue - - # Wait for other sims to start, not sure if needed - #time.sleep(5) ########## Main co-simulation loop #################################### while True: @@ -247,7 +241,6 @@ def run(self): if '/' in full_end_dest else full_end_dest) - # !!need to add something to handle binary points if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double': if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))): #Handle Logic From 9507a87f28565e8a055608ee109cc16021ff379e Mon Sep 17 00:00:00 2001 From: jarwils Date: Wed, 20 Mar 2024 11:50:37 -0600 Subject: [PATCH 35/35] latest and greatest alicanto --- .../pybennu/executables/pybennu_alicanto.py | 283 +++++++++--------- 1 file changed, 148 insertions(+), 135 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index a224bba0..db1fea38 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -1,3 +1,5 @@ +# Edits by Jamie Thorpe and Meg Sahakian 3/14/2024 + """Alicanto is a new feature made to be a more simple co-simulation tool than HELICS. The code is similar to the bennu HELICS code but stripped down. @@ -66,8 +68,12 @@ def __init__(self, end_dest): def send(self, message): """ Send message to Provider """ + reply = None with timeout(10): self.connect() + + #try: + # send update self._Client__socket.send_string(message+'\0') # must include null byte # get response @@ -77,16 +83,25 @@ def send(self, message): data = reply[1] if status == self._Client__kACK: - print("I: ACK: "+data) + logger.info(f"I: ACK -- {data}") #self.reply_handler(data) else: - print("I: ERR -- %s" % msg) + logger.error(f"I: ERR -- {msg}") + #finally: + # logger.debug(f"DEBUG: Closing socket and context...") + # self._Client__socket.close() + # self._Client__context.term() + # logger.debug(f"DEBUG: ... Closed socket and context") + + try: self._Client__socket.close() self._Client__context.term() + except: + pass - return reply - + return reply + class alicanto(): def __init__(self, config, debug=False, exit_handler=None): @@ -101,16 +116,16 @@ def __init__(self, config, debug=False, exit_handler=None): self.lock = threading.Lock() # Initialize system state - self.state = {} + self.state = dict() # Tag=>destination map - self.dests = {} + self.dests = dict() # Tag=>type map - self.types = {} - self.logic = {} + self.types = dict() + self.logic = dict() # Expression parser for logic self.parser = Parser() # Set of all tags - self.tags = {} + self.tags = dict() ############## Get counts from json ###################### cfg = None @@ -133,7 +148,7 @@ def __init__(self, config, debug=False, exit_handler=None): # Diagnostics to confirm JSON config correctly added the required # endpoints and subscriptions - self.endid = {} + self.endid = dict() self.end_dests = [] for i, endpoint in enumerate(cfg["endpoints"]): self.endid[i] = endpoint @@ -149,7 +164,7 @@ def __init__(self, config, debug=False, exit_handler=None): # make end_dests elements unique self.end_dests = list(set(self.end_dests)) - self.subid = {} + self.subid = dict() self.sub_sources = [] for i in range(0, self.sub_count): self.subid[i] = cfg["subscriptions"][i] @@ -185,16 +200,6 @@ def __init__(self, config, debug=False, exit_handler=None): self.__sub_thread.daemon = True self.__sub_thread.start() - self.end_clients = {} - for end_dest in self.end_dests: - # Initialize bennu Client - end_dest = end_dest.split('/')[0] - try: - self.end_clients[end_dest] = alicantoClient(end_dest) - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - for key in list(self.end_clients.keys()): - logger.info(f"End_client: {key}") def run(self): ############## Entering Execution Mode ############################## @@ -212,17 +217,32 @@ def run(self): end_dest_tag = (full_end_dest.split('/')[1] if '/' in full_end_dest else full_end_dest) + + logger.info("Reading initial value from client now...") + try: + client = alicantoClient(end_dest) + except: + logger.error(f"\tError Initializing Client: {client}") + continue + + try: + reply = client.send("READ="+end_dest_tag) + if not reply: + continue + except: + logger.error(f"\tError Reading remote value") + continue + try: - self.end_clients[end_dest] = alicantoClient(end_dest) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) value = reply[1].rstrip('\x00') self.endid[i]["value"] = value - self.tag(full_end_dest, value) + self.set_tag(full_end_dest, value) logger.debug(f"Initial Endpoints {end_name} / {end_dest}:{value} ") - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") + logger.error(f"\tError Parsing response from Client") continue + + logger.info("... Client value status initialized!") ########## Main co-simulation loop #################################### while True: @@ -241,108 +261,112 @@ def run(self): if '/' in full_end_dest else full_end_dest) - if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double': - if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))): - #Handle Logic - if self.logic[full_end_dest] is not None: - expr = self.parser.parse(self.logic[full_end_dest]) - ''' - # Assign variables - vars = {} - for var in expr.variables(): - vars[var] = self.tag(var) - ''' - i = 0 - # Assign vars not working, so assign token manually - for token in expr.tokens: - for search_tag in self.tags: - if token.toString() == search_tag: - expr.tokens[i].number_ = self.tag(token.toString()) - i += 1 - # Evaluate expression - value = expr.evaluate(vars) - value = str(value).lower() - if value != self.tag(full_end_dest): - logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") - # Assign new tag value - self.tag(full_end_dest, value) - # Skip if value is unchanged - elif value == self.tag(full_end_dest): - continue - - try: - self.end_clients[end_dest] = alicantoClient(end_dest) - if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest)) - else: - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) - time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - continue - elif self.types[full_end_name] == 'bool': - if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower(): - #Handle Logic - if self.logic[full_end_dest] is not None: - expr = self.parser.parse(self.logic[full_end_dest]) - ''' - # Assign variables - vars = {} - for var in expr.variables(): - vars[var] = self.tag(var) - ''' - i = 0 - # Assign vars not working, so assign token manually - for token in expr.tokens: - for search_tag in self.tags: - if token.toString() == search_tag: - expr.tokens[i].number_ = bool(self.tag(token.toString())) - i += 1 - # Evaluate expression - value = expr.evaluate(vars) - value = str(value) - if value != self.tag(full_end_dest): - logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") - # Assign new tag value - self.tag(full_end_dest, value) - # Skip if value is unchanged - elif value == self.tag(full_end_dest): - continue - try: - self.end_clients[end_dest] = alicantoClient(end_dest) - if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_dest))) - else: - self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_name))) - time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - continue + these_types = self.types[full_end_name] + + # If we don't need to update the value, escape early + if (these_types == 'float' or these_types == 'double') and (math.isclose(float(self.get_tag(full_end_name)), float(self.get_tag(full_end_dest)))): + #logger.debug(f"SKIP: No need to update {self.get_tag(full_end_name)} vs {self.get_tag(full_end_dest)}") + continue + true_set = (True, 'True', 'true', '1', 1) + if (these_types == 'bool') and ((self.get_tag(full_end_name) in true_set) == (self.get_tag(full_end_dest) in true_set)): + #logger.debug(f"SKIP: No need to update {self.get_tag(full_end_name)} vs {self.get_tag(full_end_dest)}") + continue + + # Handle the case where there is logic involved + if self.logic[full_end_dest] is not None: + expr = self.parser.parse(self.logic[full_end_dest]) + + # Assign variables + these_vars = dict() + + ''' + for v in expr.variables(): + t = self.get_tag(v) + if these_types == 'bool': + t = bool(t) + + these_vars[v] = t + + ''' + # Automatic variable parsing with expression parser not working, so assign token manually + for i,token in enumerate(expr.tokens): + if token.toString() in self.tags: + t = self.get_tag(token.toString()) + if these_types == 'bool': + t = bool(t) + + expr.tokens[i].number_ = t + + # Evaluate expression + value = expr.evaluate(these_vars) + value = str(value) + if value != self.get_tag(full_end_dest): + self.set_tag(full_end_dest, value) + else: + continue + + tag_ptr = full_end_dest + + else: + tag_ptr = full_end_name + + + # Send update + logger.info(f"Sending value update...") + try: + client = alicantoClient(end_dest) + except: + logger.error(f"\tError Initializing Client: {client}") + continue + + try: + if these_types == 'float' or these_types == 'double': + client.write_analog_point(end_dest_tag, self.get_tag(tag_ptr)) + elif these_types == 'bool': + client.write_digital_point(end_dest_tag, eval(self.get_tag(tag_ptr))) + except: + logger.error(f"\tError Writing value to remote Client") + + time.sleep(0.5) + + try: + reply = client.send("READ="+end_dest_tag) + if not reply: + continue + except: + logger.error(f"\tError Reading remote value") + continue + + try: + value = reply[1].rstrip('\x00') + self.set_tag(full_end_dest, value) + except: + logger.error(f"\tError Parsing response from Client") + continue + + logger.info(f"... Update sent!") + def publish_state(self): logger.info("=================== DATA ===================") for tag in self.tags: - logger.info(f"{tag:<30} --- {self.tag(tag):}") + logger.info(f"{tag:<30} --- {self.get_tag(tag):}") logger.info("============================================") def get_type(self, tag): return self.types[tag] - def tag(self, tag, value=None): + def get_tag(self, tag): + with self.lock: + if tag in self.state: + return self.state[tag] + else: + return False if self.get_type(tag) == 'bool' else 0 + + def set_tag(self, tag, value=None): with self.lock: if value is not None: self.state[tag] = value - else: - if tag in self.state: - return self.state[tag] - else: - return False if self.get_type(tag) == 'bool' else 0 def _subscription_handler(self, message): """Receive Subscription message @@ -355,14 +379,11 @@ def _subscription_handler(self, message): message (str): published zmq message as a string """ points = message.split(',') - points = points[:-1] # remove last element since it might be empty sub_source = threading.current_thread().name for point in points: - if not point: + if not point or len(point) <= 1: continue - if point == "": - continue try: tag = point.split(':')[0] @@ -386,21 +407,13 @@ def _subscription_handler(self, message): field = 'value' if field == 'value': - if not math.isclose(float(self.tag(full_tag)), value): - self.tag(full_tag, value) - logger.debug("UPDATE NOW: "+full_tag) - logger.debug("New value: "+str(value)) - else: - continue + if not math.isclose(float(self.get_tag(full_tag)), value): + self.set_tag(full_tag, value) + logger.debug(f"UPDATE NOW: {full_tag} = {value}") elif field == 'status': - if self.tag(full_tag) != value: - self.tag(full_tag, value) - logger.debug("UPDATE NOW: "+full_tag) - logger.debug("New value: "+str(value)) - else: - continue - else: - continue + if self.get_tag(full_tag) != value: + self.set_tag(full_tag, value) + logger.debug(f"UPDATE NOW: {full_tag} = {value}") def ctrl_exit_handler(self, signal, frame): logger.info("SIGINT or CTRL-C detected. Exiting gracefully") @@ -423,4 +436,4 @@ def main(): try: main() except KeyboardInterrupt: - sys.exit() \ No newline at end of file + sys.exit()