How to create an experiment (in four easy steps!)

Create device drivers for all needed devices

See device_api.rst

Create device managers for all the devices

Single mode devices

Device managers are wrappers around devices that have the following responsibilities:

  1. Provide metadata about the device they hold
  2. Validate input
  3. Provide any necessary per-experiment customisation

For example, suppose you need to create a DeviceManager for HantekPPS2116ADevice, which can be found in this repository: https://bitbucket.org/silf/silf-backend-driver-power-hantek. It is a simple programmable power source. The manager will also let the user directly set the temperature of the light bulb that is controlled by this power source, so we need to convert both the user settings and the experiment responses.

We’ll start with the following:

class HantekManager(SingleModeDeviceManager):

    DEVICE_ID = "hantek"
    DEVICE_CONSTRUCTOR = HantekPPS2116ADevice
    CONTROLS = ControlSuite(
        NumberControl("temperature", "Light bulb filament temperature", default_value=2700, min_value=300, max_value=2800)
    )

    OUTPUT_FIELDS = OutputFieldSuite(
        blackbody_temperature = OutputField(
            "integer-indicator", "temperature", label="Current light bulb filament temperature"
        )
    )

    RESULT_CREATORS = [
        ReturnLastElementResultCreator("temperature")
    ]

The following points are important:

  1. DEVICE_ID is a programmer-readable name of the device, unique within the experiment; it is not visible to the end user.
  2. DEVICE_CONSTRUCTOR is a callable that creates the Device. Usually it is just a subtype of Device. It should accept the same arguments as Device.__init__().
  3. CONTROLS defines the controls this device exposes to the user. For more information about how controls work and how they are represented read: Input fields and :mod:`silf.backend.commons.
  4. RESULT_CREATORS allow you to customize how results are sent to the user (more on this later in this document).

Converting settings

The remaining problem is that the device needs voltage and current, while the student provides a temperature. To convert settings, override silf.backend.commons.device_manager.DefaultDeviceManager._convert_settings_to_device_format().

class HantekManager(SingleModeDeviceManager):

    ...

    def __get_hantek_settings_from_temperature(self, temperature):
        # Approximate conversion from temperature [K] to voltage
        return {'voltage': 1.55155e-6 * temperature ** 2 - 0.00045359}

    def _convert_settings_to_device_format(self, converted_settings):
        return self.__get_hantek_settings_from_temperature(converted_settings['temperature'])
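
The conversion can be tried out without any hardware. The sketch below is a stand-alone illustration, not part of the library: the function names are hypothetical, the formula is the approximation shown above, and the range check (using the bounds of the "temperature" control) is an assumption added here only to show where input validation could sit.

```python
# Bounds copied from the "temperature" NumberControl defined earlier.
MIN_TEMPERATURE_K = 300
MAX_TEMPERATURE_K = 2800


def temperature_to_voltage(temperature):
    """Approximate voltage for a requested filament temperature in kelvin."""
    # Assumption: rejecting out-of-range values here is illustrative only.
    if not MIN_TEMPERATURE_K <= temperature <= MAX_TEMPERATURE_K:
        raise ValueError("temperature out of range: %r" % (temperature,))
    return 1.55155e-6 * temperature ** 2 - 0.00045359


def convert_settings_to_device_format(converted_settings):
    """Hypothetical stand-alone twin of _convert_settings_to_device_format()."""
    return {"voltage": temperature_to_voltage(converted_settings["temperature"])}


device_settings = convert_settings_to_device_format({"temperature": 2700})
print(device_settings)  # a dictionary with a single 'voltage' key
```

Keeping the formula in a plain function makes it easy to check the numbers before wiring it into the manager.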

Converting results

Results need to be converted as well:

import math


class HantekManager(SingleModeDeviceManager):

    ...

    def __get_temperature_from_hantek_results(self, voltage, current):
        # Approximate conversion from voltage to temperature [K];
        # current is accepted but not used by this approximation
        return {"temperature": math.sqrt(684732 * voltage) + 86.5226}

    def _convert_result(self, results):
        return self.__get_temperature_from_hantek_results(
            results['voltage'], results['current']
        )
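
The result conversion can be exercised the same way. This is a minimal stand-alone sketch with hypothetical function names; the formula is the approximation used above. Clamping negative readings to zero is an assumption made here, because math.sqrt() raises ValueError for negative input.

```python
import math


def voltage_to_temperature(voltage):
    """Approximate filament temperature in kelvin for a measured voltage."""
    # Assumption: clamp negative readings so that math.sqrt() does not raise.
    voltage = max(voltage, 0.0)
    return math.sqrt(684732 * voltage) + 86.5226


def convert_result(results):
    """Hypothetical stand-alone twin of the manager's result conversion."""
    # 'current' is present in the device results but unused by the approximation.
    return {"temperature": voltage_to_temperature(results["voltage"])}


converted = convert_result({"voltage": 10.0, "current": 0.5})
print(round(converted["temperature"], 1))
```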

Results creators

Devices produce results asynchronously: while the DeviceManager was doing something else, the Device could have produced many results (each of them a dictionary holding many values) or none at all. RESULT_CREATORS are designed to convert that unknown number of results into something that can be sent to users.

If the experiment produced N result dictionaries, we first convert each of them using _convert_result, then we fire the result creators.
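
The two-stage flow (convert every result, then reduce them) can be sketched in plain Python. This is only an illustration of the idea behind a "return last element" creator; the helper names are hypothetical and this is not the implementation of ReturnLastElementResultCreator.

```python
import math


def convert_result(result):
    """Per-result conversion, using the voltage-to-temperature approximation."""
    return {"temperature": math.sqrt(684732 * result["voltage"]) + 86.5226}


def last_element(converted_results, field):
    """Return {field: value} from the newest result holding `field`, or {}."""
    for result in reversed(converted_results):
        if field in result:
            return {field: result[field]}
    return {}  # the device produced nothing since the last check


# The device may have produced any number of results, including zero.
raw_results = [
    {"voltage": 9.0, "current": 0.4},
    {"voltage": 10.0, "current": 0.5},
]
converted = [convert_result(result) for result in raw_results]
to_send = last_element(converted, "temperature")
print(to_send)
```

With an empty list of results the helper returns an empty dictionary, so nothing would be sent to the user.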

Create the experiment manager

Complicated experiment manager

class BlackbodyExperiment(EventExperimentManager):

    LOOP_TIMEOUT = 1

    DEVICE_MANAGERS = {
        'hantek': HantekManager,
        'strawberry': StrawberryManager,
        'voltimeter': RigolDM3000UsbtmcManager
    }

Configuration:

LOOP_TIMEOUT
As elsewhere, controls how long to wait after each iteration of the main loop.
DEVICE_MANAGERS
Dictionary mapping device names to IDeviceManager classes.

Logic of this experiment is as follows:

  • For each point:
      • Move detector to the desired position
      • Set parameters for the light source
      • Capture voltage

We need to implement this logic using EventExperimentManager. In this class, each lifecycle method launches an event; the rest of the logic can be added via custom events.

All lifecycle events will be handled by default handlers, with the exception of ‘start’ which will invoke new logic.
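
To make the event model concrete, here is a deliberately simplified, hypothetical model of listeners and delayed events. It is not the EventExperimentManager implementation: the class name, the virtual clock, and passing the event name to callbacks are all assumptions made for the sketch. Only the method names register_listener and schedule_event mirror the ones used in this document.

```python
import heapq
import itertools


class TinyEventManager:
    """Hypothetical, simplified model of an event-driven manager."""

    LOOP_TIMEOUT = 1  # default delay for scheduled events, in virtual seconds

    def __init__(self):
        self.now = 0.0
        self._listeners = {}
        self._queue = []
        self._order = itertools.count()  # tie-breaker keeps FIFO order

    def register_listener(self, name, callback):
        self._listeners.setdefault(name, []).append(callback)

    def schedule_event(self, name, delay=None):
        if delay is None:
            delay = self.LOOP_TIMEOUT
        heapq.heappush(self._queue, (self.now + delay, next(self._order), name))

    def run(self):
        """Dispatch events in time order until the queue is empty."""
        while self._queue:
            self.now, _, name = heapq.heappop(self._queue)
            for callback in self._listeners.get(name, []):
                callback(name)


log = []
manager = TinyEventManager()


def on_start(event):
    log.append((manager.now, event))
    manager.schedule_event("check_position")  # fires after LOOP_TIMEOUT


def on_check_position(event):
    log.append((manager.now, event))


manager.register_listener("start", on_start)
manager.register_listener("check_position", on_check_position)
manager.schedule_event("start", 0)  # a delay of 0 means "as soon as possible"
manager.run()
print(log)  # [(0.0, 'start'), (1.0, 'check_position')]
```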

These default handlers have the following meanings:

power_up
Powers up all the devices
stop
Stops all the devices
power_down
Powers down all the devices
apply_settings
Applies settings to all the devices
tick
Calls Device.loop_iteration()
pop_results
If the device is running, gets the results from all the devices and sends them to the client.

Here are the custom handlers for this experiment:

start
Clears internal state, starts the engine driver and the power source, then raises the next_point event.
next_point

Moves the engine to the next point and schedules check_position to be fired after one second. If this is the last point, fires the stop event.

def _on_next_point(self, event):
    try:
        # Move the engine to the next series point
        self['strawberry'].move_to_next_series_point()
        # Schedule the event to fire after self.LOOP_TIMEOUT
        self.schedule_event('check_position')
    except OnFinalSeriesPoint:
        # Raised when we are already at the last series point
        self.experiment_callback.send_series_done()  # Tell users the series is finished
        self.stop()  # Stop all devices

check_position

If the engine has arrived at the target position, starts the voltmeter and fires check_voltage; if not, schedules check_position to be fired again after one second.

def _on_check_position(self, event):
    # This is how we detect that the device has finished working: it
    # switches its state
    if self['strawberry'].device_state == DEVICE_STATES[READY]:
        # Purge all cached results
        self['voltimeter'].clear_current_results()
        # Start the voltmeter
        self['voltimeter'].start()
        # Schedule next event for immediate execution
        self.schedule_event('check_voltage', 0)
    else:
        # If the engines are still working, schedule check_position once more
        self.schedule_event('check_position')

check_voltage

If the voltmeter has finished the measurement, fires the next_point event; if not, schedules check_voltage to be fired after one second.

When the voltmeter has finished, the handler also sends the new point to the user.

def _on_check_voltage(self, event):
    voltimeter = self['voltimeter']

    # If the voltmeter has results, let's send them!
    if voltimeter.has_result('voltage'):
        voltage = voltimeter.current_result_map['voltage']
        wavelength = self['strawberry'].current_wavelength
        results = {'voltage': voltage, 'wavelength': wavelength}
        self.chart_generator.aggregate_results(results)
        chart_item = self.chart_generator.pop_results()
        self.experiment_callback.send_results(ResultSuite(
            **{self.chart_generator.result_name: chart_item}
        ))
        self.schedule_event('next_point', 0)
    else:
        # Wait for results
        self.schedule_event('check_voltage')

Then you need to hook events to this instance:

def initialize(self, experiment_callback):
    super().initialize(experiment_callback)

    # Load default events

    self.install_default_event_managers_for('power_up')
    self.install_default_event_managers_for('stop')
    self.install_default_event_managers_for('power_down')
    self.install_default_event_managers_for('apply_settings')
    self.install_default_event_managers_for('tick')
    self.install_default_event_managers_for('pop_results')

    #Load additional events

    self.register_listener('start', self._on_series_started)
    self.register_listener('next_point', self._on_next_point)
    self.register_listener('check_position', self._on_check_position)
    self.register_listener('check_voltage', self._on_check_voltage)

    # Clear results for chart
    self.register_listener('stop', lambda evt: self.chart_generator.clear())

Put together, the event handling part of the experiment manager looks like this:

class BlackbodyExperiment(EventExperimentManager):

    def initialize(self, experiment_callback):
        super().initialize(experiment_callback)

        self.install_default_event_managers_for('power_up')
        self.install_default_event_managers_for('stop')
        self.install_default_event_managers_for('power_down')
        self.install_default_event_managers_for('apply_settings')
        self.install_default_event_managers_for('tick')
        self.install_default_event_managers_for('pop_results')

        self.register_listener('start', self._on_series_started)
        self.register_listener('next_point', self._on_next_point)
        self.register_listener('check_position', self._on_check_position)
        self.register_listener('check_voltage', self._on_check_voltage)
        self.register_listener('stop', lambda evt: self.chart_generator.clear())

    def _on_series_started(self, event):
        self['hantek'].start() # Starts the power source
        self['strawberry'].start() # Starts the engine
        self._on_next_point(event)

    def _on_next_point(self, event):
        try:
            self['strawberry'].move_to_next_series_point() # Moves the engine to the next series point
            self.schedule_event('check_position')
        except OnFinalSeriesPoint:
            self.experiment_callback.send_series_done()
            self.stop()

    def _on_check_position(self, event):
        if self['strawberry'].device_state == DEVICE_STATES[READY]:
            self['voltimeter'].clear_current_results()
            self['voltimeter'].start()
            self.schedule_event('check_voltage', 0)
        else:
            self.schedule_event('check_position')

    def _on_check_voltage(self, event):
        voltimeter = self['voltimeter']

        if voltimeter.has_result('voltage'):
            voltage = voltimeter.current_result_map['voltage']
            wavelength = self['strawberry'].current_wavelength
            results = {'voltage': voltage, 'wavelength': wavelength}
            self.chart_generator.aggregate_results(results)
            chart_item = self.chart_generator.pop_results()
            self.experiment_callback.send_results(ResultSuite(
                **{self.chart_generator.result_name: chart_item}
            ))
            self.schedule_event('next_point', 0)
        else:
            self.schedule_event('check_voltage')
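
The polling pattern used by these handlers (move, poll until ready, measure, repeat) can be simulated without hardware. The sketch below is hypothetical throughout: FakeEngine, run_series and the local OnFinalSeriesPoint class are stand-ins invented for the illustration, events are processed from a simple queue with no delays, and the voltmeter is assumed to have a result immediately.

```python
from collections import deque


class OnFinalSeriesPoint(Exception):
    """Local stand-in for the exception raised at the last series point."""


class FakeEngine:
    """Hypothetical engine that needs a few polls to finish each move."""

    def __init__(self, points, polls_per_move=1):
        self._points = list(points)
        self._index = -1
        self._polls_per_move = polls_per_move
        self._polls_left = 0

    def move_to_next_series_point(self):
        if self._index + 1 >= len(self._points):
            raise OnFinalSeriesPoint()
        self._index += 1
        self._polls_left = self._polls_per_move

    @property
    def current_wavelength(self):
        return self._points[self._index]

    def is_ready(self):
        if self._polls_left > 0:
            self._polls_left -= 1
            return False
        return True


def run_series(engine, read_voltage):
    """Drive next_point -> check_position -> check_voltage until the end."""
    events = deque(["next_point"])
    trace, results = [], []
    while events:
        event = events.popleft()
        trace.append(event)
        if event == "next_point":
            try:
                engine.move_to_next_series_point()
                events.append("check_position")
            except OnFinalSeriesPoint:
                trace.append("stop")
        elif event == "check_position":
            # Keep polling until the engine reports that it has arrived.
            ready = engine.is_ready()
            events.append("check_voltage" if ready else "check_position")
        elif event == "check_voltage":
            wavelength = engine.current_wavelength
            voltage = read_voltage(wavelength)
            results.append({"voltage": voltage, "wavelength": wavelength})
            events.append("next_point")
    return trace, results


trace, results = run_series(FakeEngine([400, 500]), lambda wl: wl / 1000)
print(trace)
print(results)
```

The trace shows check_position firing twice per point (one "not ready" poll, one "ready" poll) and a final stop once the series runs out of points.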

Configure the experiment

Once you have the experiment manager, you need to create a configuration file; please see Experiment configuration file.

[Experiment]
#Experiment name
ExperimentName=MockExperiment
#Name of the experiment manager (the one created in the previous step),
#given as the path to the class that manages the experiment
ExperimentManagerClass=silf.backend.commons_test.experiment.mock_experiment_manager.MockExperimentManager
#Client class
ClientClass=silf.backend.client.mock_client.Client

# How long this experiment will wait before it shuts itself down after the last series has ended
shutdown_experiment_timeout=3600
# How long this experiment will wait before it kills current session
stop_series_timeout=600


[XMPPClient]

#Configuration for XMPP client
nick = experiment
jid = geiger@pip.ilf.edu.pl/experiment
password = NepjotmirkOdofruebcajigIaheHuSka
room = test-geiger@muc.pip.ilf.edu.pl
port = 5222
host = pip.ilf.edu.pl

[Logging]
config_type=file
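
A file in this format can be inspected with Python's standard configparser module. The sketch below assumes the file is plain INI; how the backend itself loads the file is not shown in this document, so treat this only as a way to sanity-check values. It also shows that configparser, with default settings, does not strip a trailing "#..." from a value, which is a reason to keep comments on their own lines.

```python
import configparser

# A shortened copy of the example configuration, embedded for the demo.
EXAMPLE_CONFIG = """
[Experiment]
ExperimentName=MockExperiment
shutdown_experiment_timeout=3600
stop_series_timeout=600

[Logging]
config_type=file
"""

parser = configparser.ConfigParser()
parser.read_string(EXAMPLE_CONFIG)

experiment = parser["Experiment"]
name = experiment["ExperimentName"]
shutdown_timeout = experiment.getint("shutdown_experiment_timeout")
stop_timeout = experiment.getint("stop_series_timeout")
print(name, shutdown_timeout, stop_timeout)  # MockExperiment 3600 600

# With default settings, a trailing comment becomes part of the value.
inline = configparser.ConfigParser()
inline.read_string("[Experiment]\nClientClass=pkg.Client #comment\n")
client_class = inline["Experiment"]["ClientClass"]
print(client_class)  # pkg.Client #comment
```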