Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Latest and greatest to Alicanto #21

Merged
merged 38 commits into from
Mar 20, 2024
Merged
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
af8baf0
adding alicanto co-provider feature
spitfirejmw Sep 7, 2023
401b22c
initial documentation, will need more
spitfirejmw Sep 7, 2023
7a57706
fix headers
spitfirejmw Sep 12, 2023
5536b67
Merge branch 'main' of https://github.com/sandialabs/sceptre-bennu
spitfirejmw Oct 17, 2023
10d99fc
working on bug
spitfirejmw Nov 1, 2023
0c65709
first shot add logic
spitfirejmw Nov 13, 2023
11d9d7f
add logic
spitfirejmw Nov 22, 2023
8f3214b
fix tag
spitfirejmw Nov 22, 2023
fedb82c
see if threads can be cleaned
spitfirejmw Nov 27, 2023
0e3b886
close sockets
spitfirejmw Nov 27, 2023
1d9aa6d
looks like context term did the trick
spitfirejmw Nov 27, 2023
67af096
wait a sec
spitfirejmw Nov 27, 2023
aa9460b
typo fix
spitfirejmw Nov 27, 2023
0fbfec2
working on closing zmq context
spitfirejmw Nov 28, 2023
c55a654
maybe working
spitfirejmw Nov 28, 2023
275c264
add some error handling
spitfirejmw Nov 29, 2023
b81b757
add a sleep
spitfirejmw Nov 29, 2023
44f882c
move split into try
spitfirejmw Nov 30, 2023
759513f
Merge branch 'main' of https://github.com/sandialabs/sceptre-bennu
spitfirejmw Nov 30, 2023
6c37b0b
get rid of sleeps
spitfirejmw Nov 30, 2023
9d7b297
work on getting seperate exec good
spitfirejmw Nov 30, 2023
8f49f6f
fix logic import
spitfirejmw Nov 30, 2023
ce37292
add parser
spitfirejmw Nov 30, 2023
e371c10
change tag
spitfirejmw Nov 30, 2023
8dad9ee
maybe a fix
spitfirejmw Nov 30, 2023
0c023d5
fix point split not working at start
spitfirejmw Dec 20, 2023
157b97b
fix binary bug
spitfirejmw Jan 16, 2024
aaf711a
fix client.py weirdness
spitfirejmw Jan 16, 2024
d0de9d8
allow user to provide false to client.py
spitfirejmw Jan 16, 2024
82fa4c6
comment out startup sleep
spitfirejmw Jan 17, 2024
a2ff1bb
undo client change
spitfirejmw Jan 19, 2024
9424fec
try out new digiital write
spitfirejmw Jan 19, 2024
b8f063a
actual fix with eval
spitfirejmw Jan 19, 2024
a4cc807
remove unneeded provider files
spitfirejmw Jan 23, 2024
152faa9
fix readme
spitfirejmw Jan 29, 2024
022446c
clean up comments
spitfirejmw Jan 29, 2024
9507a87
latest and greatest alicanto
spitfirejmw Mar 20, 2024
a890172
fix merge
spitfirejmw Mar 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 146 additions & 134 deletions src/pybennu/pybennu/executables/pybennu_alicanto.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ def __init__(self, end_dest):
def send(self, message):
""" Send message to Provider
"""
reply = None
with timeout(10):
self.connect()

#try:

# send update
self._Client__socket.send_string(message+'\0') # must include null byte
# get response
Expand All @@ -77,16 +81,25 @@ def send(self, message):
data = reply[1]

if status == self._Client__kACK:
print("I: ACK: "+data)
logger.info(f"I: ACK -- {data}")
#self.reply_handler(data)
else:
print("I: ERR -- %s" % msg)
logger.error(f"I: ERR -- {msg}")

#finally:
# logger.debug(f"DEBUG: Closing socket and context...")
# self._Client__socket.close()
# self._Client__context.term()
# logger.debug(f"DEBUG: ... Closed socket and context")

try:
self._Client__socket.close()
self._Client__context.term()
except:
pass

return reply

return reply
class alicanto():
def __init__(self, config, debug=False, exit_handler=None):

Expand All @@ -101,16 +114,16 @@ def __init__(self, config, debug=False, exit_handler=None):
self.lock = threading.Lock()

# Initialize system state
self.state = {}
self.state = dict()
# Tag=>destination map
self.dests = {}
self.dests = dict()
# Tag=>type map
self.types = {}
self.logic = {}
self.types = dict()
self.logic = dict()
# Expression parser for logic
self.parser = Parser()
# Set of all tags
self.tags = {}
self.tags = dict()

############## Get counts from json ######################
cfg = None
Expand All @@ -133,7 +146,7 @@ def __init__(self, config, debug=False, exit_handler=None):

# Diagnostics to confirm JSON config correctly added the required
# endpoints and subscriptions
self.endid = {}
self.endid = dict()
self.end_dests = []
for i, endpoint in enumerate(cfg["endpoints"]):
self.endid[i] = endpoint
Expand All @@ -149,7 +162,7 @@ def __init__(self, config, debug=False, exit_handler=None):
# make end_dests elements unique
self.end_dests = list(set(self.end_dests))

self.subid = {}
self.subid = dict()
self.sub_sources = []
for i in range(0, self.sub_count):
self.subid[i] = cfg["subscriptions"][i]
Expand Down Expand Up @@ -185,16 +198,6 @@ def __init__(self, config, debug=False, exit_handler=None):
self.__sub_thread.daemon = True
self.__sub_thread.start()

self.end_clients = {}
for end_dest in self.end_dests:
# Initialize bennu Client
end_dest = end_dest.split('/')[0]
try:
self.end_clients[end_dest] = alicantoClient(end_dest)
except:
logger.error(f"\tError Initializing Client: {self.end_clients}")
for key in list(self.end_clients.keys()):
logger.info(f"End_client: {key}")

def run(self):
############## Entering Execution Mode ##############################
Expand All @@ -212,17 +215,32 @@ def run(self):
end_dest_tag = (full_end_dest.split('/')[1]
if '/' in full_end_dest
else full_end_dest)

logger.info("Reading initial value from client now...")
try:
client = alicantoClient(end_dest)
except:
logger.error(f"\tError Initializing Client: {client}")
continue

try:
reply = client.send("READ="+end_dest_tag)
if not reply:
continue
except:
logger.error(f"\tError Reading remote value")
continue

try:
self.end_clients[end_dest] = alicantoClient(end_dest)
reply = self.end_clients[end_dest].send("READ="+end_dest_tag)
value = reply[1].rstrip('\x00')
self.endid[i]["value"] = value
self.tag(full_end_dest, value)
self.set_tag(full_end_dest, value)
logger.debug(f"Initial Endpoints {end_name} / {end_dest}:{value} ")

except:
logger.error(f"\tError Initializing Client: {self.end_clients}")
logger.error(f"\tError Parsing response from Client")
continue

logger.info("... Client value status initialized!")

########## Main co-simulation loop ####################################
while True:
Expand All @@ -241,108 +259,112 @@ def run(self):
if '/' in full_end_dest
else full_end_dest)

if self.types[full_end_name] == 'float' or self.types[full_end_name] == 'double':
if not math.isclose(float(self.tag(full_end_name)), float(self.tag(full_end_dest))):
#Handle Logic
if self.logic[full_end_dest] is not None:
expr = self.parser.parse(self.logic[full_end_dest])
'''
# Assign variables
vars = {}
for var in expr.variables():
vars[var] = self.tag(var)
'''
i = 0
# Assign vars not working, so assign token manually
for token in expr.tokens:
for search_tag in self.tags:
if token.toString() == search_tag:
expr.tokens[i].number_ = self.tag(token.toString())
i += 1
# Evaluate expression
value = expr.evaluate(vars)
value = str(value).lower()
if value != self.tag(full_end_dest):
logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}")
# Assign new tag value
self.tag(full_end_dest, value)
# Skip if value is unchanged
elif value == self.tag(full_end_dest):
continue

try:
self.end_clients[end_dest] = alicantoClient(end_dest)
if self.logic[full_end_dest] is not None:
self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_dest))
else:
self.end_clients[end_dest].write_analog_point(end_dest_tag, self.tag(full_end_name))
time.sleep(0.5)
reply = self.end_clients[end_dest].send("READ="+end_dest_tag)
value = reply[1].rstrip('\x00')
self.tag(full_end_dest, value)
except:
logger.error(f"\tError Initializing Client: {self.end_clients}")
continue
elif self.types[full_end_name] == 'bool':
if str(self.tag(full_end_name)).lower() != str(self.tag(full_end_dest)).lower():
#Handle Logic
if self.logic[full_end_dest] is not None:
expr = self.parser.parse(self.logic[full_end_dest])
'''
# Assign variables
vars = {}
for var in expr.variables():
vars[var] = self.tag(var)
'''
i = 0
# Assign vars not working, so assign token manually
for token in expr.tokens:
for search_tag in self.tags:
if token.toString() == search_tag:
expr.tokens[i].number_ = bool(self.tag(token.toString()))
i += 1
# Evaluate expression
value = expr.evaluate(vars)
value = str(value)
if value != self.tag(full_end_dest):
logger.debug(f"\tLOGIC: {full_end_dest.strip()}={self.logic[full_end_dest]} ----> {value}")
# Assign new tag value
self.tag(full_end_dest, value)
# Skip if value is unchanged
elif value == self.tag(full_end_dest):
continue
try:
self.end_clients[end_dest] = alicantoClient(end_dest)
if self.logic[full_end_dest] is not None:
self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_dest)))
else:
self.end_clients[end_dest].write_digital_point(end_dest_tag, eval(self.tag(full_end_name)))
time.sleep(0.5)
reply = self.end_clients[end_dest].send("READ="+end_dest_tag)
value = reply[1].rstrip('\x00')
self.tag(full_end_dest, value)
except:
logger.error(f"\tError Initializing Client: {self.end_clients}")
continue
these_types = self.types[full_end_name]

# If we don't need to update the value, escape early
if (these_types == 'float' or these_types == 'double') and (math.isclose(float(self.get_tag(full_end_name)), float(self.get_tag(full_end_dest)))):
#logger.debug(f"SKIP: No need to update {self.get_tag(full_end_name)} vs {self.get_tag(full_end_dest)}")
continue
true_set = (True, 'True', 'true', '1', 1)
if (these_types == 'bool') and ((self.get_tag(full_end_name) in true_set) == (self.get_tag(full_end_dest) in true_set)):
#logger.debug(f"SKIP: No need to update {self.get_tag(full_end_name)} vs {self.get_tag(full_end_dest)}")
continue

# Handle the case where there is logic involved
if self.logic[full_end_dest] is not None:
expr = self.parser.parse(self.logic[full_end_dest])

# Assign variables
these_vars = dict()

'''
for v in expr.variables():
t = self.get_tag(v)
if these_types == 'bool':
t = bool(t)
these_vars[v] = t
'''
# Automatic variable parsing with expression parser not working, so assign token manually
for i,token in enumerate(expr.tokens):
if token.toString() in self.tags:
t = self.get_tag(token.toString())
if these_types == 'bool':
t = bool(t)

expr.tokens[i].number_ = t

# Evaluate expression
value = expr.evaluate(these_vars)
value = str(value)
if value != self.get_tag(full_end_dest):
self.set_tag(full_end_dest, value)
else:
continue

tag_ptr = full_end_dest

else:
tag_ptr = full_end_name


# Send update
logger.info(f"Sending value update...")
try:
client = alicantoClient(end_dest)
except:
logger.error(f"\tError Initializing Client: {client}")
continue

try:
if these_types == 'float' or these_types == 'double':
client.write_analog_point(end_dest_tag, self.get_tag(tag_ptr))
elif these_types == 'bool':
client.write_digital_point(end_dest_tag, eval(self.get_tag(tag_ptr)))
except:
logger.error(f"\tError Writing value to remote Client")

time.sleep(0.5)

try:
reply = client.send("READ="+end_dest_tag)
if not reply:
continue
except:
logger.error(f"\tError Reading remote value")
continue

try:
value = reply[1].rstrip('\x00')
self.set_tag(full_end_dest, value)
except:
logger.error(f"\tError Parsing response from Client")
continue

logger.info(f"... Update sent!")


def publish_state(self):
logger.info("=================== DATA ===================")
for tag in self.tags:
logger.info(f"{tag:<30} --- {self.tag(tag):}")
logger.info(f"{tag:<30} --- {self.get_tag(tag):}")
logger.info("============================================")

def get_type(self, tag):
return self.types[tag]

def tag(self, tag, value=None):
def get_tag(self, tag):
with self.lock:
if tag in self.state:
return self.state[tag]
else:
return False if self.get_type(tag) == 'bool' else 0

def set_tag(self, tag, value=None):
with self.lock:
if value is not None:
self.state[tag] = value
else:
if tag in self.state:
return self.state[tag]
else:
return False if self.get_type(tag) == 'bool' else 0

def _subscription_handler(self, message):
"""Receive Subscription message
Expand All @@ -359,10 +381,8 @@ def _subscription_handler(self, message):
sub_source = threading.current_thread().name

for point in points:
if not point:
if not point or len(point) <= 1:
continue
if point == "":
continue

try:
tag = point.split(':')[0]
Expand All @@ -386,21 +406,13 @@ def _subscription_handler(self, message):
field = 'value'

if field == 'value':
if not math.isclose(float(self.tag(full_tag)), value):
self.tag(full_tag, value)
logger.debug("UPDATE NOW: "+full_tag)
logger.debug("New value: "+str(value))
else:
continue
if not math.isclose(float(self.get_tag(full_tag)), value):
self.set_tag(full_tag, value)
logger.debug(f"UPDATE NOW: {full_tag} = {value}")
elif field == 'status':
if self.tag(full_tag) != value:
self.tag(full_tag, value)
logger.debug("UPDATE NOW: "+full_tag)
logger.debug("New value: "+str(value))
else:
continue
else:
continue
if self.get_tag(full_tag) != value:
self.set_tag(full_tag, value)
logger.debug(f"UPDATE NOW: {full_tag} = {value}")

def ctrl_exit_handler(self, signal, frame):
logger.info("SIGINT or CTRL-C detected. Exiting gracefully")
Expand All @@ -423,4 +435,4 @@ def main():
try:
main()
except KeyboardInterrupt:
sys.exit()
sys.exit()