From 6c3a68f2d9a3d94c03dd21c2dc28df7cc974a623 Mon Sep 17 00:00:00 2001 From: Jared W <82229931+spitfirejmw@users.noreply.github.com> Date: Wed, 20 Mar 2024 14:29:15 -0600 Subject: [PATCH] Latest and greatest to Alicanto (#21) * latest and greatest alicanto --------- Co-authored-by: jarwils --- .../pybennu/executables/pybennu_alicanto.py | 280 +++++++++--------- 1 file changed, 146 insertions(+), 134 deletions(-) diff --git a/src/pybennu/pybennu/executables/pybennu_alicanto.py b/src/pybennu/pybennu/executables/pybennu_alicanto.py index a224bba..aca4c32 100644 --- a/src/pybennu/pybennu/executables/pybennu_alicanto.py +++ b/src/pybennu/pybennu/executables/pybennu_alicanto.py @@ -66,8 +66,12 @@ def __init__(self, end_dest): def send(self, message): """ Send message to Provider """ + reply = None with timeout(10): self.connect() + + #try: + # send update self._Client__socket.send_string(message+'\0') # must include null byte # get response @@ -77,16 +81,25 @@ def send(self, message): data = reply[1] if status == self._Client__kACK: - print("I: ACK: "+data) + logger.info(f"I: ACK -- {data}") #self.reply_handler(data) else: - print("I: ERR -- %s" % msg) + logger.error(f"I: ERR -- {msg}") + + #finally: + # logger.debug(f"DEBUG: Closing socket and context...") + # self._Client__socket.close() + # self._Client__context.term() + # logger.debug(f"DEBUG: ... Closed socket and context") + try: self._Client__socket.close() self._Client__context.term() + except: + pass - return reply - + return reply + class alicanto(): def __init__(self, config, debug=False, exit_handler=None): @@ -101,16 +114,16 @@ def __init__(self, config, debug=False, exit_handler=None): self.lock = threading.Lock() # Initialize system state - self.state = {} + self.state = dict() # Tag=>destination map - self.dests = {} + self.dests = dict() # Tag=>type map - self.types = {} - self.logic = {} + self.types = dict() + self.logic = dict() # Expression parser for logic self.parser = Parser() # Set of all tags - self.tags = {} + self.tags = dict() ############## Get counts from json ###################### cfg = None @@ -133,7 +146,7 @@ def __init__(self, config, debug=False, exit_handler=None): # Diagnostics to confirm JSON config correctly added the required # endpoints and subscriptions - self.endid = {} + self.endid = dict() self.end_dests = [] for i, endpoint in enumerate(cfg["endpoints"]): self.endid[i] = endpoint @@ -149,7 +162,7 @@ def __init__(self, config, debug=False, exit_handler=None): # make end_dests elements unique self.end_dests = list(set(self.end_dests)) - self.subid = {} + self.subid = dict() self.sub_sources = [] for i in range(0, self.sub_count): self.subid[i] = cfg["subscriptions"][i] @@ -185,16 +198,6 @@ def __init__(self, config, debug=False, exit_handler=None): self.__sub_thread.daemon = True self.__sub_thread.start() - self.end_clients = {} - for end_dest in self.end_dests: - # Initialize bennu Client - end_dest = end_dest.split('/')[0] - try: - self.end_clients[end_dest] = alicantoClient(end_dest) - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - for key in list(self.end_clients.keys()): - logger.info(f"End_client: {key}") def run(self): ############## Entering Execution Mode ############################## @@ -212,17 +215,32 @@ def run(self): end_dest_tag = (full_end_dest.split('/')[1] if '/' in full_end_dest else full_end_dest) + + logger.info("Reading initial value from client now...") + try: + client = alicantoClient(end_dest) + except: + logger.error(f"\tError Initializing Client: {client}") + continue + + try: + reply = client.send("READ="+end_dest_tag) + if not reply: + continue + except: + logger.error(f"\tError Reading remote value") + continue + try: - self.end_clients[end_dest] = alicantoClient(end_dest) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) value = reply[1].rstrip('\x00') self.endid[i]["value"] = value - self.tag(full_end_dest, value) + self.set_tag(full_end_dest, value) logger.debug(f"Initial Endpoints {end_name} / {end_dest}:{value} ") - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") + logger.error(f"\tError Parsing response from Client") continue + + logger.info("... Client value status initialized!") ########## Main co-simulation loop #################################### while True: @@ -241,108 +259,112 @@ def run(self): if '/' in full_end_dest else full_end_dest) - if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double': - if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))): - #Handle Logic - if self.logic[full_end_dest] is not None: - expr = self.parser.parse(self.logic[full_end_dest]) - ''' - # Assign variables - vars = {} - for var in expr.variables(): - vars[var] = self.tag(var) - ''' - i = 0 - # Assign vars not working, so assign token manually - for token in expr.tokens: - for search_tag in self.tags: - if token.toString() == search_tag: - expr.tokens[i].number_ = self.tag(token.toString()) - i += 1 - # Evaluate expression - value = expr.evaluate(vars) - value = str(value).lower() - if value != self.tag(full_end_dest): - logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") - # Assign new tag value - self.tag(full_end_dest, value) - # Skip if value is unchanged - elif value == self.tag(full_end_dest): - continue - - try: - self.end_clients[end_dest] = alicantoClient(end_dest) - if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest)) - else: - self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name)) - time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - continue - elif self.types[full_end_name] == 'bool': - if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower(): - #Handle Logic - if self.logic[full_end_dest] is not None: - expr = self.parser.parse(self.logic[full_end_dest]) - ''' - # Assign variables - vars = {} - for var in expr.variables(): - vars[var] = self.tag(var) - ''' - i = 0 - # Assign vars not working, so assign token manually - for token in expr.tokens: - for search_tag in self.tags: - if token.toString() == search_tag: - expr.tokens[i].number_ = bool(self.tag(token.toString())) - i += 1 - # Evaluate expression - value = expr.evaluate(vars) - value = str(value) - if value != self.tag(full_end_dest): - logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}") - # Assign new tag value - self.tag(full_end_dest, value) - # Skip if value is unchanged - elif value == self.tag(full_end_dest): - continue - try: - self.end_clients[end_dest] = alicantoClient(end_dest) - if self.logic[full_end_dest] is not None: - self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_dest))) - else: - self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_name))) - time.sleep(0.5) - reply = self.end_clients[end_dest].send("READ="+end_dest_tag) - value = reply[1].rstrip('\x00') - self.tag(full_end_dest, value) - except: - logger.error(f"\tError Initializing Client: {self.end_clients}") - continue + these_types = self.types[full_end_name] + + # If we don't need to update the value, escape early + if (these_types == 'float' or these_types == 'double') and (math.isclose(float(self.get_tag(full_end_name)), float(self.get_tag(full_end_dest)))): + #logger.debug(f"SKIP: No need to update {self.get_tag(full_end_name)} vs {self.get_tag(full_end_dest)}") + continue + true_set = (True, 'True', 'true', '1', 1) + if (these_types == 'bool') and ((self.get_tag(full_end_name) in true_set) == (self.get_tag(full_end_dest) in true_set)): + #logger.debug(f"SKIP: No need to update {self.get_tag(full_end_name)} vs {self.get_tag(full_end_dest)}") + continue + + # Handle the case where there is logic involved + if self.logic[full_end_dest] is not None: + expr = self.parser.parse(self.logic[full_end_dest]) + + # Assign variables + these_vars = dict() + + ''' + for v in expr.variables(): + t = self.get_tag(v) + if these_types == 'bool': + t = bool(t) + + these_vars[v] = t + + ''' + # Automatic variable parsing with expression parser not working, so assign token manually + for i,token in enumerate(expr.tokens): + if token.toString() in self.tags: + t = self.get_tag(token.toString()) + if these_types == 'bool': + t = bool(t) + + expr.tokens[i].number_ = t + + # Evaluate expression + value = expr.evaluate(these_vars) + value = str(value) + if value != self.get_tag(full_end_dest): + self.set_tag(full_end_dest, value) + else: + continue + + tag_ptr = full_end_dest + + else: + tag_ptr = full_end_name + + + # Send update + logger.info(f"Sending value update...") + try: + client = alicantoClient(end_dest) + except: + logger.error(f"\tError Initializing Client: {client}") + continue + + try: + if these_types == 'float' or these_types == 'double': + client.write_analog_point(end_dest_tag, self.get_tag(tag_ptr)) + elif these_types == 'bool': + client.write_digital_point(end_dest_tag, eval(self.get_tag(tag_ptr))) + except: + logger.error(f"\tError Writing value to remote Client") + + time.sleep(0.5) + + try: + reply = client.send("READ="+end_dest_tag) + if not reply: + continue + except: + logger.error(f"\tError Reading remote value") + continue + + try: + value = reply[1].rstrip('\x00') + self.set_tag(full_end_dest, value) + except: + logger.error(f"\tError Parsing response from Client") + continue + + logger.info(f"... Update sent!") + def publish_state(self): logger.info("=================== DATA ===================") for tag in self.tags: - logger.info(f"{tag:<30} --- {self.tag(tag):}") + logger.info(f"{tag:<30} --- {self.get_tag(tag):}") logger.info("============================================") def get_type(self, tag): return self.types[tag] - def tag(self, tag, value=None): + def get_tag(self, tag): + with self.lock: + if tag in self.state: + return self.state[tag] + else: + return False if self.get_type(tag) == 'bool' else 0 + + def set_tag(self, tag, value=None): with self.lock: if value is not None: self.state[tag] = value - else: - if tag in self.state: - return self.state[tag] - else: - return False if self.get_type(tag) == 'bool' else 0 def _subscription_handler(self, message): """Receive Subscription message @@ -359,10 +381,8 @@ def _subscription_handler(self, message): sub_source = threading.current_thread().name for point in points: - if not point: + if not point or len(point) <= 1: continue - if point == "": - continue try: tag = point.split(':')[0] @@ -386,21 +406,13 @@ def _subscription_handler(self, message): field = 'value' if field == 'value': - if not math.isclose(float(self.tag(full_tag)), value): - self.tag(full_tag, value) - logger.debug("UPDATE NOW: "+full_tag) - logger.debug("New value: "+str(value)) - else: - continue + if not math.isclose(float(self.get_tag(full_tag)), value): + self.set_tag(full_tag, value) + logger.debug(f"UPDATE NOW: {full_tag} = {value}") elif field == 'status': - if self.tag(full_tag) != value: - self.tag(full_tag, value) - logger.debug("UPDATE NOW: "+full_tag) - logger.debug("New value: "+str(value)) - else: - continue - else: - continue + if self.get_tag(full_tag) != value: + self.set_tag(full_tag, value) + logger.debug(f"UPDATE NOW: {full_tag} = {value}") def ctrl_exit_handler(self, signal, frame): logger.info("SIGINT or CTRL-C detected. Exiting gracefully") @@ -423,4 +435,4 @@ def main(): try: main() except KeyboardInterrupt: - sys.exit() \ No newline at end of file + sys.exit()