diff --git a/decoders/avclan/__init__.py b/decoders/avclan/__init__.py new file mode 100644 index 00000000..7eb5a88d --- /dev/null +++ b/decoders/avclan/__init__.py @@ -0,0 +1,24 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2023 Maciej Grela +## +## This program is free software: you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program. If not, see . + + +''' +AVC-LAN is an IEBus variant used for multimedia communications inside Toyota vehicles +''' + +from .pd import Decoder diff --git a/decoders/avclan/lists.py b/decoders/avclan/lists.py new file mode 100644 index 00000000..6b6d3196 --- /dev/null +++ b/decoders/avclan/lists.py @@ -0,0 +1,291 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2023 Maciej Grela +## +## This program is free software: you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program. If not, see . + +''' +Lists of known values used by the AVC-LAN protocol in Toyota vehicles. + +The values here have been either copied from other reference sources, +and existing code: +- http://softservice.com.pl/corolla/avc/avclan.php +- https://github.com/halleysfifthinc/Toyota-AVC-LAN/ + +or reverse-engineered using a Head Unit and CD changer documented in detail on + +https://pop.fsck.pl/hardware/toyota-corolla.html +''' + +from enum import IntEnum, IntFlag + + +class Searchable: # pylint: disable=too-few-public-methods + '''A trait implementing value search in enums.''' + @classmethod + def has_value(cls, value): + '''Searches for a particular integer value in the enum''' + return value in iter(cls) + + +class Commands(Searchable, IntEnum): + ''' + Valid values for the control bits if IEBus frames. + + Reference: https://en.wikipedia.org/wiki/IEBus + Reference: http://softservice.com.pl/corolla/avc/avclan.php + ''' + READ_STATUS = 0x00 # Reads slave status + READ_DATA_LOCK = 0x03 # Reads data and locks unit + READ_LOCK_ADDR_LO = 0x04 # Reads lock address (lower 8 bits) + READ_LOCK_ADDR_HI = 0x05 # Reads lock address (higher 4 bits) + READ_STATUS_UNLOCK = 0x06 # Reads slave status and unlocks unit + READ_DATA = 0x07 # Reads data + WRITE_CMD_LOCK = 0x0a # Writes command and locks unit + WRITE_DATA_LOCK = 0x0b # Writes data and locks unit + WRITE_CMD = 0x0e # Writes command + WRITE_DATA = 0x0f # Writes data + + +class HWAddresses(Searchable, IntEnum): + ''' + Known hardware addresses. + ''' + EMV = 0x110 + AVX = 0x120 + DIN1_TV = 0x128 # 1DIN TV + AVN = 0x140 + G_BOOK = 0x144 # G-BOOK + AUDIO_HU1 = 0x160 # AUDIO H/U: Control Panel subassy + NAVI = 0x178 + MONET = 0x17C + TEL = 0x17D + Rr_TV = 0x180 # Rr-TV (sic!) pylint: disable=invalid-name + AUDIO_HU2 = 0x190 # AUDIO H/U: CD Player + Tuner + Audio Amplifier subassy + DVD_P = 0x1A0 + CLOCK = 0x1D6 + CAMERA_C = 0x1AC # CAMERA-C + Rr_CONT = 0x1C0 # Rr-CONT (sic!) pylint: disable=invalid-name + TV_TUNER2 = 0x1C2 # TV-TUNER2 + PANEL = 0x1C4 + GW = 0x1C6 # G/W + FM_M_LCD = 0x1C8 # FM-M-LCD + ST_WHEEL_CTRL = 0x1CC + GW_TRIP = 0x1D8 # G/W for Trip + BODY = 0x1EC + RADIO_TUNER = 0x1F0 + XM = 0x1F1 + SIRIUS = 0x1F2 + RSA = 0x1F4 + RSE = 0x1F6 + GROUP_AUDIO = 0x1FF # Group 1 - All Audio devices + TV_TUNER = 0x230 + CD_CH2 = 0x240 + DVD_CH = 0x250 + CAMERA = 0x280 + CD_CH1 = 0x360 + MD_CH = 0x3A0 + DSP_AMP = 0x440 + AMP = 0x480 + ETC = 0x530 + MAYDAY = 0x5C8 + BROADCAST = 0xFFF # General Broadcast (All devices on bus) + + +class FunctionIDs(Searchable, IntEnum): + ''' + These are called "Logical Addresses" in the Softservice site + but I think a better term would be "functions" + Reference: http://softservice.com.pl/corolla/avc/avclan.php + ''' + COMM_CTRL = 0x01 # communication ctrl + COMMUNICATION = 0x12 # communication + SW = 0x21 + SW_NAME = 0x23 # SW with name + SW_CONVERTING = 0x24 + CMD_SW = 0x25 # command SW + BEEP_HU = 0x28 # beep dev in HU + BEEP_SPEAKERS = 0x29 # beep via speakers + FRONT_PSNG_MONITOR = 0x34 # front passenger monitor + CD_CHANGER2 = 0x43 # Reported by CD_CH2 (0x240) + BLUETOOTH_TEL = 0x55 + INFO_DRAWING = 0x56 # information drawing + NAV_ECU = 0x58 # navigation ECU + CAMERA = 0x5C + CLIMATE_DRAWING = 0x5D # Climate ctrl drawing + AUDIO_DRAWING = 0x5E + TRIP_INFO_DRAWING = 0x5F + TUNER = 0x60 + TAPE_DECK = 0x61 + CD = 0x62 + CD_CHANGER = 0x63 # Reported by CD_CH1 (0x360) + AUDIO_AMP = 0x74 # Audio amplifier + GPS = 0x80 # GPS receiver + VOICE_CTRL = 0x85 # voice control + CLIMATE_CTRL_DEV = 0xE0 # climate ctrl dev + TRIP_INFO = 0xE5 + + +class CommCtrlOpcodes(Searchable, IntEnum): + ''' + Opcodes for the COMM_CTRL function. + + It looks like response opcode = request opcode + 0x10 but + this rule doesn't match all the opcodes seen. Specifically + the 0x45 code does not match. + ''' + LIST_FUNCTIONS_REQ = 0x00 + LIST_FUNCTIONS_RESP = 0x10 + RESTART_LAN = 0x01 + LANCHECK_END_REQ = 0x08 + LANCHECK_END_RESP = 0x18 + LANCHECK_SCAN_REQ = 0x0a + LANCHECK_SCAN_RESP = 0x1a + LANCHECK_REQ = 0x0c + LANCHECK_RESP = 0x1c + PING_REQ = 0x20 + PING_RESP = 0x30 + + # Used when HU is switching between Radio and CD + DISABLE_FUNCTION_REQ = 0x43 + DISABLE_FUNCTION_RESP = 0x53 + ENABLE_FUNCTION_REQ = 0x42 + ENABLE_FUNCTION_RESP = 0x52 + + ADVERTISE_FUNCTION = 0x45 # xx=60,61,62,63... function + GENERAL_QUERY = 0x46 # any device is use + + +class CDOpcodes(Searchable, IntEnum): + ''' + Opcodes for the CD Player function. + These seem to be also common for the CD Changer function. + ''' + # Events + INSERTED_CD = 0x50 + REMOVED_CD = 0x51 + + # Requests + REQUEST_PLAYBACK2 = 0xe2 + REQUEST_LOADER2 = 0xe4 + REQUEST_TRACK_NAME = 0xed + + # Reports + REPORT_PLAYBACK = 0xf1 + REPORT_PLAYBACK2 = 0xf2 + REPORT_LOADER = 0xf3 + REPORT_LOADER2 = 0xf4 # Requested with REQUEST_LOADER + REPORT_TOC = 0xf9 + REPORT_TRACK_NAME = 0xfd + + +class CDSlots(Searchable, IntFlag): + SLOT1 = 0x01 + SLOT2 = 0x02 + SLOT3 = 0x04 + SLOT4 = 0x08 + SLOT5 = 0x10 + SLOT6 = 0x20 + + +class CDStateCodes(Searchable, IntFlag): + '''State codes for the CD Player function.''' + OPEN = 0x01 + ERR1 = 0x02 + BIT2 = 0x04 + SEEKING = 0x08 + PLAYBACK = 0x10 + SEEKING_TRACK = 0x20 + BIT6 = 0x40 + LOADING = 0x80 + + +class CDFlags(Searchable, IntFlag): + '''Bit flags for the CD Player function.''' + BIT0 = 0x01 + DISK_RANDOM = 0x02 + RANDOM = 0x04 + DISK_REPEAT = 0x08 + REPEAT = 0x10 + DISK_SCAN = 0x20 + SCAN = 0x40 + BIT7 = 0x80 + + +class CmdSwOpcodes(Searchable, IntEnum): + '''Opcodes for the SW_CMD function.''' + EJECT = 0x80 + DISC_UP = 0x90 + DISC_DOWN = 0x91 + PWRVOL_KNOB_RIGHTHAND_TURN = 0x9c + PWRVOL_KNOB_LEFTHAND_TURN = 0x9d + TRACK_SEEK_UP = 0x94 + TRACK_SEEK_DOWN = 0x95 + CD_ENABLE_SCAN = 0xa6 + CD_DISABLE_SCAN = 0xa7 + CD_ENABLE_REPEAT = 0xa0 + CD_DISABLE_REPEAT = 0xa1 + CD_ENABLE_RANDOM = 0xb0 + CD_DISABLE_RANDOM = 0xb1 + + +class AudioAmpOpcodes(Searchable, IntEnum): + '''Opcodes for the AUDIO_AMP function.''' + REPORT = 0xf1 + + +class AudioAmpFlags(Searchable, IntFlag): + '''Flags for the AUDIO_AMP functions.''' + BIT0 = 0x01 + BIT1 = 0x02 + MUTE = 0x04 + BIT3 = 0x08 + BIT4 = 0x10 + BIT5 = 0x20 + BIT6 = 0x40 + BIT7 = 0x80 + + +class TunerOpcodes(Searchable, IntEnum): + '''Opcodes for the TUNER function.''' + REPORT = 0xf1 + + +class TunerFlags(Searchable, IntFlag): + '''Bit flags for the TUNER function.''' + BIT0 = 0x01 + BIT1 = 0x02 + TP = 0x04 + TA = 0x08 + REG = 0x10 + BIT5 = 0x20 + AF = 0x40 + BIT7 = 0x80 + + +class TunerState(Searchable, IntEnum): + '''States for the TUNER function.''' + ON = 0x01 + OFF = 0x00 + + +class TunerModes(Searchable, IntEnum): + '''Modes for the TUNER function.''' + MANUAL = 0x27 + AST_SEARCH = 0x0a + SCAN_DOWN = 0x07 + SCAN_UP = 0x06 + READY = 0x01 + OFF = 0x00 diff --git a/decoders/avclan/pd.py b/decoders/avclan/pd.py new file mode 100644 index 00000000..86b57263 --- /dev/null +++ b/decoders/avclan/pd.py @@ -0,0 +1,683 @@ +## +# This file is part of the libsigrokdecode project. +## +# Copyright (C) 2023 Maciej Grela +## +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +## +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +## +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# pylint: disable=missing-module-docstring + +from collections import namedtuple +import sigrokdecode as srd # pylint: disable=import-error +from .lists import * # pylint: disable=wildcard-import,unused-wildcard-import + +DataByte = namedtuple('DataByte', ['b', 'ss', 'es']) + + +# Reference: https://docs.python.org/3/library/itertools.html +def first_true(iterable, default=None, pred=None): + """Returns the first true value in the iterable. + + If no true value is found, returns *default* + + If *pred* is not None, returns the first item + for which pred(item) is true. + + """ + # first_true([a,b,c], x) --> a or b or c or x + # first_true([a,b], x, f) --> a if f(a) else b if f(b) else x + return next(filter(pred, iterable), default) + + +class Decoder(srd.Decoder): # pylint: disable=too-many-instance-attributes + '''AVC-LAN Decoder class used by libsigrokdecode.''' + + api_version = 3 + id = 'avclan' + name = 'AVC-LAN' + longname = 'AVC-LAN Toyota Audio-Video Local Area Network' + desc = 'AVC-LAN Protocol Decoder (IEBus Mode 2 variant)' + license = 'gplv3+' + inputs = ['iebus'] + outputs = [] + tags = ['Automotive'] + channels = () + options = () + annotations = ( + ('address', 'Device Address'), # 0 + ('function', 'Function'), # 1 + + # Control Protocol + ('ctrl-opcode', 'Opcode'), # 2 + ('sequence-no', 'Sequence No.'), # 3 + ('advertised-function', 'Function'), # 4 + + # HU Commands + ('cmd-opcode', 'Opcode'), # 5 + + # CD Player + ('cd-opcode', 'Opcode'), # 6 + ('cd-state', 'State'), # 7 + ('cd-flags', 'Flags'), # 8 + ('disc-number', 'Disc Number'), # 9 + ('track-number', 'Track Number'), # 10 + ('track-count', 'Track Count'), # 11 + ('disc-title', 'Disc Name'), # 12 + ('track-title', 'Track Name'), # 13 + ('playback-time', 'Playback time'), # 14 + ('disc-slots', 'Disc Slots'), # 15 + + # Audio AMP + ('audio-opcode', 'Opcode'), # 16 + ('audio-flags', 'Audio Flags'), # 17 + ('volume', 'Volume'), # 18 + ('bass', 'Bass'), # 19 + ('treble', 'Treble'), # 20 + ('fade', 'Fade'), # 21 + ('balance', 'Balance'), # 22 + + # TUNER (radio) + ('radio-opcode', 'Opcode'), # 23 + ('radio-state', 'State'), # 24 + ('radio-mode', 'Mode'), # 25 + ('radio-flags', 'Flags'), # 26 + ('band', 'Band'), # 27 + ('channel', 'Channel'), # 28 + ('freq', 'Frequency'), # 29 + + ('warning', 'Warning') + ) + + annotation_rows = ( + ('devices', 'Device Addresses and Functions', (0, 1)), + ('control', 'Network Control', (2, 3, 4)), + ('cmd', 'HU Commands', (5,)), + ('cd', 'CD Player', (6, 7, 8, 9, 10, 11, 12, 13, 14, 15)), + ('audio', 'Audio Amplifier', (16, 17, 18, 19, 20, 21, 22)), + ('radio', 'Radio Tuner', (23, 24, 25, 26, 27, 28, 29)), + ('warnings', 'Warnings', (30,)) + ) + + + def __init__(self): + # Make pylint happy (attribute-defined-outside-init checker) + self.state = None + self.broadcast_bit = None + self.master_addr = self.slave_addr = None + self.control = None + self.data_length = self.data_bytes = None + self.from_function = self.to_function = None + self.samplerate = None + self.out_ann = None + + self.reset() + + + def reset(self): + '''Reset decoder state.''' + self.state = 'IDLE' + self.broadcast_bit = None + self.master_addr = self.slave_addr = None + self.control = None + self.data_length = self.data_bytes = None + self.from_function = self.to_function = None + + + def start(self): + '''Start decoder.''' + self.out_ann = self.register(srd.OUTPUT_ANN) + + + def metadata(self, key, value): + '''Handle metadata input from libsigrokdecode.''' + if key == srd.SRD_CONF_SAMPLERATE: + self.samplerate = value + + + def find_annotation(self, anno_name: str): + '''Find an annotation index based on its name.''' + return first_true(enumerate(self.annotations), default=(None, (None, None)), + pred=lambda item: item[1][0] == anno_name) + + + def putx(self, anno_name: str, ss, es, v): + '''Put in an annotation using its name.''' + (idx, _) = self.find_annotation(anno_name) + if idx is None: + raise RuntimeError(f'Cannot find annotation name {anno_name}') + self.put(ss, es, self.out_ann, [idx, v]) + + + def pkt_from_12(self): + '''Handle frames from function 12(COMMUNICATION)''' + self.pkt_comm_ctrl() + + + def pkt_to_12(self): + '''Handle frames to function 12(COMMUNICATION)''' + self.pkt_comm_ctrl() + + + def pkt_to_01(self): + '''Handle frames to function 01(COMM_CTRL).''' + self.pkt_comm_ctrl() + + + def pkt_from_01(self): + '''Handle frames from function 01(COMM_CTRL).''' + self.pkt_comm_ctrl() + + + def pkt_comm_ctrl(self): + '''Handle control protocol frames.''' + opcode = self.data_bytes[0].b + opcode_anno = f'{opcode:02x}' + + if CommCtrlOpcodes.has_value(opcode): + opcode = CommCtrlOpcodes(opcode) + + self.putx('ctrl-opcode', self.data_bytes[0].ss, self.data_bytes[0].es, + [f'Opcode: {opcode.name}', opcode.name] + ) + + if opcode == CommCtrlOpcodes.ADVERTISE_FUNCTION: + logic_id = self.data_bytes[1].b + if FunctionIDs.has_value(logic_id): + logic_id = FunctionIDs(logic_id) + self.putx('advertised-function', + self.data_bytes[1].ss, self.data_bytes[1].es, + [f'Function: {logic_id.name}', logic_id.name] + ) + + elif opcode == CommCtrlOpcodes.PING_REQ: + sequence = self.data_bytes[1].b + self.putx('sequence-no', self.data_bytes[1].ss, self.data_bytes[1].es, + [f'Sequence Number: {sequence}', str(sequence), 'Seq']) + + elif opcode == CommCtrlOpcodes.PING_RESP: + sequence = self.data_bytes[1].b + self.putx('sequence-no', self.data_bytes[1].ss, self.data_bytes[1].es, + [f'Sequence Number: {sequence}', str(sequence), 'Seq']) + + elif opcode == CommCtrlOpcodes.LIST_FUNCTIONS_RESP: + for (idx, logical_addr) in enumerate([d.b for d in self.data_bytes[1:]], 1): + anno = f'{logical_addr:02x}' + if FunctionIDs.has_value(logical_addr): + logical_addr = FunctionIDs(logical_addr) + anno = logical_addr.name + + self.putx('advertised-function', + self.data_bytes[idx].ss, self.data_bytes[idx].es, + [f'Function: {anno}', anno, 'Func']) + return True + + return False + + + def pkt_from_25(self): + '''Handle frames from function 25(CMD_SW).''' + opcode = self.data_bytes[0].b + if CmdSwOpcodes.has_value(opcode): + opcode = CmdSwOpcodes(opcode) + + self.putx('cmd-opcode', self.data_bytes[0].ss, self.data_bytes[0].es, + [f'Opcode: {opcode.name}', opcode.name] + ) + return False + + + def pkt_from_60(self): + '''Handle frames from function 60(TUNER).''' + opcode = self.data_bytes[0].b + + if TunerOpcodes.has_value(opcode): + opcode = TunerOpcodes(opcode) + + self.putx('radio-opcode', self.data_bytes[0].ss, self.data_bytes[0].es, + [f'Opcode: {opcode.name}', opcode.name] + ) + + if opcode == TunerOpcodes.REPORT: + tuner_state = self.data_bytes[1].b + if TunerState.has_value(tuner_state): + tuner_state = TunerState(tuner_state) + + self.putx('radio-state', self.data_bytes[1].ss, self.data_bytes[1].es, + [f'State: {tuner_state.name}', tuner_state.name]) + + tuner_mode = self.data_bytes[2].b + if TunerModes.has_value(tuner_mode): + tuner_mode = TunerModes(tuner_mode) + + self.putx('radio-mode', self.data_bytes[2].ss, self.data_bytes[2].es, + [f'Mode: {tuner_mode.name}', tuner_mode.name, 'Mode']) + + # Did not know how to properly implement this with Python enums + band_type = self.data_bytes[3].b & 0xF0 + band_number = self.data_bytes[3].b & 0x0F + if band_type == 0x80: + band_type = 'FM' + freq_start = 87.5 + freq_step = 0.05 + freq_unit = 'MHz' + elif band_type == 0xC0: + # Long Wave broadcast band + band_type = 'AM' + freq_start = 153 + freq_step = 1 + freq_unit = 'kHz' + elif band_type == 0x00: + # Medium Wave broadcast band + band_type = 'AM' + freq_start = 522 + freq_step = 9 + freq_unit = 'kHz' + + self.putx('band', self.data_bytes[3].ss, self.data_bytes[3].es, + [f'Band: {band_type} {band_number}', + f'{band_type} {band_number}', 'Band']) + + # Frequency is 2 bytes big-endian + freq = 256 * self.data_bytes[4].b + self.data_bytes[5].b + freq = freq_start + (freq-1) * freq_step + self.putx('freq', self.data_bytes[4].ss, self.data_bytes[5].es, + [f'Freq: {freq} {freq_unit}', f'{freq} {freq_unit}', 'Freq']) + + channel = self.data_bytes[6].b + if channel > 0: + self.putx('channel', self.data_bytes[6].ss, self.data_bytes[6].es, + [f'CH #{channel}', 'CH']) + + flags1 = TunerFlags(self.data_bytes[7].b) + self.putx('radio-flags', self.data_bytes[7].ss, self.data_bytes[7].es, + [f'Flags: {str(flags1)}', 'Flags', 'F']) + flags2 = TunerFlags(self.data_bytes[8].b) + self.putx('radio-flags', self.data_bytes[8].ss, self.data_bytes[8].es, + [f'Flags: {str(flags2)}', 'Flags', 'F']) + + return True + + return False + + + def bcd2dec(self, b: int): + '''Convert a BCD encoded byte to a decimal integer.''' + return 10 * ((b & 0xF0) >> 4) + (b & 0x0F) + + + def pkt_from_62(self): + '''Handle frames from function 62(CD).''' + self.pkt_from_cd_player() + + + def pkt_to_62(self): + self.pkt_to_cd_player() + + + def pkt_from_63(self): + '''Handle frames from function 63(CD_CHANGER).''' + self.pkt_from_cd_player() + + + def pkt_from_43(self): + '''Handle frames from function 43(CD_CHANGER2)''' + self.pkt_from_cd_player() + + + def pkt_to_cd_player(self): + '''Handle frames to a CD player.''' + opcode = self.data_bytes[0].b + opcode_anno = f'{opcode:02x}' + ret = False + + if CDOpcodes.has_value(opcode): + opcode = CDOpcodes(opcode) + opcode_anno = opcode.name + + if opcode == CDOpcodes.REQUEST_TRACK_NAME: + + # This is always 0x01 for builtin CD player + disc_number = self.data_bytes[1].b + anno = [ 'CD #', 'CD', 'C' ] + if disc_number != 0xff: + anno.insert(0, f'CD #{disc_number}') + + self.putx('disc-number', self.data_bytes[1].ss, self.data_bytes[1].es, anno) + + track_number = self.data_bytes[2].b + anno = [ 'Track #', 'Tra', 'T' ] + if track_number != 0xff: + anno.insert(0, f'Track #{track_number}') + + self.putx('track-number', self.data_bytes[2].ss, self.data_bytes[2].es, anno) + + ret = True + + self.putx('cd-opcode', self.data_bytes[0].ss, self.data_bytes[0].es, + [f'Opcode: {opcode_anno}', opcode_anno, 'Opcode']) + return ret + + + def pkt_from_cd_player(self): + '''Handle frames from a CD player.''' + opcode = self.data_bytes[0].b + opcode_anno = f'{opcode:02x}' + ret = False + + if CDOpcodes.has_value(opcode): + opcode = CDOpcodes(opcode) + opcode_anno = opcode.name + + if opcode == CDOpcodes.REPORT_PLAYBACK or opcode == CDOpcodes.REPORT_PLAYBACK2: + cd_state = self.data_bytes[2].b + + anno = str(CDStateCodes(cd_state)) + self.putx('cd-state', self.data_bytes[2].ss, self.data_bytes[2].es, + [f'State: {anno}', anno, 'State']) + + # This is always 0x01 for builtin CD player + disc_number = self.data_bytes[3].b + anno = [ 'CD #', 'CD', 'C' ] + if disc_number != 0xff: + anno.insert(0, f'CD #{disc_number}') + + self.putx('disc-number', self.data_bytes[3].ss, self.data_bytes[3].es, anno) + + track_number = self.data_bytes[4].b + anno = [ 'Track #', 'Tra', 'T' ] + if track_number != 0xff: + anno.insert(0, f'Track #{track_number}') + + self.putx('track-number', self.data_bytes[4].ss, self.data_bytes[4].es, anno) + + minutes = self.data_bytes[5].b + seconds = self.data_bytes[6].b + anno = ['Time', 'T'] + if minutes != 0xff and seconds != 0xff: + minutes = self.bcd2dec(minutes) + seconds = self.bcd2dec(seconds) + anno.insert(0, f'{minutes:02d}:{seconds:02d}') + anno.insert(0, f'Time: {minutes:02d}:{seconds:02d}') + + self.putx('playback-time', self.data_bytes[5].ss, self.data_bytes[6].es, anno) + + cd_flags = self.data_bytes[7].b + anno = str(CDFlags(cd_flags)) + self.putx('cd-flags', self.data_bytes[7].ss, self.data_bytes[7].es, + [f'Flags: {anno}', anno, 'Flags']) + + elif opcode == CDOpcodes.REPORT_TRACK_NAME: + disc_number = self.data_bytes[1].b + track_number = self.data_bytes[2].b + text = ''.join([chr(d.b) for d in self.data_bytes[5:]]) + if disc_number != 0xff: + self.putx('disc-number', self.data_bytes[1].ss, self.data_bytes[1].es, + [f'CD #{disc_number}', 'CD #']) + self.putx('track-number', self.data_bytes[2].ss, self.data_bytes[2].es, + [f'Track #{track_number}', 'Track #']) + self.putx('track-title', self.data_bytes[5].ss, self.data_bytes[-1].es, + [f'Title: {text}', 'Title']) + + elif opcode == CDOpcodes.REPORT_LOADER or opcode == CDOpcodes.REPORT_LOADER2: + + slots = self.data_bytes[2].b + anno = str(CDSlots(slots)) + self.putx('disc-slots', self.data_bytes[2].ss, self.data_bytes[2].es, + [f'Available: {anno}', anno, 'Avail']) + + slots = self.data_bytes[4].b + anno = str(CDSlots(slots)) + self.putx('disc-slots', self.data_bytes[4].ss, self.data_bytes[4].es, + [f'Disc Present: {anno}', anno, 'Pres']) + + slots = self.data_bytes[6].b + anno = str(CDSlots(slots)) + self.putx('disc-slots', self.data_bytes[6].ss, self.data_bytes[6].es, + [f'Slot-3: {anno}', anno, 'Pres']) + + elif opcode == CDOpcodes.REPORT_TOC: + disc_number = self.data_bytes[1].b + anno = [ 'CD #', 'CD', 'C' ] + if disc_number != 0xff: + anno.insert(0, f'CD #{disc_number}') + + self.putx('disc-number', self.data_bytes[1].ss, self.data_bytes[1].es, anno) + + track_number = self.data_bytes[2].b + anno = [ 'Track #', 'Tra', 'T' ] + if track_number != 0xff: + anno.insert(0, f'Track #{track_number}') + + self.putx('track-number', self.data_bytes[2].ss, self.data_bytes[2].es, anno) + + track_count = self.data_bytes[3].b + anno = [ 'Track Count', 'Count', 'Cnt' ] + if track_count != 0xff: + anno.insert(0, f'Track Count: {track_count}') + + self.putx('track-count', self.data_bytes[3].ss, self.data_bytes[3].es, anno) + + minutes = self.data_bytes[4].b + seconds = self.data_bytes[5].b + anno = ['Total Time', 'T'] + if minutes != 0xff and seconds != 0xff: + minutes = self.bcd2dec(minutes) + seconds = self.bcd2dec(seconds) + anno.insert(0, f'{minutes:02d}:{seconds:02d}') + anno.insert(0, f'Total Time: {minutes:02d}:{seconds:02d}') + + self.putx('playback-time', self.data_bytes[4].ss, self.data_bytes[5].es, anno) + + ret = True + + self.putx('cd-opcode', self.data_bytes[0].ss, self.data_bytes[0].es, + [f'Opcode: {opcode_anno}', opcode_anno, 'Opcode']) + return ret + + + def map_left_right(self, value: int, center: int, + negative_tag: str = '-', positive_tag: str = '+'): + ''' + Map values corresponding to left/right or front/back settings to strings. + Used for balance, fade and so on. + ''' + value -= center + if value < 0: + return f'{negative_tag}{abs(value)}' + if value > 0: + return f'{positive_tag}{abs(value)}' + return '0' + + + def pkt_74(self): + '''Handle frames to/from function 74(AUDIO_AMP)''' + opcode = self.data_bytes[0].b + + if AudioAmpOpcodes.has_value(opcode): + opcode = AudioAmpOpcodes(opcode) + + self.putx('audio-opcode', self.data_bytes[0].ss, self.data_bytes[0].es, + [f'Opcode: {opcode.name}', opcode.name] + ) + + if opcode == AudioAmpOpcodes.REPORT: + # First byte is always 0x80 in volume reports + volume = self.data_bytes[2].b + self.putx('volume', self.data_bytes[2].ss, self.data_bytes[2].es, + [f'Volume: {volume}', 'Volume', 'Vol']) + + balance = self.map_left_right( + self.data_bytes[3].b, 0x10, negative_tag='L', positive_tag='R') + self.putx('balance', self.data_bytes[3].ss, self.data_bytes[3].es, + [f'Balance: {balance}', 'Balance', 'Bal']) + + fade = self.map_left_right( + self.data_bytes[4].b, 0x10, negative_tag='F', positive_tag='R') + self.putx('fade', self.data_bytes[4].ss, self.data_bytes[4].es, + [f'Fade: {fade}', 'Fade']) + + bass = self.map_left_right(self.data_bytes[5].b, 0x10) + self.putx('bass', self.data_bytes[5].ss, self.data_bytes[5].es, + [f'Bass: {bass}', 'Bass']) + + treble = self.map_left_right(self.data_bytes[7].b, 0x10) + self.putx('treble', self.data_bytes[7].ss, self.data_bytes[7].es, + [f'Treble: {treble}', 'Treble']) + + flags = AudioAmpFlags(self.data_bytes[12].b) + self.putx('audio-flags', self.data_bytes[12].ss, self.data_bytes[12].es, + [f'Flags: {str(flags)}', 'Flags']) + + return True + + return False + + + def decode(self, ss, es, data): + '''Decode Python output data from low-level (iebus) decoder.''' + + (ptype, pdata) = data + + if ptype == 'NAK': + # A NAK condition has been observed, bus is reset back to IDLE + # + self.reset() + + if self.state == 'IDLE' and ptype == 'HEADER': + self.broadcast_bit = pdata + self.state = 'MASTER ADDRESS' + elif self.state == 'MASTER ADDRESS' and ptype == 'MASTER ADDRESS': + (address, parity_bit) = pdata + self.master_addr = address + if HWAddresses.has_value(self.master_addr): + self.putx('address', ss, es, [ + HWAddresses(self.master_addr).name]) + + self.state = 'SLAVE ADDRESS' + + elif self.state == 'SLAVE ADDRESS' and ptype == 'SLAVE ADDRESS': + (address, parity_bit, ack_bit) = pdata + self.slave_addr = address + + if HWAddresses.has_value(self.slave_addr): + self.putx('address', ss, es, [ + HWAddresses(self.slave_addr).name]) + + self.state = 'CONTROL' + + elif self.state == 'CONTROL' and ptype == 'CONTROL': + (control, parity_bit, ack_bit) = pdata + self.control = control + + self.state = 'DATA LENGTH' + elif self.state == 'DATA LENGTH' and ptype == 'DATA LENGTH': + (data_length, parity_bit, ack_bit) = pdata + + self.data_length = data_length + + self.state = 'DATA' + elif self.state == 'DATA' and ptype == 'DATA': + + # Ignore parity bit and ack bit for simplicity + self.data_bytes = [DataByte(b=b, ss=ss, es=es) + for (b, parity_bit, ack_bit, ss, es) in pdata] + + # + # Decode logical device IDs + # + if self.broadcast_bit == 1: + # Unicast packets + + # Some packets do not seem to follow this format + if len(self.data_bytes) >= 3: + + # In unicast communications the meaning of the first + # data byte is unknown. Values seen so far are: + # - 0x00 + # - 0xff + # + self.from_function = self.data_bytes[1].b + self.to_function = self.data_bytes[2].b + + from_anno = ['From Function', 'From'] + if FunctionIDs.has_value(self.from_function): + self.from_function = FunctionIDs(self.from_function) + from_anno.insert( + 0, f'From Function: {self.from_function.name}') + self.putx( + 'function', self.data_bytes[1].ss, self.data_bytes[1].es, from_anno) + + to_anno = ['To Function', 'To'] + if FunctionIDs.has_value(self.to_function): + self.to_function = FunctionIDs(self.to_function) + to_anno.insert( + 0, f'To Function: {self.to_function.name}') + self.putx( + 'function', self.data_bytes[2].ss, self.data_bytes[2].es, to_anno) + + self.data_bytes = self.data_bytes[3:] + + elif self.broadcast_bit == 0: + # Broadcast packets + + self.from_function = self.data_bytes[0].b + self.to_function = self.data_bytes[1].b + + from_anno = ['From Function', 'From'] + if FunctionIDs.has_value(self.from_function): + self.from_function = FunctionIDs(self.from_function) + from_anno.insert( + 0, f'From Function: {self.from_function.name}') + self.putx( + 'function', self.data_bytes[0].ss, self.data_bytes[0].es, from_anno) + + to_anno = ['To Function', 'To'] + if FunctionIDs.has_value(self.to_function): + self.to_function = FunctionIDs(self.to_function) + to_anno.insert(0, f'To Function: {self.to_function.name}') + self.putx( + 'function', self.data_bytes[1].ss, self.data_bytes[1].es, to_anno) + + self.data_bytes = self.data_bytes[2:] + + else: + raise RuntimeError( + f'Unexpected broadcast bit value {self.broadcast_bit}') + + if (self.from_function is not None) and (self.to_function is not None) and \ + (len(self.data_bytes) > 0): + # Dispatch to device and function ID handling + # This logic allows for prioritised matching + fn = filter(lambda x: x is not None, [ + getattr( + self, f'pkt_from_{self.from_function:02x}_to_{self.to_function:02x}', None), + getattr(self, f'pkt_to_{self.to_function:02x}', None), + getattr(self, f'pkt_from_{self.from_function:02x}', None), + getattr(self, f'pkt_{self.to_function:02x}', None), + getattr(self, f'pkt_{self.from_function:02x}', None), + # getattr(self, f'pkt_default', None) + ]) + + for f in fn: + if f() is True: + break + + # + # Prepare for next frame + self.reset() + + else: + # + # Invalid state transition + self.reset() diff --git a/decoders/iebus/__init__.py b/decoders/iebus/__init__.py new file mode 100644 index 00000000..e6e9e220 --- /dev/null +++ b/decoders/iebus/__init__.py @@ -0,0 +1,26 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2023 Maciej Grela +## +## This program is free software: you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program. If not, see . + + +''' +IEBus (Inter Equipment Bus) is a communication bus specification +"between equipments within a vehicle or a chassis". IEBus is mainly +used for car audio and car navigations as well some vending machines. +''' + +from .pd import Decoder diff --git a/decoders/iebus/pd.py b/decoders/iebus/pd.py new file mode 100644 index 00000000..3d50f60b --- /dev/null +++ b/decoders/iebus/pd.py @@ -0,0 +1,494 @@ +# +# This file is part of the libsigrokdecode project. +# +# Copyright (C) 2023 Maciej Grela +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +''' +IEBus data timings help to understand the bits() method. + +This drawning documents Mode 2 +Drawing not to scale +Bit 0 dominates on the bus + + + │ │ + byte n-1 │ byte n │ byte n+1 + ◄───────── │ ◄───────── │ ─────────► + │ │ + │ prep. synchronization data │ prep. + │ period period period │ period + │ │ + │ │ │ │ │ + ──────────┼────────┼─────────────────────┼────────┼────────┼───────────────────────────────► + │ │ │ │ │ + + ▲ ΔV + │ │ + │ ┌─────────────────────┐ │ ┌────────────── │ > 120 mV + │ │ │ │ │ │ + │ │ │ │ │ │ + │ │ │ │ │ │ Bus voltage +Bit 1 │ │ │ │ │ │ (differential) + │ │ │ │ │ │ + │ │ │ │ │ │ + │ │ │ │ │ + ───────────────┘ └────────┼────────┘ │ < 20 mV + │ + │ │ │ + │ │ │ + │ │ │ + │ │ │ + │ │ │ ▲ ΔV + │ │ + │ ┌──────────────────────────────┐ ┌────────────── │ > 120 mV + │ │ │ │ │ + │ │ │ │ │ │ + │ │ │ │ │ │ Bus voltage +Bit 0 │ │ │ │ │ │ (differential) + │ │ │ │ │ │ + │ │ │ │ │ │ + │ │ │ │ │ + ───────────────┘ │ └────────┘ │ < 20 mV + │ + │ │ │ │ │ + ───────────┼────────┼─────────────────────┼────────┼────────┼────────────────────────────► t + │ │ │ │ │ + 7 µs 20 µs 12 µs 7 µs + + +The IEBus decoder uses the following OUTPUT_PYTHON format: + +Frame: +[, ] + +: +- 'HEADER' (Start bit + Broadcast bit, is the Broadcast bit value) +- 'MASTER ADDRESS' ( is (address, parity_bit) ) +- 'SLAVE ADDRESS' ( is (address, parity_bit, ack_bit) ) +- 'CONTROL' ( is (control, parity_bit, ack_bit) ) +- 'DATA LENGTH' ( is (data_length, parity_bit, ack_bit) ) +- 'DATA' ( contains frame data bytes and their ss/es numbers formatted + as follows: [ (data1, parity_bit, ack_bit, ss, es), + (data2, parity_bit, ack_bit, ss, es), ... + ] ) +- 'NAK' ( is None) + +Parity errors and NAK conditions are annotated but otherwise all data +is passed to the output unchanged. +Control bits are either decoded to one of the names from the Commands enum or +left as integer if no match is found. +''' + +from functools import reduce +from enum import IntEnum +import sigrokdecode as srd # pylint: disable=import-error + + +class Commands(IntEnum): + ''' + Valid values for the control bits if IEBus frames. + + Reference: https://en.wikipedia.org/wiki/IEBus + Reference: http://softservice.com.pl/corolla/avc/avclan.php + ''' + + + @classmethod + def has_value(cls, value: int): + '''Searches for a particular integer value in the enum''' + return value in iter(cls) + + + READ_STATUS = 0x00 # Reads slave status + READ_DATA_LOCK = 0x03 # Reads data and locks unit + READ_LOCK_ADDR_LO = 0x04 # Reads lock address (lower 8 bits) + READ_LOCK_ADDR_HI = 0x05 # Reads lock address (higher 4 bits) + READ_STATUS_UNLOCK = 0x06 # Reads slave status and unlocks unit + READ_DATA = 0x07 # Reads data + WRITE_CMD_LOCK = 0x0a # Writes command and locks unit + WRITE_DATA_LOCK = 0x0b # Writes data and locks unit + WRITE_CMD = 0x0e # Writes command + WRITE_DATA = 0x0f # Writes data + + +def first_true(iterable, default=None, pred=None): + """Returns the first true value in the iterable. + + If no true value is found, returns *default* + + If *pred* is not None, returns the first item + for which pred(item) is true. + + Reference: https://docs.python.org/3/library/itertools.html + """ + # first_true([a,b,c], x) --> a or b or c or x + # first_true([a,b], x, f) --> a if f(a) else b if f(b) else x + return next(filter(pred, iterable), default) + + +class Decoder(srd.Decoder): + '''IEBus decoder class for usage by libsigrokdecode.''' + + api_version = 3 + id = 'iebus' + name = 'IEBus' + longname = 'Inter-Equipment Bus' + desc = 'Inter-Equipment Bus is an automotive communication bus used in Toyota and Honda vehicles' # pylint: disable=line-too-long + license = 'gplv3+' + inputs = ['logic'] + outputs = ['iebus'] + tags = ['Automotive'] + channels = ( + {'id': 'bus', 'name': 'BUS', 'desc': 'Bus input'}, + ) + options = ( + {'id': 'mode', 'desc': 'Mode', 'values': ( + 'Mode 2', ), 'default': 'Mode 2'}, + {'id': 'bus_polarity', 'desc': 'Bus polarity', 'default': 'idle-low', + 'values': ('idle-low', 'idle-high')}, + {'id': 'ignore_nak', 'desc': 'Ignore NAK condition', 'default': 'Disabled', + 'values': ('Disabled', 'Enabled')} + ) + annotations = ( + ('start-bit', 'Start bit'), # 0 + ('bit', 'Bit'), # 1 + ('parity', 'Parity'), # 2 + ('ack', 'Acknowledge'), # 3 + + ('broadcast', 'Broadcast flag'), # 4 + ('maddr', 'Master address'), # 5 + ('saddr', 'Slave address'), # 6 + ('control', 'Control'), # 7 + ('datalen', 'Data Length'), # 8 + ('byte', 'Data Byte'), # 9 + + ('warning', 'Warning') + ) + + annotation_rows = ( + ('bits', 'Bits', (0, 1, 2, 3)), + ('fields', 'Raw Fields', (4, 5, 6, 7, 8, 9)), + ('warnings', 'Warnings', (10,)) + ) + + + def __init__(self): + self.out_ann = self.out_python = None + self.samplerate = None + self.broadcast_bit = None + self.bits_begin = self.bits_end = None + + + def reset(self): + '''Reset decoder state.''' + + + def start(self): + '''Start decoder.''' + self.out_ann = self.register(srd.OUTPUT_ANN) + self.out_python = self.register(srd.OUTPUT_PYTHON) + + + def metadata(self, key, value): + '''Handle metadata input from libsigrokdecode.''' + if key == srd.SRD_CONF_SAMPLERATE: + self.samplerate = value + + + def reduce_bus(self, bus: list): + '''Reduce a list of bit values to an integer (MSB bit order).''' + return reduce(lambda a, b: (a << 1) | b, bus) + + + def bits(self, n: int): + ''' + Read n bits from the bus. + Only Mode 2 is currently supported. + + + ''' + self.bits_begin = None + self.bits_end = None + + bits = [] + while n > 0: + if self.options['bus_polarity'] == 'idle-low': + self.wait({0: 'r'}) + elif self.options['bus_polarity'] == 'idle-high': + self.wait({0: 'f'}) + else: + raise Exception(f'Unexpected bus_polarity value "{bus_polarity}"') + + if self.bits_begin is None: + self.bits_begin = self.samplenum + bit_start = self.samplenum + + # In Mode 2 synchronization phase is 20µs, data phase is 13µs, + # sample bit state 27µs after the synchronization edge + # (approx. 20µs + 13µs / 2) + + pins = self.wait({'skip': int(27e-6 * self.samplerate)}) + bit = (pins[0] + 1) % 2 + + # Invert bit value when bus is idle high + if self.options['bus_polarity'] == 'idle-high': + bit = (bit + 1) % 2 + + # Assume full 33µs bit length after sync edge + bit_end = bit_start + int(33e-6 * self.samplerate) + + bits.append(bit) + self.put(bit_start, bit_end, self.out_ann, + [1, [str(bit)]]) + + self.bits_end = bit_end + + n -= 1 + + return bits + + + def bit(self): + '''Read one bit from the bus.''' + return self.bits(1)[0] + + + def value(self, num_bits: int): + '''Read a value from the bus having num_bits bits (MSB first)''' + v = self.reduce_bus(self.bits(num_bits)) + return (v, self.bits_begin, self.bits_end) + + + def find_annotation(self, anno_name: str): + '''Find an annotation index based on its name.''' + return first_true(enumerate(self.annotations), default=(None, (None, None)), + pred=lambda item: item[1][0] == anno_name) + + + def putx(self, anno_name: str, ss, es, v): + '''Put in an annotation using its name.''' + (idx, _) = self.find_annotation(anno_name) + if idx is None: + raise RuntimeError(f'Cannot find annotation name {anno_name}') + self.put(ss, es, self.out_ann, [idx, v]) + + def header(self): + ''' + Read the header from the bus and add appropriate annotations. + Returns the header bits. + ''' + + # Start bit + # + if self.options['bus_polarity'] == 'idle-low': + self.wait({0: 'r'}) + ss = self.samplenum + self.wait({0: 'f'}) + es = self.samplenum + elif self.options['bus_polarity'] == 'idle-high': + self.wait({0: 'f'}) + ss = self.samplenum + self.wait({0: 'r'}) + es = self.samplenum + else: + raise Exception(f'Unexpected bus_polarity value "{bus_polarity}"') + + if (es - ss) / self.samplerate < 100e-6: + self.putx('warning', ss, es, + ['Startbit too short', 'Too short']) + + return (None, None, ss, es) + + self.putx('start-bit', ss, es, ['Start bit', 'Start', 'S']) + + # Broadcast bit + # + broadcast_bit = self.read_broadcast_bit() + es = self.samplenum + + return (1, broadcast_bit, ss, es) + + + def read_broadcast_bit(self): + '''Read the broadcast bit from the bus and add appropriate annotations.''' + broadcast_bit = self.bit() + + if broadcast_bit == 1: + broadcast_anno = ['Unicast', 'Uni', 'U'] + elif broadcast_bit == 0: + # Broadcast traffic has bit 0 here in order to + # dominate on the bus. + broadcast_anno = ['Broadcast', 'Bro', 'B'] + else: + raise RuntimeError(f'Unexpected broadcast bit value {broadcast_bit}') + + self.putx('broadcast', self.bits_begin, self.bits_end, broadcast_anno) + + return broadcast_bit + + + def ack_bit(self): + '''Read the ACK/NAK bit from the bus and add appropriate annotations.''' + ack_bit = self.bit() + if self.broadcast_bit == 1: + # Non-broadcast traffic + + # Force ack bit to be 0 if NAK condition is to be ignored + if self.options['ignore_nak'] == 'Enabled': + ack_bit = 0 + + if ack_bit == 0: + # ACK needs to dominate on the bus + self.putx('ack', self.bits_begin, self.bits_end, ['ACK', 'A']) + elif ack_bit == 1: + self.putx('ack', self.bits_begin, self.bits_end, ['NAK', 'N']) + else: + raise RuntimeError('Unexpected value {ack_bit} for the acknowledge bit') + + return ack_bit + + + def parity_bit(self, value: int): + '''Read the parity bit from the bus and add appropriate annotations.''' + parity_bit = self.bit() + self.putx('parity', self.bits_begin, + self.bits_end, ['Parity', 'Par', 'P']) + expected_parity = bin(value).count('1') % 2 + if expected_parity != parity_bit: + self.putx('warning', self.bits_begin, + self.bits_end, ['Parity error']) + + return parity_bit + + + def handle_data_bytes(self, data_len: int): + ''' + Read a specified amount of data bytes from the bus, add appropriate + annotations, record sample counts for each byte and return a list. + ''' + data_bytes = [] + + while data_len > 0: + (b, ss, es) = self.value(8) + + self.putx('byte', ss, es, + [f'Data: 0x{b:02x}', f'0x{b:02x}']) + + parity_bit = self.parity_bit(b) + ack_bit = self.ack_bit() + + data_bytes.append((b, parity_bit, ack_bit, ss, es)) + + data_len -= 1 + + # We don't care about the value of these bits, just annotations + if self.broadcast_bit == 1 and ack_bit == 1: + # NAK condition, restart search for start bit + break + + return data_bytes + + + def decode(self): + '''Decode samples, main function called by the libsigrokdecode framework''' + while True: + + (start_bit, broadcast_bit, ss, es) = self.header() + + if start_bit is None: + # Header was not valid, search for next one + continue + + # Store broadcast bit for later checks of NAK conditions + self.broadcast_bit = broadcast_bit + + self.put(ss, es, self.out_python, ['HEADER', self.broadcast_bit]) + + # Master adddress + # + (master_addr, ss, es) = self.value(12) + + self.putx('maddr', ss, es, [ f'Master: 0x{master_addr:03x}', f'0x{master_addr:03x}' ]) + + parity_bit = self.parity_bit(master_addr) + self.put(ss, es, self.out_python, [ + 'MASTER ADDRESS', (master_addr, parity_bit) ]) + + # Slave adddress + # + (slave_addr, ss, es) = self.value(12) + + self.putx('saddr', ss, es, [ f'Slave: 0x{slave_addr:03x}', f'0x{slave_addr:03x}']) + + parity_bit = self.parity_bit(slave_addr) + ack_bit = self.ack_bit() + self.put(ss, es, self.out_python, [ + 'SLAVE ADDRESS', (slave_addr, parity_bit, ack_bit) + ]) + if self.broadcast_bit == 1 and ack_bit == 1: + # NAK condition, restart search for start bit + self.put(self.bits_begin, self.bits_end, self.out_python, ['NAK', None]) + continue + + # Control bits + # + (control, ss, es) = self.value(4) + parity_bit = self.parity_bit(control) + ack_bit = self.ack_bit() + + if Commands.has_value(control): + control = Commands(control) + self.putx('control', ss, es, [ f'Control: {control.name}', f'{control.name}' ]) + self.put(ss, es, self.out_python, [ 'CONTROL', (control.name, parity_bit, ack_bit) ]) + else: + self.putx('control', ss, es, [ f'Control: 0x{control:02x}', f'0x{control:02x}' ]) + self.put(ss, es, self.out_python, [ 'CONTROL', (control, parity_bit, ack_bit) ]) + + if self.broadcast_bit == 1 and ack_bit == 1: + # NAK condition, restart search for start bit + self.put(self.bits_begin, self.bits_end, self.out_python, ['NAK', None]) + continue + + # Data length + # + (data_len, ss, es) = self.value(8) + parity_bit = self.parity_bit(data_len) + + if data_len == 0: # 0x00 is 256 bytes + data_len = 256 + + self.putx('datalen', ss, es, [ f'Data Length: {data_len}', f'{data_len}', 'Len' ]) + + if data_len > 128: + self.putx('warning', ss, es, + ['Message too long, mode 2 allows only for 128 bytes maximum', + 'Message too long', 'Too long']) + + ack_bit = self.ack_bit() + self.put(ss, es, self.out_python, [ 'DATA LENGTH', (data_len, parity_bit, ack_bit) ]) + + if self.broadcast_bit == 1 and ack_bit == 1: + # NAK condition, restart search for start bit + self.put(self.bits_begin, self.bits_end, self.out_python, ['NAK', None]) + continue + + # Data bytes + # + ss = self.samplenum + data_bytes = self.handle_data_bytes(data_len) + es = self.samplenum + + self.put(ss, es, self.out_python, [ 'DATA', data_bytes ])