diff --git a/saleae/saleae.py b/saleae/saleae.py index ce1dbb3..2323de4 100644 --- a/saleae/saleae.py +++ b/saleae/saleae.py @@ -92,7 +92,10 @@ def __init__(self, host='localhost', port=10429): def _build(self, s): '''Convenience method for building up a command to send''' - self._to_send.append(s) + if type(s) is list: + self._to_send.extend(s) + else: + self._to_send.append(s) def _abort(self): self._to_send = [] @@ -326,7 +329,7 @@ def get_bandwidth(self, sample_rate, device = None, channels = None): adc_width = 12 return sample_rate[0] * len(digital_channels) +\ - sample_rate[1] * len(analog_channels) * adc_width + sample_rate[1] * len(analog_channels) * adc_width def get_performance(self): '''Get performance value. Performance controls USB traffic and quality. @@ -467,16 +470,18 @@ def set_active_channels(self, digital=None, analog=None): >>> s.set_active_channels([0,1,2,3], [0]) #doctest:+SKIP ''' # TODO Enfore note from docstring - if digital is None and analog is None: - raise self.ImpossibleSettings("At least one digital or analog channel must be set") + digital_no = 0 if digital is None else len(digital) + analog_no = 0 if analog is None else len(analog) + if digital_no <= 0 and analog_no <= 0: + raise self.ImpossibleSettings('Logic requires at least one activate channel (digital or analog) and none are given') self._build('SET_ACTIVE_CHANNELS') - self._build('digital_channels') - for ch in digital: - self._build('{}'.format(ch)) - self._build('analog_channels') - for ch in analog: - self._build('{}'.format(ch)) + if digital_no > 0: + self._build('digital_channels') + self._build(['{0:d}'.format(ch) for ch in digital]) + if analog_no > 0: + self._build('analog_channels') + self._build(['{0:d}'.format(ch) for ch in analog]) self._finish() def reset_active_channels(self): @@ -544,112 +549,139 @@ def load_from_file(self, file_path_on_target_machine): def close_all_tabs(self): self._cmd('CLOSE_ALL_TABS') - def export_data(self, - file_path_on_target_machine, - digital_channels=None, - analog_channels=None, - analog_format="voltage", - time_span=None, # 'None-->all_time, [x.x, y.y]-->time_span' - format="csv", # 'csv, bin, vcd, matlab' - csv_column_headers=True, - csv_delimeter='comma', # 'comma' or 'tab' - csv_timestamp='time_stamp', # 'time_stamp, sample_number' - csv_combined=True, # 'combined' else 'separate' - csv_row_per_change=True, # 'row_per_change' else 'row_per_sample' - csv_number_format='hex', # dec, hex, bin, ascii - bin_per_change=True, # 'on_change' else 'each_sample' - bin_word_size='8' # 8, 16, 32, 64 - ): - # export_data, C:\temp_file, digital_channels, 0, 1, analog_channels, 1, voltage, all_time, adc, csv, headers, comma, time_stamp, separate, row_per_change, Dec - # export_data, C:\temp_file, all_channels, time_span, 0.2, 0.4, vcd - # export_data, C:\temp_file, analog_channels, 0, 1, 2, adc, all_time, matlab + def _export_data_analog_binary(self, analog_format='voltage'): + '''Binary analog: [VOLTAGE|ADC]''' + + # Do argument verification + if analog_format.lower() not in ['voltage', 'adc']: + raise self.ImpossibleSettings('Unsupported binary analog format') + + # Build arguments + self._build(analog_format.upper()) + + # NOTE: the [EACH_SAMPLE|ON_CHANGE] is the same as the CSV [ROW_PER_CHANGE|ROW_PER_SAMPLE], but I am using name convention from official C# API + def _export_data_digital_binary(self, each_sample=True, no_shift=True, word_size=16): + '''Binary digital: [EACH_SAMPLE|ON_CHANGE], [NO_SHIFT|RIGHT_SHIFT], [8|16|32|64]''' + # Do argument verification + if word_size not in [8, 16, 32, 64]: + raise self.ImpossibleSettings('Unsupported binary word size') + + # Build arguments + self._build('EACH_SAMPLE' if each_sample else 'ON_CHANGE') + self._build('NO_SHIFT' if no_shift else 'RIGHT_SHIFT') + self._build(str(word_size)) + + def _export_data_analog_csv(self, column_headers=True, delimiter='comma', display_base='hex', analog_format='voltage'): + '''CVS export analog/mixed: [HEADERS|NO_HEADERS], [COMMA|TAB], [BIN|DEC|HEX|ASCII], [VOLTAGE|ADC]''' + + # Do argument verification + if delimiter.lower() not in ['comma', 'tab']: + raise self.ImpossibleSettings('Unsupported CSV delimiter') + if display_base.lower() not in ['bin', 'dec', 'hex', 'ascii']: + raise self.ImpossibleSettings('Unsupported CSV display base') + if analog_format.lower() not in ['voltage', 'adc']: + raise self.ImpossibleSettings('Unsupported CSV analog format') + + # Build arguments + self._build('HEADERS' if column_headers else 'NO_HEADERS') + self._build(delimiter.upper()) + self._build(display_base.upper()) + self._build(analog_format.upper()) + + def _export_data_digital_csv(self, column_headers=True, delimiter='comma', timestamp='time_stamp', display_base='hex', rows_per_change=True): + '''CVS export digital: [HEADERS|NO_HEADERS], [COMMA|TAB], [TIME_STAMP|SAMPLE_NUMBER], [COMBINED, [BIN|DEC|HEX|ASCII]|SEPARATE], [ROW_PER_CHANGE|ROW_PER_SAMPLE]''' + + # Do argument verification + if delimiter.lower() not in ['comma', 'tab']: + raise self.ImpossibleSettings('Unsupported CSV delimiter') + if timestamp.lower() not in ['time_stamp', 'sample_number']: + raise self.ImpossibleSettings('Unsupported timestamp setting') + if display_base.lower() not in ['bin', 'dec', 'hex', 'ascii', 'separate']: + raise self.ImpossibleSettings('Unsupported CSV display base') + + # Build arguments + self._build('HEADERS' if column_headers else 'NO_HEADERS') + self._build(delimiter.upper()) + self._build(timestamp.upper()) + self._build('SEPERATE' if display_base.lower() == 'SEPERATE' else ['COMBINED', display_base.upper()]) + self._build('ROW_PER_CHANGE' if rows_per_change else 'ROW_PER_SAMPLE') + + def _export_data_digital_vcd(self): + '''VCD digital: no arguments''' + pass + + def _export_data_analog_matlab(self, analog_format='voltage'): + '''Matlab analog: [VOLTAGE|ADC]''' + + # Do argument verification + if analog_format.lower() not in ['voltage', 'adc']: + raise self.ImpossibleSettings('Unsupported Matlab analog format') + + # Build arguments + self._build(analog_format.upper()) + + def _export_data_digital_matlab(self): + '''Matlab digital: no arguments''' + pass + + def export_data(self, file_path_on_target_machine, digital_channels=None, analog_channels=None, time_span=None, format='csv', **export_args): + '''Export command: + EXPORT_DATA2, + , + [ALL_CHANNELS|SPECIFIC_CHANNELS, [DIGITAL_ONLY|ANALOG_ONLY|ANALOG_AND_DIGITAL], [ANALOG|DIGITAL], ..., [ANALOG|DIGITAL]], + [ALL_TIME|TIME_SPAN, <(double)start>, <(double)end>], + [BINARY, |CSV, |VCD|MATLAB, ] + ''' while not self.is_processing_complete(): time.sleep(1) - # The path needs to be absolute. This is hard to check reliably since we - # don't know the OS on the target machine, but we can do a basic check - # for something that will definitely fail + # NOTE: Note to Saleae, Logic should resolve relative paths, I do not see reasons not to do this ... if file_path_on_target_machine[0] in ('~', '.'): - raise NotImplementedError('File path must be absolute') + raise ValueError('File path must be absolute') - self._build('EXPORT_DATA') + self._build('EXPORT_DATA2') self._build(file_path_on_target_machine) + + # Channel selection + is_analog = False if (digital_channels is None) and (analog_channels is None): - self._build('all_channels') - analog_channels = self.get_active_channels()[1] + self._build('ALL_CHANNELS') + is_analog = len(self.get_active_channels()[1]) > 0 else: + self._build('SPECIFIC_CHANNELS') + # Check for mixed mode + # NOTE: This feel redundant, we can see if we digital only, analog only or mixed from parsing the channels right?! + # especially given the fact that only ANALOG_AND_DIGITAL is printed and never DIGITAL_ONLY or ANALOG_ONLY (according to Saleae C# implementation) + if digital_channels is not None and len(digital_channels) and analog_channels is not None and len(analog_channels): + self._build('ANALOG_AND_DIGITAL') + + # Add in the channels if digital_channels is not None and len(digital_channels): - self._build('digital_channels') - for ch in digital_channels: - self._build(str(ch)) + self._build(['{0:d} DIGITAL'.format(ch) for ch in digital_channels]) if analog_channels is not None and len(analog_channels): - self._build('analog_channels') - for ch in analog_channels: - self._build(str(ch)) - if analog_channels is not None and len(analog_channels): - if analog_format not in ('voltage', 'adc'): - raise NotImplementedError("bad analog_format") - self._build(analog_format) + self._build(['{0:d} ANALOG'.format(ch) for ch in analog_channels]) + is_analog = True + # Time selection if time_span is None: - self._build('all_time') + self._build('ALL_TIME') elif len(time_span) == 2: - self._build('time_span') - self._build(str(time_span[0])) - self._build(str(time_span[1])) + self._build(['TIME_SPAN', '{0:f}'.format(time_span[0]), '{0:f}'.format(time_span[0])]) else: - raise NotImplementedError('invalid time format') - - if format == 'csv': - self._build(format) - - if csv_column_headers: - self._build('headers') - else: - self._build('no_headers') - - if csv_delimeter not in ('comma', 'tab'): - raise NotImplementedError('bad csv delimeter') - self._build(csv_delimeter) - - if csv_timestamp not in ('time_stamp', 'sample_number'): - raise NotImplementedError('bad csv timestamp') - self._build(csv_timestamp) - - if csv_combined: - self._build('combined') - else: - self._build('separate') - - if csv_row_per_change: - self._build('row_per_change') - else: - self._build('row_per_sample') - - if csv_number_format not in ('dec', 'hex', 'bin', 'ascii'): - raise NotImplementedError('bad csv number format') - self._build(csv_number_format) - elif format == 'bin': - self._build(format) - - if bin_per_change: - self._build('on_change') - else: - self._build('each_sample') + raise self.ImpossibleSettings('Unsupported time span') - if bin_word_size not in ('8', '16', '32', '64'): - raise NotImplementedError('bad bin word size') - self._build(bin_word_size) + # Find exporter + export_name = '_export_data_{0:s}_{1:s}'.format('analog' if is_analog else 'digital', format.lower()) + if not hasattr(self, export_name): + raise NotImplementedError('Unsupported export format given ({0:s})'.format(export_name)) - elif format in ('vcd', 'matlab'): - # No options for these - self._build(format) - else: - raise NotImplementedError('unknown format') + # Let specific export function handle arguments + self._build(format.upper()) + getattr(self, export_name)(**export_args) self._finish() - + time.sleep(0.050) # HACK: Delete me when Logic (saleae) race conditions are fixed def get_analyzers(self): '''Return a list of analyzers currently in use, with indexes.'''