Pipelines and Tasks

Experiment configuration file generation and post-experiment reporting is done via a series of tasks that are built up into a pipeline using dask.

Existing Pipelines

ramutils.pipelines.ramulator_config.make_ramulator_config(subject, experiment, paths, stim_params, sessions=None, exp_params=None, vispath=None, extended_blanking=True, localization=0, montage=0, default_surface_area=0.001, trigger_pairs=None, use_common_reference=False, use_classifier_excluded_leads=False, pipeline_name='ramulator-conf')[source]

Generate configuration files for a Ramulator experiment

Parameters:
  • subject (str) – Subject ID
  • experiment (str) – Experiment to generate configuration file for
  • paths (FilePaths) –
  • stim_params (List[StimParameters]) – Stimulation parameters for this experiment.
  • sessions (List[int]) – Sessions to include when training classifier
  • exp_params (ExperimentParameters) – Parameters for the experiment.
  • vispath (str) – Path to save task graph visualization to if given.
  • extended_blanking (bool) – Whether to enable extended blanking on the ENS (default: True).
  • localization (int) – Localization number
  • montage (int) – Montage number
  • default_surface_area (float) – Default surface area to set all electrodes to in mm^2. Only used if no area file can be found.
  • trigger_pairs (List[str] or None) – Pairs to use for triggering stim in PS5 experiments.
  • use_common_reference (bool) – Use a common reference in the electrode configuration instead of bipolar referencing.
  • use_classifier_excluded_leads (bool) – Use contents of classifier_excluded_leads.txt to exclude channels from classifier training
  • pipeline_name (str) – Name to use for status updates.
Returns:

Return type:

The path to the generated configuration zip file.

The main steps of this pipeline are:

ramutils.pipelines.report.make_report(subject, experiment, paths, joint_report=False, retrain=False, stim_params=None, exp_params=None, sessions=None, vispath=None, rerun=False, trigger_electrode=None, use_classifier_excluded_leads=False, pipeline_name='report')[source]

Constructs a report and saves out all the necessary data to re-construct the report

This pipeline should be used for generating single session reports for both record-only and stimulation sessions. However, the current pipeline also support combining sessions of record-only experiments into a single report. In the future, this capability may be moved to ramutils.pipelines.aggregated_report.make_aggregated_report since that is a more natural location

Parameters:
  • subject (str) – Subject ID
  • experiment (str) – Experiment to generate report for
  • paths (FilePaths) –
  • joint_report (Bool) – If True, catFR/FR sessions will be combined in the report
  • retrain (Bool) – If True, retrain classifier rather than trying to load from disk
  • stim_params (List[StimParameters]) – Stimulation parameters (empty list for non-stim experiments).
  • exp_params (ExperimentParameters) – When given, overrides the inferred default parameters to use for an experiment.
  • sessions (list or None) – For reports that span sessions, sessions to read data from. When not given, all available sessions are used for reports.
  • vispath (str) – Filename for task graph visualization.
  • rerun (bool) – If True, do not attempt to load data from long-term storage. If any necessary data is not found, everything will be rerun
  • trigger_electrode (str) – The label for the bipolar pair to be used for triggering stimulation in PS5
  • use_classifier_excluded_leads (bool) – Use contents of classifier_excluded_leads.txt to exclude channels from classifier training
  • pipeline_name (str) – Name to use for status updates.
Returns:

report_path – Path to generated report.

Return type:

str

Notes

Eventually this will return an object that summarizes all output of the report rather than the report itself.

The main steps of this pipeline are:

ramutils.pipelines.aggregated_report.make_aggregated_report(subjects=None, experiments=None, sessions=None, fit_model=True, paths=None, pipeline_name='aggregate')[source]

Build an aggregated stim session report

This pipeline should be used for combining data across stim experiment sessions into a single report. The concept of a “joint report” already exists for record-only sessions and can be generated using the ramutils.pipelines.report.make_report pipeline. In the future, a more sensible approach would be to have joint reports for both stim sessions and record-only sessions be built using the same pipeline.

Keyword Arguments:
 
  • subjects (list or None) – The set of subjects to include when building the report. If None and one or more experiments are specified, then the subjects who completed each experiment will be identified automatically.
  • experiments (list or None) – The set of experiments to include when building the report. This is primarily useful for combining FR with CatFR session data, effectively a joint report for stim sessions. However, it is also possible to combine across experiment series. For example, a joint report could be built that combines FR3, catFR3, FR5, and catFR5. This is possible because the report templates for these experiments are identical. It is not, however, possible to combine stim reports across dissimilar reports. For example, it would not make sense to build an aggregate report combining PS5 with catFR5 since those use completely different templates.
  • sessions (list or None) – The set of sessions to include. This parameter can only be used if a single subject and a single experiment have been provided. The main use case is for generating a stim report that excludes 1 or more sessions. We do not currently support the ability to combine the sessions paramter with more than one subject or more than one experiment. This could be a future enhancement. For example, it may be useful to be able to generate an aggregated report of all the first sessions of a particular experiment type, or all first sessions for a particular subject.
  • fit_model (bool) – If true, the a Bayesian hierachical multilvel model will be fit using the data combined across the requested subjects, experiments, and sessions. This process can be very slow as the number of sessions increases, so it is False by default. The main use case if for building a stim report that aggregates over the sessions that a particular subject completed of a particular experiment.
  • paths (ramutils.parameters.FilePaths) – Helper class for setting up the set of paths that will be necessary for loading existing results
  • pipeline_name (str) – Name to use for status updates.

Tasks

Defining tasks

Tasks are created by using the ramutils.tasks.task() decorator or wrapping a function with ramutils.tasks.make_task(). These simply apply the :func:`dask.delayed`_ and (optionally) :module:`joblib`_ caching decorators. The former is important for adding the ability to parallelize a pipeline (for tasks that can run independently) while the latter allows for resuming a pipeline when something goes wrong or if only changing one parameter which does not affect all tasks.

ramutils.tasks.task(cache=True, log_args=False, nout=None)[source]

Decorator to define a task.

Keyword Arguments:
 
  • cache (bool) – Cache the task result (default: True)
  • log_args (bool) – Log arguments the task is called with (default: False)
  • nout (int) – Number of return values of the wrapped function. Must be specified if more than 1.
ramutils.tasks.make_task(func, *args, **kwargs)[source]

Wrap a function in a task.

Parameters:
  • func (callable) – Function to wrap
  • args – Arguments for the function
  • kwargs – Keyword arugments for the function plus keyword arguments accepted by the task() decorator.

Reference

Common tasks come predefined in the ramutils.tasks package and are documented below.

Classifier tasks

ramutils.tasks.classifier.summarize_classifier(classifier, pow_mat, events, n_permutations, tag='classifier', **kwargs)[source]

Perform LOSO or LOLO cross validation on a classifier.

Parameters:
  • classifier (sklearn model object) –
  • pow_mat (np.ndarray) –
  • events (np.recarray) –
  • n_permutations (int) –
  • tag (str) – Tag to assign the resulting classifier summary (default: 'classifier')
  • kwargs (dict) – Extra keyword arguments that are passed to get_sample_weights. See that function for more details
Returns:

classifier_summary – Results of cross validation as a summary object

Return type:

ClassifierSummary

ramutils.tasks.classifier.serialize_classifier(classifier, pairs, features, events, sample_weights, classifier_summary, subject)[source]

Serialize classifier into a container object

Parameters:
  • classifier (sklearn Estimator) – Model used during training
  • pairs (array_like) – bipolar pairs used for training
  • features (np.ndarray) – Normalized power matrix used as features to the classifier
  • events (np.recarray) – Set of events used for training
  • sample_weights (array_like) – Weights used for each of the event
  • classifier_summary (ClassifierSummary) – Object used for calculating and storing cross-validation-related metrics
  • subject (str) – Subject identifier
Returns:

Object representing all meta-data associated with training a classifier

Return type:

ClassififerContainer

ramutils.tasks.classifier.post_hoc_classifier_evaluation(events, powers, all_pairs, classifiers, n_permutations, retrained_classifier, use_retrained=False, post_stim_events=None, post_stim_powers=None, **kwargs)[source]

Evaluate a trained classifier

Parameters:
  • events (np.recarray) – Task events associated with the stim sessesion to be evaluated
  • powers (np.ndarray) – Normalized mean powers
  • all_pairs (OrderedDict) – All pairs based on recorded electrodes combine from config file
  • classifiers (List) – List of classifiers corresponding to each session
  • n_permutations (int) – Number of permutations to use for cross validation
  • retrained_classifier (classiflib.container.ClassifierContainer) – classifier container object based on a retrained classifier
  • use_retrained (bool (default False)) – Indicates if the retrained classifier should be used over the actual classifier for the purpose of evaluation
  • post_stim_events (np.recarray or None) – Post-stimulation events associated with the stim sessesion to be evaluated. Can be done in the case of FR2 where post stim events
  • post_stim_powers (np.ndarray or None) – Normalized mean powers for post_stim period events
Returns:

A dictionary of summary objects that are needed in subsequent parts of the processing pipeline. The dictionary will be in the following format:

{
    'cross_session_summary': MultiSessionClassifierSummary,
    'classifier_summaries': List of ClassifierSummary objects,
    'encoding_classifier_summaries': List of ClassifierSummary
    objects built using all encoding events,
    'post_stim_predicted_probs': Classifier output during post stim period
}

Return type:

dict

Notes

Different channels could be excluded based on results of artifact detection and stim parameters. Extract the used pairs from the serialized classifier that was used/retrained in order to correctly assess the classifier. The default behavior is to use the retrained classifier for any sessions where the actual classifier was not found or was unable to be loaded. Legacy-formatted classifiers are not supported for re-loading. In cases where a stim session was restarted, the default behavior is to use the original classifier (i.e. the classifier before artifact detection) rather than trying to guess which classifier to load.

ramutils.tasks.classifier.reload_used_classifiers(subject, experiment, events, root)[source]

Reload the actual classifiers used in each session of an experiment

Parameters:
  • subject (str) – Subject identifier
  • experiment (str) – Name of the experiment
  • sessions (list) – List of sessions to try reloading a classifier
  • root (str) – Base path of where to find RHINO files
Returns:

List of ClassifierContainer objects of length n_sessions

Return type:

list

Notes

If a classifier is not found or is unable to be reloaded (legacy storage format, or other issues), then the list of ClassifierContainer objects will have None as the entry for that session.

Events tasks

Partial pipelines for processing events that is used by full pipelines.

ramutils.tasks.events.build_test_data(subject, experiment, paths, joint_report, sessions=None, **kwargs)[source]

Construct the set of events to be used for post-hoc classifier evaluation, i.e. the test data

ramutils.tasks.events.build_training_data(subject, experiment, paths, sessions=None, **kwargs)[source]

Construct the set of events needed for classifier training

Miscellaneous tasks

ramutils.tasks.misc.read_index(mount_point='/')[source]

Reads the JSON index reader.

Parameters:mount_point (str) – Root directory to search for.
Returns:JsonIndexReader
ramutils.tasks.misc.save_all_output(subject, experiment, session_summaries, math_summaries, classifier_evaluation_results, save_location, retrained_classifier=None, target_selection_table=None, behavioral_results=None, agg_report=False)[source]

Save all required output necessary to re-generate a report

subject: str
Subject ID
experiment: str
Experiment name
session_summaries: List
List of SessionSummary derived objects
math_summaries: List
List of MathSummary objects
classifier_evaluation_results: List
List of ClassifierSummary objects
save_location: str
Destination for data to be saved. Typically in /data10/RAM/report_database/ on RHINO
retrained_classifier: ClassifierContainer
Serialized representation of the retrained classifier
target_selection_table pd.DataFrame
DataFrame representation of the target selection table, formerly known as the subsequent memory effect table
behavioral_results: dict
Keys are the behavioral effect model type (stim list, stim item, etc.) and values are the traces from estimating those models
Returns:results_files – Dictionary whose keys are the names of statically-produced plots and values are encoded versions of those images. These are used to embed the static plots in the html reports during report generation
Return type:dict

Notes

All output files are of the format {subject}_{experiment}_{session}_{data_type}.{file_type} where data_type is a generic name for the type of data being saved. The following data types map to a summary object:

ramutils.tasks.misc.load_existing_results(subject, experiment, sessions, stim_report, db_loc, joint_report, rootdir='/')[source]

Load previously-saved data creating during report generation

subject: str
Subject ID
experiment: str
Experiment ID
sessions: list or None
If none, then sessions are looked up from r1.json for the given subject and experiment.
stim_report: bool
Indicator for if the requested data is associated with a stim report
db_loc: str
Report database location relative to rootdir. db_loc will be appended to rootdir to find the full absolute path. If both db_loc and rootdir are absolute paths, it will be assumed that db_loc contains the root directory.
rootdir: str
RHINO mount point or root directory
saved_results: dict
Mirrors the input to save_all_output

Montage tasks

ramutils.tasks.montage.generate_pairs_for_classifier(pairs, excluded_pairs)[source]

Create recarray of electrode pairs for the classifier container

Parameters:
  • pairs – JSON-format object containing all electrode pairs in the montage
  • excluded_pairs – array-like containing pairs excluded from the montage
Returns:

recarray containing all pairs minus excluded pairs

Return type:

np.recarray

ramutils.tasks.montage.reduce_pairs(pairs, stim_params, return_excluded=False)[source]

Remove stim pairs from the pairs.json dict.

Parameters:
  • pairs (OrderedDict) – Full pairs.json as a dict
  • stim_params (List[StimParameters]) –
  • return_excluded (bool) – Whether excluded pairs should be returned instead of reduced pairs
Returns:

pairs with stim pairs removed, or removed pairs if return_excluded is True

Return type:

OrderedDict

ramutils.tasks.montage.get_used_pair_mask(all_pairs, excluded_pairs)[source]

Create a boolean mask indicating which electrodes should be included in classifier training/evaluation.

Parameters:
  • all_pairs (OrderedDict) –
  • excluded_pairs (OrderedDict) –
Returns:

mask – Boolean mask of channels to include.

Return type:

List[bool]

ramutils.tasks.montage.generate_montage_metadata_table(subject, experiment, sessions, all_pairs, root='/')

Create a dataframe containing atlas labels, locations, and coordinates

Parameters:
  • subject (str) – Subject ID
  • experiment (str) – Experiment
  • all_pairs (OrderedDict) – Full set of bipolar pairs that will be augmented with their metadata
  • root (str) – Base path for RHINO
ramutils.tasks.montage.get_pairs(subject_id, experiment, sessions, paths)[source]

Determine how we should figure out what pairs to use.

Option 1: In the case of hardware bipolar recordings with the ENS, EEG data is stored in the HDF5 file which contains the Odin electrode config data so we can use this.

Option 2: For monopolar recordings, we can just read the pairs.json from localization.

Parameters:
  • subject_id (str) – Subject ID
  • experiment (str) – Experiment type
  • sessions (list) – List of sessions to use
  • paths (FilePaths) – Object for storing important file paths
  • localization (int) – Localization number
  • montage (int) – Montage number
Returns:

all_pairs – All pairs used in the experiment.

Return type:

dict

Notes

This should only be used for getting pairs when building a report. For config generation, use generate_pairs_from_electrode_config. To use get_pairs, you would need to determine an open loop experiment that the subject completed and use that experiment instead of the experiment whose config file is being generated.

ramutils.tasks.montage.get_classifier_excluded_leads(subject, all_pairs, rootdir='/')[source]

Identify channels to be excluded using the classifier_excluded_leads.txt file

subject: str
Subject identifier
paths: FilePaths
FilePaths object including RHINO root directory

excluded_contacts: List of contacts in the same format as what is returned by make_stim_params

ramutils.tasks.montage.get_artifact_tstats(stim_events, pairs, start_time, duration, return_pvalues=False, before_experiment=True)

Computes ttest on the average EEG value pre-stim vs post-stim. TODO: import from artdet; define parameters centrally

Parameters:
  • stim_events (np.rec.array) – Stimulation events for a session
  • pairs (dict) – bipolar pairs
  • start_time (float) – time after stim offset/before stim onset to begin (seconds)
  • duration (float) – Length of eeg to evaluate (seconds)
  • return_pvalues (bool) – If true, return p-values along with t-statistics
  • before_experiment – If true, only include stim events before the first list
Returns:

  • t (np.ndarray) – T-statistics by channel
  • p (np.ndarray) – p-values by channel

Odin/Ramulator tasks

Tasks specific to the Medtronic Odin ENS.

ramutils.tasks.odin.generate_electrode_config(subject, paths, anodes=None, cathodes=None, localization=0, montage=0, default_surface_area=0.001, use_common_reference=False)[source]

Generate electrode configuration files (CSV and binary).

Parameters:
  • subject (str) – Subjebct ID
  • paths (FilePaths) –
  • anodes (List[str]) – List of stim anode labels.
  • cathodes (List[str]) – List of stim cathode labels.
  • localization (int) – Localization number (default: 0)
  • montage (int) – Montage number (default: 0)
  • default_surface_area (float) – Default surface area to set all electrodes to in mm^2. Only used if no area file can be found.
  • use_common_reference (bool) – Use common reference instead of bipolar referencing scheme.
Returns:

paths – Updated FilePaths object with path to the electrode config file defined.

Return type:

FilePaths

Notes

At present, this will only allow for generating hardware-bipolar electrode config files.

ramutils.tasks.odin.generate_ramulator_config(subject, experiment, container, stim_params, paths, pairs=None, excluded_pairs=None, exp_params=None, extended_blanking=True, trigger_pairs=None)[source]

Create configuration files for Ramulator.

In hardware bipolar mode, the neurorad pipeline generates a pairs.json file that differs from the electrode configured pairs. It is up to the user of the pipeline to ensure that the path to the correct pairs.json is supplied (although Ramulator does not use it in this case).

Parameters:
  • subject (str) –
  • experiment (str) –
  • container (ClassifierContainer or None) – serialized classifier
  • stim_params (List[StimParameters]) – list of stimulation parameters
  • paths (FilePaths) –
  • excluded_pairs (dict) – Pairs excluded from the classifier (pairs that contain a stim contact and possibly some others)
  • exp_params (ExperimentParameters) – All parameters used in training the classifier. This is partially redundant with some data stored in the container object.
  • extended_blanking (bool) – Whether or not to enable the ENS extended blanking (default: True).
  • trigger_pairs (List[str] or None) – Pairs to be used for triggering stim in PS5.
Returns:

zip_path – Path to generated configuration zip file

Return type:

str

Power computation tasks

ramutils.tasks.powers.reduce_powers(powers, channel_mask, n_frequencies, frequency_mask=None)[source]

Create a subset of the full power matrix by excluding certain electrodes

Parameters:
  • powers (np.ndarray) – Original power matrix
  • channel_mask (array_like) – Boolean array of size n_channels
  • n_frequencies (int) – Number of frequencies used in calculating the power matrix. This is needed to be able to properly reshape the array
  • frequency_mask (array_like) – Boolean array of size n_frequencies
Returns:

Subsetted power matrix

Return type:

np.ndarray

ramutils.tasks.powers.compute_normalized_powers(events, **kwargs)[source]

Compute powers by session, encoding/retrieval, and FR vs. PAL

Notes

There are different start times, end time, and buffer times for each subset type, so those are passed in as kwargs and looked up prior to calling the more general compute_powers function

ramutils.tasks.powers.get_trigger_frequency_mask(trigger_frequency, frequencies)[source]

Returns a boolean mask identifying a single frequency in a list of frequencies

Report summary tasks

Tasks related to summarizing an experiment. Used primarily in reporting results.

ramutils.tasks.summary.summarize_nonstim_sessions(all_events, task_events, bipolar_pairs, excluded_pairs, normalized_powers, joint=False, repetition_ratio_dict={})[source]

Generate a summary by unique session/experiment

Parameters:
  • all_events (np.recarray) – Full set of events
  • task_events (np.recarray) – Event subset used for classifier training
  • joint (Bool) – Indicator for if a joint report is being created. This will disable checks for single-experiment events
  • repetition_ratio_dict (Dict) – Mapping between subject ID and repetition ratio data
Returns:

summary – List of SessionSummary objects for the proper experiment type.

Return type:

list

Raises:

TooManyExperimentsError – If the events span more than one session.

Notes

The experiment type is inferred from the events.

ramutils.tasks.summary.summarize_math(events, joint=False)[source]

Generate a summary math event summary of a single experiment session

Parameters:
  • events (np.recarray) – Events from single experiment session
  • joint (Bool) – Indicates if the given events are part of a joint event, and therefore multiple experiments should be allowed
Returns:

summary – List of MathSummary objects

Return type:

list

ramutils.tasks.summary.summarize_stim_sessions(all_events, task_events, stim_params, pairs_data, bipolar_pairs, excluded_pairs, normalized_powers, encoding_classifier_summaries=None, post_stim_predicted_probs=None, trigger_output=None, post_stim_trigger_output=None, post_stim_eeg=None)[source]

Construct stim session summaries

ramutils.tasks.summary.summarize_ps_sessions(ps_events, bipolar_pairs, excluded_pairs)[source]

Task for generating summaries of PS session

Parameters:
  • ps_events (np.recarray) –
  • bipolar_pairs (dict) –
  • excluded_pairs (dict) –