Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added scripting helper methods #609

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions documentation/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ please use _ni_x_series_in_streamer.py_ as hardware module.
* Set proper minimum wavelength value in constraints of Tektronix AWG7k series HW module
* Added a hardware file for fibered optical switch Thorlabs OSW12/22 via SwitchInterface
* Fixed bug affecting interface overloading of Qudi modules
* Added scripting helper methods at the end of multiple logic modules
*


Expand Down
45 changes: 30 additions & 15 deletions logic/odmr_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,17 @@ def set_runtime(self, runtime):
self.sigParameterUpdated.emit(update_dict)
return self.run_time

def set_cw_parameters(self, frequency, power):
def set_cw_parameters(self, frequency=None, power=None):
""" Set the desired new cw mode parameters.

@param float frequency: frequency to set in Hz
@param float power: power to set in dBm

@return (float, float): actually set frequency in Hz, actually set power in dBm
"""
frequency = frequency if frequency is not None else self.cw_mw_frequency
power = power if power is not None else self.cw_mw_power

if self.module_state() != 'locked' and isinstance(frequency, (int, float)) and isinstance(power, (int, float)):
constraints = self.get_hw_constraints()
frequency_to_set = constraints.frequency_in_range(frequency)
Expand All @@ -387,7 +390,7 @@ def set_cw_parameters(self, frequency, power):
self.sigParameterUpdated.emit(param_dict)
return self.cw_mw_frequency, self.cw_mw_power

def set_sweep_parameters(self, start, stop, step, power):
def set_sweep_parameters(self, start=None, stop=None, step=None, power=None):
""" Set the desired frequency parameters for list and sweep mode

@param float start: start frequency to set in Hz
Expand All @@ -398,6 +401,11 @@ def set_sweep_parameters(self, start, stop, step, power):
@return float, float, float, float: current start_freq, current stop_freq,
current freq_step, current power
"""
start = start if start is not None else self.mw_start
stop = stop if stop is not None else self.mw_stop
step = step if step is not None else self.mw_step
power = power if power is not None else self.sweep_mw_power

limits = self.get_hw_constraints()
if self.module_state() != 'locked':
if isinstance(start, (int, float)):
Expand Down Expand Up @@ -994,33 +1002,30 @@ def draw_figure(self, channel_number, cbar_range=None, percentile_range=None):

return fig

def perform_odmr_measurement(self, freq_start, freq_step, freq_stop, power, channel, runtime,
fit_function='No Fit', save_after_meas=True, name_tag=''):
""" An independant method, which can be called by a task with the proper input values
to perform an odmr measurement.
# Scripting zone
#
# The methods bellow are meant to be used be external script which have their own thread.

def perform_odmr_measurement(self, freq_start=None, freq_step=None, freq_stop=None, power=None, channel=0,
runtime=None, fit_function='No Fit', save_after_meas=True, name_tag=''):
""" An independent method, which can be called by a task or script to perform an odmr measurement.

@return
"""
timeout = 30
start_time = time.time()
while self.module_state() != 'idle':
time.sleep(0.5)
timeout -= (time.time() - start_time)
if timeout <= 0:
if time.time() - start_time > 30:
self.log.error('perform_odmr_measurement failed. Logic module was still locked '
'and 30 sec timeout has been reached.')
return tuple()

# set all relevant parameter:
self.set_sweep_parameters(freq_start, freq_stop, freq_step, power)
self.set_runtime(runtime)
if runtime is not None:
self.set_runtime(runtime)

# start the scan
self.start_odmr_scan()

# wait until the scan has started
while self.module_state() != 'locked':
time.sleep(1)
# wait until the scan has finished
while self.module_state() == 'locked':
time.sleep(1)
Expand All @@ -1037,3 +1042,13 @@ def perform_odmr_measurement(self, freq_start, freq_step, freq_stop, power, chan
self.save_odmr_data(tag=name_tag)

return self.odmr_plot_x, self.odmr_plot_y, fit_params

def stop_odmr_scan_sync(self):
""" Stop the acquistion normally but wait for it to be over before returning

This method blocks the caller's thread and not this module's thread.
"""
self.stop_odmr_scan()
while self.module_state() == 'locked':
time.sleep(0.1)
return
48 changes: 47 additions & 1 deletion logic/poi_manager_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import os
import numpy as np
import time
import re

from collections import OrderedDict
from core.connector import Connector
Expand Down Expand Up @@ -1255,4 +1256,49 @@ def auto_catch_poi(self):
pois[i] = [x_axis[xc2[i]], y_axis[yc2[i]], z]
self.add_poi(pois[i])
if self.poi_nametag is None:
time.sleep(0.1)
time.sleep(0.1)

# Scripting zone
#
# The methods bellow are meant to be used be external script which have their own thread.

def optimise_poi_position_sync(self, name=None, update_roi_position=True):
""" Start an usual optimization procedure but wait for it to finish before returning

This method blocks the caller's thread and not this module's thread.
"""
self.optimise_poi_position(name=name, update_roi_position=update_roi_position)
while self.optimiserlogic().module_state() != 'idle':
time.sleep(0.1)
return

def get_poi_list(self, regexp='', check_in_range=False):
""" Get a list of all the POIs filtered by a given regular expression

@param (str) regexp: A regular expression to match POI names before returning
@param (bool) check_in_range: Only returns POI in range if True

To learn what is a regular expression, see : https://en.wikipedia.org/wiki/Regular_expression
"""
r = re.compile(regexp)
result = [name for name in self.poi_names if r.search(name)]
if check_in_range:
result = [name for name in result if self.is_poi_in_range(name)]
return result

def is_poi_in_range(self, name, margin=0e-6, z_margin=0e-6):
""" Test if a given POI is in range of the scanner

@param (str) name: name of the poi to query
@param (float) margin: Margin (m) on the X and Y directions necessary to consider the POI in range
@param (float) z_margin: Margin (m) on the Z direction necessary to consider the POI in range

This method can be used when iterating over a lot of POI to check if they are currently in the scan range.
Margin can be applied to filter out POIs too close to the scanner edges.
"""
if name is None or name not in self.poi_names:
return False
x, y, z = self.get_poi_position(name)
return self.scannerlogic().x_range[0] + margin <= x <= self.scannerlogic().x_range[1] - margin and \
self.scannerlogic().y_range[0] + margin <= y <= self.scannerlogic().y_range[1] - margin and \
self.scannerlogic().z_range[0] + z_margin <= z <= self.scannerlogic().y_range[1] - z_margin
29 changes: 29 additions & 0 deletions logic/time_series_reader_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,3 +812,32 @@ def _stop_reader_wait(self):
self.module_state.unlock()
self.sigStatusChanged.emit(False, False)
return 0

# Scripting zone
#
# The methods bellow are meant to be used be external script which have their own thread.

def get_average_value(self, duration):
""" Get the average value on a given duration over all channels

@param (float) duration: The approximate duration on which average should be computed.

@return (np.array): An array of the average value of each active channels

"""
if duration > self.trace_window_size:
self.log.error('Trace window duration ({} s) is smaller that required duration ({} s)'.format(
self.trace_window_size, duration))
bins = int(duration * self.data_rate)
return self._trace_data[:, -bins:].mean(axis=1)

def get_average_value_recorded_data(self):
""" Get the average value of each channel on the currently recorded data

@return (np.array): An array of the average value of each active channels on the currently recorded data

"""
if len(self._recorded_data) == 0:
self.log.error('There is no recorded data. Can not get average value.')
return np.concatenate(self._recorded_data, axis=1).mean(axis=1)