Skip to content

Commit

Permalink
Latest and greatest to Alicanto (#21)
Browse files Browse the repository at this point in the history
* latest and greatest alicanto

---------

Co-authored-by: jarwils <[email protected]>
  • Loading branch information
spitfirejmw and spitfirejmw authored Mar 20, 2024
1 parent d3f7582 commit 6c3a68f
Showing 1 changed file with 146 additions and 134 deletions.
280 changes: 146 additions & 134 deletions src/pybennu/pybennu/executables/pybennu_alicanto.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ def __init__(self, end_dest):
def send(self, message):
""" Send message to Provider
"""
reply = None
with timeout(10):
self.connect()

#try:

# send update
self._Client__socket.send_string(message+'\0') # must include null byte
# get response
Expand All @@ -77,16 +81,25 @@ def send(self, message):
data = reply[1]

if status == self._Client__kACK:
print("I: ACK: "+data)
logger.info(f"I: ACK -- {data}")
#self.reply_handler(data)
else:
print("I: ERR -- %s" % msg)
logger.error(f"I: ERR -- {msg}")

#finally:
# logger.debug(f"DEBUG: Closing socket and context...")
# self._Client__socket.close()
# self._Client__context.term()
# logger.debug(f"DEBUG: ... Closed socket and context")

try:
self._Client__socket.close()
self._Client__context.term()
except:
pass

return reply

return reply
class alicanto():
def __init__(self, config, debug=False, exit_handler=None):

Expand All @@ -101,16 +114,16 @@ def __init__(self, config, debug=False, exit_handler=None):
self.lock = threading.Lock()

# Initialize system state
self.state = {}
self.state = dict()
# Tag=>destination map
self.dests = {}
self.dests = dict()
# Tag=>type map
self.types = {}
self.logic = {}
self.types = dict()
self.logic = dict()
# Expression parser for logic
self.parser = Parser()
# Set of all tags
self.tags = {}
self.tags = dict()

############## Get counts from json ######################
cfg = None
Expand All @@ -133,7 +146,7 @@ def __init__(self, config, debug=False, exit_handler=None):

# Diagnostics to confirm JSON config correctly added the required
# endpoints and subscriptions
self.endid = {}
self.endid = dict()
self.end_dests = []
for i, endpoint in enumerate(cfg["endpoints"]):
self.endid[i] = endpoint
Expand All @@ -149,7 +162,7 @@ def __init__(self, config, debug=False, exit_handler=None):
# make end_dests elements unique
self.end_dests = list(set(self.end_dests))

self.subid = {}
self.subid = dict()
self.sub_sources = []
for i in range(0, self.sub_count):
self.subid[i] = cfg["subscriptions"][i]
Expand Down Expand Up @@ -185,16 +198,6 @@ def __init__(self, config, debug=False, exit_handler=None):
self.__sub_thread.daemon = True
self.__sub_thread.start()

self.end_clients = {}
for end_dest in self.end_dests:
# Initialize bennu Client
end_dest = end_dest.split('/')[0]
try:
self.end_clients[end_dest] = alicantoClient(end_dest)
except:
logger.error(f"\tError Initializing Client: {self.end_clients}")
for key in list(self.end_clients.keys()):
logger.info(f"End_client: {key}")

def run(self):
############## Entering Execution Mode ##############################
Expand All @@ -212,17 +215,32 @@ def run(self):
end_dest_tag = (full_end_dest.split('/')[1]
if '/' in full_end_dest
else full_end_dest)

logger.info("Reading initial value from client now...")
try:
client = alicantoClient(end_dest)
except:
logger.error(f"\tError Initializing Client: {client}")
continue

try:
reply = client.send("READ="+end_dest_tag)
if not reply:
continue
except:
logger.error(f"\tError Reading remote value")
continue

try:
self.end_clients[end_dest] = alicantoClient(end_dest)
reply = self.end_clients[end_dest].send("READ="+end_dest_tag)
value = reply[1].rstrip('\x00')
self.endid[i]["value"] = value
self.tag(full_end_dest, value)
self.set_tag(full_end_dest, value)
logger.debug(f"Initial Endpoints {end_name} / {end_dest}:{value} ")

except:
logger.error(f"\tError Initializing Client: {self.end_clients}")
logger.error(f"\tError Parsing response from Client")
continue

logger.info("... Client value status initialized!")

########## Main co-simulation loop ####################################
while True:
Expand All @@ -241,108 +259,112 @@ def run(self):
if '/' in full_end_dest
else full_end_dest)

if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double':
if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))):
#Handle Logic
if self.logic[full_end_dest] is not None:
expr = self.parser.parse(self.logic[full_end_dest])
'''
# Assign variables
vars = {}
for var in expr.variables():
vars[var] = self.tag(var)
'''
i = 0
# Assign vars not working, so assign token manually
for token in expr.tokens:
for search_tag in self.tags:
if token.toString() == search_tag:
expr.tokens[i].number_ = self.tag(token.toString())
i += 1
# Evaluate expression
value = expr.evaluate(vars)
value = str(value).lower()
if value != self.tag(full_end_dest):
logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}")
# Assign new tag value
self.tag(full_end_dest, value)
# Skip if value is unchanged
elif value == self.tag(full_end_dest):
continue

try:
self.end_clients[end_dest] = alicantoClient(end_dest)
if self.logic[full_end_dest] is not None:
self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest))
else:
self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name))
time.sleep(0.5)
reply = self.end_clients[end_dest].send("READ="+end_dest_tag)
value = reply[1].rstrip('\x00')
self.tag(full_end_dest, value)
except:
logger.error(f"\tError Initializing Client: {self.end_clients}")
continue
elif self.types[full_end_name] == 'bool':
if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower():
#Handle Logic
if self.logic[full_end_dest] is not None:
expr = self.parser.parse(self.logic[full_end_dest])
'''
# Assign variables
vars = {}
for var in expr.variables():
vars[var] = self.tag(var)
'''
i = 0
# Assign vars not working, so assign token manually
for token in expr.tokens:
for search_tag in self.tags:
if token.toString() == search_tag:
expr.tokens[i].number_ = bool(self.tag(token.toString()))
i += 1
# Evaluate expression
value = expr.evaluate(vars)
value = str(value)
if value != self.tag(full_end_dest):
logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}")
# Assign new tag value
self.tag(full_end_dest, value)
# Skip if value is unchanged
elif value == self.tag(full_end_dest):
continue
try:
self.end_clients[end_dest] = alicantoClient(end_dest)
if self.logic[full_end_dest] is not None:
self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_dest)))
else:
self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_name)))
time.sleep(0.5)
reply = self.end_clients[end_dest].send("READ="+end_dest_tag)
value = reply[1].rstrip('\x00')
self.tag(full_end_dest, value)
except:
logger.error(f"\tError Initializing Client: {self.end_clients}")
continue
these_types = self.types[full_end_name]

# If we don't need to update the value, escape early
if (these_types == 'float' or these_types == 'double') and (math.isclose(float(self.get_tag(full_end_name)), float(self.get_tag(full_end_dest)))):
#logger.debug(f"SKIP: No need to update {self.get_tag(full_end_name)} vs {self.get_tag(full_end_dest)}")
continue
true_set = (True, 'True', 'true', '1', 1)
if (these_types == 'bool') and ((self.get_tag(full_end_name) in true_set) == (self.get_tag(full_end_dest) in true_set)):
#logger.debug(f"SKIP: No need to update {self.get_tag(full_end_name)} vs {self.get_tag(full_end_dest)}")
continue

# Handle the case where there is logic involved
if self.logic[full_end_dest] is not None:
expr = self.parser.parse(self.logic[full_end_dest])

# Assign variables
these_vars = dict()

'''
for v in expr.variables():
t = self.get_tag(v)
if these_types == 'bool':
t = bool(t)
these_vars[v] = t
'''
# Automatic variable parsing with expression parser not working, so assign token manually
for i,token in enumerate(expr.tokens):
if token.toString() in self.tags:
t = self.get_tag(token.toString())
if these_types == 'bool':
t = bool(t)

expr.tokens[i].number_ = t

# Evaluate expression
value = expr.evaluate(these_vars)
value = str(value)
if value != self.get_tag(full_end_dest):
self.set_tag(full_end_dest, value)
else:
continue

tag_ptr = full_end_dest

else:
tag_ptr = full_end_name


# Send update
logger.info(f"Sending value update...")
try:
client = alicantoClient(end_dest)
except:
logger.error(f"\tError Initializing Client: {client}")
continue

try:
if these_types == 'float' or these_types == 'double':
client.write_analog_point(end_dest_tag, self.get_tag(tag_ptr))
elif these_types == 'bool':
client.write_digital_point(end_dest_tag, eval(self.get_tag(tag_ptr)))
except:
logger.error(f"\tError Writing value to remote Client")

time.sleep(0.5)

try:
reply = client.send("READ="+end_dest_tag)
if not reply:
continue
except:
logger.error(f"\tError Reading remote value")
continue

try:
value = reply[1].rstrip('\x00')
self.set_tag(full_end_dest, value)
except:
logger.error(f"\tError Parsing response from Client")
continue

logger.info(f"... Update sent!")


def publish_state(self):
logger.info("=================== DATA ===================")
for tag in self.tags:
logger.info(f"{tag:<30} --- {self.tag(tag):}")
logger.info(f"{tag:<30} --- {self.get_tag(tag):}")
logger.info("============================================")

def get_type(self, tag):
return self.types[tag]

def tag(self, tag, value=None):
def get_tag(self, tag):
with self.lock:
if tag in self.state:
return self.state[tag]
else:
return False if self.get_type(tag) == 'bool' else 0

def set_tag(self, tag, value=None):
with self.lock:
if value is not None:
self.state[tag] = value
else:
if tag in self.state:
return self.state[tag]
else:
return False if self.get_type(tag) == 'bool' else 0

def _subscription_handler(self, message):
"""Receive Subscription message
Expand All @@ -359,10 +381,8 @@ def _subscription_handler(self, message):
sub_source = threading.current_thread().name

for point in points:
if not point:
if not point or len(point) <= 1:
continue
if point == "":
continue

try:
tag = point.split(':')[0]
Expand All @@ -386,21 +406,13 @@ def _subscription_handler(self, message):
field = 'value'

if field == 'value':
if not math.isclose(float(self.tag(full_tag)), value):
self.tag(full_tag, value)
logger.debug("UPDATE NOW: "+full_tag)
logger.debug("New value: "+str(value))
else:
continue
if not math.isclose(float(self.get_tag(full_tag)), value):
self.set_tag(full_tag, value)
logger.debug(f"UPDATE NOW: {full_tag} = {value}")
elif field == 'status':
if self.tag(full_tag) != value:
self.tag(full_tag, value)
logger.debug("UPDATE NOW: "+full_tag)
logger.debug("New value: "+str(value))
else:
continue
else:
continue
if self.get_tag(full_tag) != value:
self.set_tag(full_tag, value)
logger.debug(f"UPDATE NOW: {full_tag} = {value}")

def ctrl_exit_handler(self, signal, frame):
logger.info("SIGINT or CTRL-C detected. Exiting gracefully")
Expand All @@ -423,4 +435,4 @@ def main():
try:
main()
except KeyboardInterrupt:
sys.exit()
sys.exit()

0 comments on commit 6c3a68f

Please sign in to comment.