
int-brain-lab / ibllib, build 7961675356254463 (continuous-integration/UCL, pending completion)
Pull Request #557: Chained protocols ("add test" by olivier)
718 of 718 new or added lines in 27 files covered (100.0%)
12554 of 18072 relevant lines covered (69.47%), 0.69 hits per line
Source file: /ibllib/pipes/dynamic_pipeline.py (92.78% covered)
import logging
import re
from collections import OrderedDict
from pathlib import Path
from itertools import chain
import yaml

import spikeglx

import ibllib.io.session_params as sess_params
import ibllib.io.extractors.base
import ibllib.pipes.ephys_preprocessing as epp
import ibllib.pipes.tasks as mtasks
import ibllib.pipes.base_tasks as bstasks
import ibllib.pipes.widefield_tasks as wtasks
import ibllib.pipes.sync_tasks as stasks
import ibllib.pipes.behavior_tasks as btasks
import ibllib.pipes.video_tasks as vtasks
import ibllib.pipes.ephys_tasks as etasks
import ibllib.pipes.audio_tasks as atasks
from ibllib.pipes.photometry_tasks import TaskFibrePhotometryPreprocess, TaskFibrePhotometryRegisterRaw

_logger = logging.getLogger(__name__)

def acquisition_description_legacy_session(session_path, save=False):
    """
    From a legacy session, create a dictionary corresponding to the acquisition description.

    :param session_path: the absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'
    :param save: if True, write the acquisition description to the session folder
    :return: dict
    """
    extractor_type = ibllib.io.extractors.base.get_session_extractor_type(session_path=session_path)
    etype2protocol = dict(biased='choice_world_biased', habituation='choice_world_habituation',
                          training='choice_world_training', ephys='choice_world_recording')
    dict_ad = get_acquisition_description(etype2protocol[extractor_type])
    if save:
        sess_params.write_params(session_path=session_path, data=dict_ad)
    return dict_ad

def get_acquisition_description(protocol):
    """
    This is a set of example acquisition descriptions for experiments
    -   choice_world_recording
    -   choice_world_biased
    -   choice_world_training
    -   choice_world_habituation
    -   choice_world_passive
    that are part of the IBL pipeline.
    """
    if protocol == 'choice_world_recording':   # canonical ephys
        devices = {
            'cameras': {
                'right': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'body': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'neuropixel': {
                'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'},
                'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'}
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the current ephys pipeline description
            'devices': devices,
            'tasks': [
                {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
                {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}
            ],
            'sync': {
                'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}
            },
            'procedures': ['Ephys recording with acute probe(s)'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
    else:
        devices = {
            'cameras': {
                'left': {'collection': 'raw_video_data', 'sync_label': 'frame2ttl'},
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the behaviour-only (bpod-synced) pipeline description
            'devices': devices,
            'sync': {
                'bpod': {'collection': 'raw_behavior_data', 'extension': 'bin'}
            },
            'procedures': ['Behavior training/tasks'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
        if protocol == 'choice_world_biased':
            key = 'biasedChoiceWorld'
        elif protocol == 'choice_world_training':
            key = 'trainingChoiceWorld'
        elif protocol == 'choice_world_habituation':
            key = 'habituationChoiceWorld'
        else:
            raise ValueError(f'Unknown protocol "{protocol}"')
        acquisition_description['tasks'] = [{key: {
            'collection': 'raw_behavior_data',
            'sync_label': 'bpod', 'main': True
        }}]
    acquisition_description['version'] = '1.0.0'
    return acquisition_description

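# Usage sketch: the description returned for the training protocol, as constructed above.
#   >>> ad = get_acquisition_description('choice_world_training')
#   >>> ad['tasks']
#   [{'trainingChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod', 'main': True}}]
#   >>> ad['sync']
#   {'bpod': {'collection': 'raw_behavior_data', 'extension': 'bin'}}
#   >>> ad['version']
#   '1.0.0'
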
def make_pipeline(session_path, **pkwargs):
    """
    Creates a pipeline of extractor tasks from a session's experiment description file.

    Parameters
    ----------
    session_path : str, Path
        The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'.
    **pkwargs
        Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor.

    Returns
    -------
    ibllib.pipes.tasks.Pipeline
        A task pipeline object.
    """
    # NB: this is the pattern used throughout for dynamic class creation
    # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path)
    if not session_path or not (session_path := Path(session_path)).exists():
        raise ValueError('Session path does not exist')
    tasks = OrderedDict()
    acquisition_description = sess_params.read_params(session_path)
    devices = acquisition_description.get('devices', {})
    kwargs = {'session_path': session_path}

    # Registers the experiment description file
    tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw',
                                                     (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs)

    # Syncing tasks
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension')
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}
    sync_tasks = []
    if sync == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync == 'nidq':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync == 'tdms':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync == 'bpod':
        pass
        # ATM we don't have anything for this; not sure it will be needed in the future
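
    # Illustrative sketch of the sync argument renaming above: a nidq entry from the acquisition description,
    #   {'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}}
    # becomes the task keyword arguments
    #   {'sync': 'nidq', 'sync_collection': 'raw_ephys_data', 'sync_ext': 'bin', 'sync_namespace': 'spikeglx'}
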
    # Behavior tasks
    task_protocols = acquisition_description.get('tasks', [])
    for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))):
        collection = task_info.get('collection', f'raw_task_data_{i:02}')
        task_kwargs = {'protocol': protocol, 'collection': collection}
        # For now the order of protocols in the list will take precedence. If collections are numbered,
        # check that the numbers match the order. This may change in the future.
        if re.match(r'^raw_task_data_\d{2}$', collection):
            task_kwargs['protocol_number'] = i
            if int(collection.split('_')[-1]) != i:
                _logger.warning('Number in collection name does not match task order')
        if extractors := task_info.get('extractors', False):
            extractors = (extractors,) if isinstance(extractors, str) else extractors
            task_name = None  # to avoid unbound variable issue in the first round
            for j, task in enumerate(extractors):
                # Assume previous task in the list is parent
                parents = [] if j == 0 else [tasks[task_name]]
                # Make sure extractor and sync task don't collide
                for sync_option in ['nidq', 'bpod']:
                    if sync_option in task.lower() and not sync == sync_option:
                        raise ValueError(f'Extractor "{task}" and sync "{sync}" do not match')
                try:
                    task = getattr(btasks, task)
                except AttributeError:
                    ...  # TODO Attempt to import from personal project repo
                # Rename the class to something more informative
                task_name = f'{task.__name__}_{i:02}'
                # For now we assume that the second task in the list is always the trials extractor, which is dependent
                # on the sync task and sync arguments
                if j == 1:
                    tasks[task_name] = type(task_name, (task,), {})(
                        **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks
                    )
                else:
                    tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents)
                # For the next task, we assume that the previous task is the parent
        else:  # Legacy block to handle sessions without defined extractors
            # -   choice_world_recording
            # -   choice_world_biased
            # -   choice_world_training
            # -   choice_world_habituation
            if 'habituation' in protocol:
                registration_class = btasks.HabituationRegisterRaw
                behaviour_class = btasks.HabituationTrialsBpod
                compute_status = False
            elif 'passiveChoiceWorld' in protocol:
                registration_class = btasks.PassiveRegisterRaw
                behaviour_class = btasks.PassiveTask
                compute_status = False
            elif sync_kwargs['sync'] == 'bpod':
                registration_class = btasks.TrialRegisterRaw
                behaviour_class = btasks.ChoiceWorldTrialsBpod
                compute_status = True
            elif sync_kwargs['sync'] == 'nidq':
                registration_class = btasks.TrialRegisterRaw
                behaviour_class = btasks.ChoiceWorldTrialsNidq
                compute_status = True
            else:
                raise NotImplementedError
            tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})(
                **kwargs, **task_kwargs)
            parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks
            tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})(
                **kwargs, **sync_kwargs, **task_kwargs, parents=parents)
            if compute_status:
                tasks[f"TrainingStatus_{protocol}_{i:02}"] = type(f'TrainingStatus_{protocol}_{i:02}', (
                    btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']])
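
    # Note on chained protocols (illustrative): with two protocols in the description, e.g. a training
    # task followed by a passive task, the legacy block above creates tasks keyed by protocol and order,
    # such as 'Trials_trainingChoiceWorld_00' and 'Trials_passiveChoiceWorld_01'.
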
    # Ephys tasks
    if 'neuropixel' in devices:
        ephys_kwargs = {'device_collection': 'raw_ephys_data'}
        tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs)

        all_probes = []
        register_tasks = []
        for pname, probe_info in devices['neuropixel'].items():
            meta_file = spikeglx.glob_ephys_files(Path(session_path).joinpath(probe_info['collection']), ext='meta')
            meta_file = meta_file[0].get('ap')
            nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))

            if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1):
                tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                all_probes.append(pname)
                register_tasks.append(tasks[f'EphyCompressNP21_{pname}'])
            elif nptype == 'NP2.4' and nshanks > 1:
                tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks)
                register_tasks.append(tasks[f'EphyCompressNP24_{pname}'])
                all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)]
            else:
                tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                register_tasks.append(tasks[f'EphysCompressNP1_{pname}'])
                all_probes.append(pname)
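
        # Illustrative note: for a multi-shank NP2.4 probe the shanks are appended with letter suffixes,
        # e.g. a 4-shank 'probe00' yields 'probe00a', 'probe00b', 'probe00c', 'probe00d' in all_probes.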

        if nptype == '3A':
            tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})(
                **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks)

        for pname in all_probes:
            register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name]

            if nptype != '3A':
                tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})(
                    **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks)
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']])
            else:
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']])

            tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=register_task)
            tasks[f'EphysCellQC_{pname}'] = type(f'EphysCellQC_{pname}', (etasks.EphysCellsQc,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'Spikesorting_{pname}']])

    # Video tasks
    if 'cameras' in devices:
        video_kwargs = {'device_collection': 'raw_video_data',
                        'cameras': list(devices['cameras'].keys())}
        video_compressed = sess_params.get_video_compressed(acquisition_description)

        if video_compressed:
            # This is for the widefield case where the video is already compressed
            # NB: the walrus assignment on the right-hand side binds tn before the left-hand tasks[tn] subscript is evaluated
            tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(
                **kwargs, **video_kwargs)
            dlc_parent_task = tasks['VideoConvert']
            tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
        else:
            tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(
                **kwargs, **video_kwargs)
            tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
            dlc_parent_task = tasks['VideoCompress']
            if sync == 'bpod':
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']])
            elif sync == 'nidq':
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks)

        if len(video_kwargs['cameras']) == 3:
            tasks[tn] = type((tn := 'DLC'), (epp.EphysDLC,), {})(
                **kwargs, parents=[dlc_parent_task])
            tasks['PostDLC'] = type('PostDLC', (epp.EphysPostDLC,), {})(
                **kwargs, parents=[tasks['DLC'], tasks[f'VideoSyncQC_{sync}']])

    # Audio tasks
    if 'microphone' in devices:
        (microphone, micro_kwargs), = devices['microphone'].items()
        micro_kwargs['device_collection'] = micro_kwargs.pop('collection')
        if sync_kwargs['sync'] == 'bpod':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})(
                **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'])
        elif sync_kwargs['sync'] == 'nidq':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs)

    # Widefield tasks
    if 'widefield' in devices:
        (_, wfield_kwargs), = devices['widefield'].items()
        wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection')
        tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})(
            **kwargs, **wfield_kwargs)
        tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']])
        tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']])
        tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})(
            **kwargs, **wfield_kwargs, **sync_kwargs,
            parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks)
        tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']])

    # Photometry tasks
    if 'photometry' in devices:
        # {'collection': 'raw_photometry_data', 'sync_label': 'frame_trigger', 'regions': ['Region1G', 'Region3G']}
        photometry_kwargs = devices['photometry']
        tasks['TaskFibrePhotometryRegisterRaw'] = type('TaskFibrePhotometryRegisterRaw', (
            TaskFibrePhotometryRegisterRaw,), {})(**kwargs, **photometry_kwargs)
        tasks['TaskFibrePhotometryPreprocess'] = type('TaskFibrePhotometryPreprocess', (
            TaskFibrePhotometryPreprocess,), {})(**kwargs, **photometry_kwargs, **sync_kwargs,
                                                 parents=[tasks['TaskFibrePhotometryRegisterRaw']] + sync_tasks)

    p = mtasks.Pipeline(session_path=session_path, **pkwargs)
    p.tasks = tasks
    return p

350

351

352
def make_pipeline_dict(pipeline, save=True):
1✔
353
    task_dicts = pipeline.create_tasks_list_from_pipeline()
1✔
354
    # TODO better name
355
    if save:
1✔
356
        with open(Path(pipeline.session_path).joinpath('pipeline_tasks.yaml'), 'w') as file:
×
357
            _ = yaml.dump(task_dicts, file)
×
358
    return task_dicts
1✔
359

360

def load_pipeline_dict(path):
    with open(Path(path).joinpath('pipeline_tasks.yaml'), 'r') as file:
        task_list = yaml.full_load(file)

    return task_list
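
# Round-trip sketch for the YAML task list (illustrative, given a pipeline built with make_pipeline above):
#   >>> task_dicts = make_pipeline_dict(pipe, save=True)  # writes pipeline_tasks.yaml into the session folder
#   >>> task_dicts == load_pipeline_dict('/path/to/subject/yyyy-mm-dd/001')
#   True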