Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a PPU component #71

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions src/extra/components/ppu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import logging
from typing import List, Union

import numpy as np
import pandas as pd
from extra_data import by_id
from extra_data.keydata import KeyData
from extra_data.reader import DataCollection
from extra_data.sourcedata import SourceData

log = logging.getLogger(__name__)


def _find_ppu(run: DataCollection, device: str = None):
"""Helper function to find a PPU device."""

# fast path, we don't validate if the type or name match
if isinstance(device, SourceData):
return device
elif isinstance(device, KeyData):
return run[device.source]
elif isinstance(device, str):
if device in run.control_sources:
return run[device]
elif device in run.alias:
return _find_ppu(run, run.alias[device])
# else search substring for match
elif device is not None:
raise KeyError(f"ppu must be a SourceData or str, not {type(device).__name__}")

# Then we list all PPU device in the run
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the 'then' is a bit misleading, it sounds like this is logically done after the code above, but this is the default entry point if no device is given by argument (but if it is given, the function will have returned at this point).
It is veeeery minor, but maybe "# Default: we get all PPU devices in the run as checked against hardcoded device classes, respectively substrings"

available_ppus = [
source
for source in run.control_sources
if run[source].device_class in PPU._DEVICE_CLASSES
]
if len(available_ppus) == 0:
available_ppus = [s for s in run.control_sources if "MDL/PPU" in s]

if len(available_ppus) == 0:
raise KeyError("Could not find a PPU device in this data")
elif len(available_ppus) == 1:
return run[available_ppus[0]]
elif len(available_ppus) > 1:
if device:
# And unique substrings of available PPU
matches = [name for name in available_ppus if device.upper() in name]
if len(matches) == 1:
return run[matches[0]]
elif len(matches) == 0:
KeyError(
f"Couldn't identify an PPU from '{device}'; please pass a valid device name, alias, or unique substring"
)
else:
KeyError(
f"Multiple XGMs found matching '{device}', please be more specific: {matches}"
)
raise KeyError(f"Multiple PPU devices found in that data: {available_ppus}")


class PPU:
"""Interface to a PPU (Pulse Picker Unit).

Despite its name, the PPU picks trains, not pulses.
"""

_DEVICE_CLASSES = ["PulsePickerTrainTrigger", "PulsePickerTrainTriggerCopy"]

def __init__(
self, data: DataCollection, ppu: Union[KeyData, SourceData, str] = None
):
"""

Args:
data (DataCollection):
ppu (Union[KeyData, SourceData, str], optional):
Specify a Pulse Picker Unit device to use, necessary if a run
contains more than one PPU. This can be any of:
- The device name of the source.
- A `SourceData` or [KeyData][extra_data.KeyData] of the
control source (e.g. `HED_XTD6_PPU/MDL/PPU_TRIGGER`) of an
XGM.
- The alias name of either a `SourceData` or
[KeyData][extra_data.KeyData] belonging to a PPU.
- A unique (case-insensitive) substring of a PPU source name.

Raises:
KeyError: If we can't identify a unique PPU device from the
arguments.
"""
self.data = data
self.device = _find_ppu(data, ppu)

def train_ids(
self, offset: int = 0, labelled: bool = False
) -> Union[List[int], pd.Series]:
"""All train IDs picked by the PPU.

Args:
offset (int, optional):
offset to add to the selected trains. Defaults to 0.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For interest: why would the user want to add an offset? I'm sure there are use cases, just asking

labelled (bool, optional):
Returns a Pandas Series if set to True, where this index represents the
trigger sequence a train ID is part of. Defaults to False.

Returns:
Union[List[int], pd.Series]: Train IDs picked by the PPU.
"""
seq_start = self.device["trainTrigger.sequenceStart"].ndarray()
# The trains picked are the unique values of trainTrigger.sequenceStart
# minus the first (previous trigger before this run).
start_train_ids = np.unique(seq_start)[1:] + offset

train_ids = []
sequences = []
for seq, train_id in enumerate(start_train_ids):
n_trains = self.device["trainTrigger.numberOfTrains"]
n_trains = n_trains.select_trains(by_id[[train_id]]).ndarray()[0]
train_ids.extend(list(range(train_id, train_id + n_trains)))
sequences.extend([seq] * n_trains)
# drop train ids missing from the run
train_ids = sorted(set(train_ids).intersection(self.device.train_ids))

log.info(
f"PPU device {self.device.source} triggered for {len(train_ids)} train(s) across {len(sequences)} sequence(s)."
)

if labelled:
train_ids = pd.Series(train_ids, index=sequences)
return train_ids

def trains(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you consider a different name for this method? Both in EXtra-data as well as across existing components, we use .trains() to return an iterator over trains. This method however takes something that has train IDs and changes it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Do you have something in mind?

  • filter? filter_trains?
  • train_selection? select? select_trains?
  • pick? pick_trains?
  • take? take_trains?

I'd probably go for pick_trains.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

select and select_trains also have different meanings already (though in the long run, I would love to pass this component to DataCollection.select_trains, i.e. have some kind of TrainSelector protocol)

From your list, I would probably also go for pick_trains, but it's unfortunate one way or the other that it's the "picking-thing" calling on the "thing-to-be-picked" 🤔
Maybe @takluyver can comment on what sounds most natural.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to pass this component to DataCollection.select_trains, i.e. have some kind of TrainSelector protocol

yes, me too!
But do we have an idea of what/how we want to select trains or how this protocol can look like? I can easily think on how to do that specifically for the PPU device, but I can't imagine other cases for now (maybe you have more use cases as part of the data reduction).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we invert the problem: Instead of having a method that selects trains on some object, have a property or method that returns the correspondig extra_data.by_id slice? Then selection becomes: run.select_trains(ppu.train_sel) (name obviously preliminary)

Whenever we decide on a universal TrainSelector protocol, we can simply add it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to something like ppu.train_sel. For the split-sequence option, we could make PPU iterable, giving some sort of TrainSequence or TrainGroup object so you could do [run.select_trains(seq.train_sel) for seq in ppu].

self, split_sequence: bool = False, offset: int = 0
) -> Union[DataCollection, List[DataCollection]]:
"""Returns a subset of the data only with Trains selected by the PPU.

Args:
split_sequence (bool, optional): Split data per PPU trigger sequence. Defaults to False.
offset (int, optional): offset to apply to train IDs to be selected. Defaults to 0.

Returns:
Union[DataCollection, List[DataCollection]]:
DataCollection(s) containing only trains triggered by the PPU
"""
train_ids = self.train_ids(labelled=True, offset=offset)
if split_sequence:
return [
self.data.select_trains(by_id[seq.values])
for _, seq in train_ids.groupby(train_ids.index)
]
return self.data.select_trains(by_id[train_ids.values])