Skip to content

Commit

Permalink
#488:
Browse files Browse the repository at this point in the history
further work on SDO
  • Loading branch information
ebroecker committed Jan 15, 2025
1 parent 2014565 commit dbda8b4
Showing 1 changed file with 60 additions and 20 deletions.
80 changes: 60 additions & 20 deletions src/canmatrix/formats/eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import canopen.objectdictionary.eds
import canopen.objectdictionary.datatypes
import codecs

import copy

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -51,8 +51,8 @@ def get_signals(parent_object, signal_receiver):

def get_bit_length(data_type_code):
return datatype_mapping[data_type_code][1]
#def get_data_type_name(data_type_code):
# return datatype_mapping[data_type_code][0]
def get_data_type_name(data_type_code):
return datatype_mapping[data_type_code][0]

def format_index(index, subindex):
return f"Index: 0x{index:04X}{subindex:02X}"
Expand Down Expand Up @@ -93,7 +93,7 @@ def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatri
db.add_frame(emcy)

sdo_down = canmatrix.canmatrix.Frame(name="SDO_download", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x600+node_id), transmitters=[node_name])
sig_cmd = canmatrix.canmatrix.Signal(name="sdo_down_CMD", size=5, start_bit=3, receivers=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="sdo_down_CMD", size=3, start_bit=5, receivers=[plc_name], is_signed=False)
sig_cmd.is_multiplexer = True
sdo_down.is_complex_multiplexed = True
sig_cmd.multiplex = "Multiplexor"
Expand Down Expand Up @@ -122,35 +122,75 @@ def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatri
db.add_frame(sdo_down)

sdo_up = canmatrix.canmatrix.Frame(name="SDO_upload", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x580+node_id), transmitters=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="sdo_state", size=8, start_bit=0, receivers=[node_name])
sig_cmd = canmatrix.canmatrix.Signal(name="sdo_up_CMD", size=3, start_bit=5, is_signed=False)
sig_cmd.is_multiplexer = True
sdo_up.is_complex_multiplexed = True
sig_cmd.multiplex = "Multiplexor"
sdo_up.add_signal(sig_cmd)
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="sdo_uo_IDX", size=16, start_bit=8, receivers=[node_name]))
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="sdo_up_SUBIDX", size=8, start_bit=24, receivers=[node_name]))
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="error_code", size=32, start_bit=32, receivers=[node_name], multiplex=0x80))
sig_cmd.add_values(0x80, "upload_error")

sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data8", size=8, start_bit=32, receivers=[plc_name], multiplex=0x4f))
sig_cmd.add_values(0x2f, "8_bytes")
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data16", size=16, start_bit=32, receivers=[plc_name], multiplex=0x4b))
sig_cmd.add_values(0x2b, "16_bytes")
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data24", size=24, start_bit=32, receivers=[plc_name], multiplex=0x47))
sig_cmd.add_values(0x27, "3_bytes")
# sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data32", size=32, start_bit=32, receivers=[plc_name], multiplex=0x43))
sig_cmd.add_values(0x23, "4_bytes")

index = canmatrix.canmatrix.Signal(name="sdo_up_IDX", size=24, start_bit=8)
index.multiplex = "Multiplexor"
index.is_multiplexer = True
index.mux_val = 2
index.mux_val_grp.append([ 2, 2])
index.muxer_for_signal = "sdo_up_CMD"
sdo_up.add_signal(index)

db.add_frame(sdo_up)



for obj in od.values():
if isinstance(obj, canopen.objectdictionary.ODVariable):
subindex = 0
combined_value = int(f"{subindex:02X}{obj.index:04X}", 16)
new_sig = canmatrix.canmatrix.Signal(name=obj.name.replace(' ', '_'), size=get_bit_length(obj.data_type), start_bit=32, receivers=[plc_name])

signal_name = obj.name.replace(' ', '_').replace('-','_')
new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=get_bit_length(obj.data_type), start_bit=32, receivers=[plc_name])
datatype_name = get_data_type_name(obj.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.mux_val = combined_value
new_sig.mux_val_grp.append([ combined_value, combined_value])
new_sig.muxer_for_signal = "sdo_down_IDX"
sdo_down.add_signal(new_sig)
up_sig = copy.deepcopy(new_sig)

up_sig.receivers = []
sdo_up.add_signal(up_sig)
elif isinstance(obj, canopen.objectdictionary.ODRecord):
for subobj in obj.values():
combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16)
signal_name = subobj.name.replace(' ', '_').replace('-','_')
new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=get_bit_length(subobj.data_type), start_bit=32, receivers=[plc_name])
datatype_name = get_data_type_name(subobj.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.mux_val = combined_value
new_sig.mux_val_grp.append([ combined_value, combined_value])
new_sig.muxer_for_signal = "sdo_down_IDX"
sdo_down.add_signal(new_sig)
up_sig = copy.deepcopy(new_sig)

up_sig.receivers = []
sdo_up.add_signal(up_sig)
elif isinstance(obj, canopen.objectdictionary.ODArray):
for subobj in obj.values():
combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16)
signal_name = subobj.name.replace(' ', '_').replace('-','_')

new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=get_bit_length(subobj.data_type), start_bit=32, receivers=[plc_name])
datatype_name = get_data_type_name(subobj.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.mux_val = combined_value
new_sig.mux_val_grp.append([ combined_value, combined_value])
new_sig.muxer_for_signal = "sdo_down_IDX"
sdo_down.add_signal(new_sig)
up_sig = copy.deepcopy(new_sig)

up_sig.receivers = []
sdo_up.add_signal(up_sig)



# RX Can-Ids ...
Expand Down

0 comments on commit dbda8b4

Please sign in to comment.