• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / ibllib / 1761696499260742

05 Oct 2023 09:46AM UTC coverage: 55.27% (-1.4%) from 56.628%
1761696499260742

Pull #655

continuous-integration/UCL

bimac
add @sleepless decorator
Pull Request #655: add @sleepless decorator

21 of 21 new or added lines in 1 file covered. (100.0%)

10330 of 18690 relevant lines covered (55.27%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.85
/ibllib/io/extractors/base.py
1
"""Base Extractor classes.
1✔
2
A module for the base Extractor classes.  The Extractor, given a session path, will extract the
3
processed data from raw hardware files and optionally save them.
4
"""
5

6
import abc
1✔
7
from collections import OrderedDict
1✔
8
import json
1✔
9
from pathlib import Path
1✔
10

11
import numpy as np
1✔
12
import pandas as pd
1✔
13
from one.alf.files import get_session_path
1✔
14
from ibllib.io import raw_data_loaders as raw
1✔
15
from ibllib.io.raw_data_loaders import load_settings, _logger
1✔
16

17

18
class BaseExtractor(abc.ABC):
    """
    Base extractor class.

    Writing an extractor checklist:

    -   on the child class, overload the _extract method
    -   this method should output one or several numpy.arrays or dataframe with a consistent shape
    -   save_names is a list or a string of filenames, there should be one per dataset
    -   set save_names to None for a dataset that doesn't need saving (could be set dynamically
        in the _extract method)

    :param session_path: Absolute path of session folder
    :type session_path: str/Path
    """

    session_path = None
    save_names = None
    var_names = None
    default_path = Path('alf')  # relative to session

    def __init__(self, session_path=None):
        # NB: Path(None) raises a TypeError, so a session path is effectively required
        self.session_path = Path(session_path)

    def extract(self, save=False, path_out=None, **kwargs):
        """
        Run the extraction and optionally save the result.

        :param save: if True, the extracted data are written to disk
        :param path_out: output folder (str or Path); defaults to `{session_path}/{default_path}`
        :param kwargs: passed through to the `_extract` method
        :return: the output of `_extract`, and the saved file path(s) (None when save is False)
        """
        out = self._extract(**kwargs)
        files = self._save(out, path_out=path_out) if save else None
        return out, files

    def _save(self, data, path_out=None):
        """
        Write the extracted data to disk, using `save_names` for the file names.

        :param data: a single dataset (when `save_names` is a str), a dict of datasets keyed by
         variable name (looked up through `var_names`), or a sequence of datasets in the same
         order as `save_names`
        :param path_out: output folder (str or Path); defaults to `{session_path}/{default_path}`
        :return: an empty list when `save_names` is None, a single Path when `save_names` is a
         str, otherwise the list of Paths of the datasets that have a save name
        """
        # Coerce to a Path so that a folder passed as a string is supported
        path_out = Path(path_out) if path_out else self.session_path.joinpath(self.default_path)

        def _write_to_disk(file_path, data):
            """Implements different save calls depending on file extension.

            Parameters
            ----------
            file_path : pathlib.Path
                The location to save the data.
            data : pandas.DataFrame, numpy.ndarray
                The data to save

            Raises
            ------
            ValueError
                The data are empty.
            TypeError
                The file is a parquet file and the data are not a pandas DataFrame.
            """
            csv_separators = {
                ".csv": ",",
                ".ssv": " ",
                ".tsv": "\t"
            }
            file_path = Path(file_path)
            # Ensure empty files are not created; we expect all datasets to have a non-zero size.
            # len() is only used as a fallback: calling it on a 0-d array raises a TypeError.
            n_elements = data.size if hasattr(data, 'size') else len(data)
            if n_elements == 0:
                try:
                    filename = file_path.relative_to(self.session_path).as_posix()
                except ValueError:
                    # The output folder is outside of the session folder: report the full path
                    filename = file_path.as_posix()
                raise ValueError(f'Data for {filename} appears to be empty')
            file_path.parent.mkdir(exist_ok=True, parents=True)
            if file_path.suffix == ".npy":
                np.save(file_path, data)
            elif file_path.suffix in [".parquet", ".pqt"]:
                if not isinstance(data, pd.DataFrame):
                    _logger.error("Data is not a panda's DataFrame object")
                    raise TypeError("Data is not a panda's DataFrame object")
                data.to_parquet(file_path)
            elif file_path.suffix in csv_separators:
                sep = csv_separators[file_path.suffix]
                data.to_csv(file_path, sep=sep)
            else:
                # NOTE(review): nothing is written here, yet the caller still reports the path
                _logger.error(f"Don't know how to save {file_path.suffix} files yet")

        if self.save_names is None:
            return []
        if isinstance(self.save_names, str):
            file_path = path_out.joinpath(self.save_names)
            _write_to_disk(file_path, data)
            return file_path

        if isinstance(data, dict):
            # Match each variable to its file name through its position in var_names
            to_save = [(self.save_names[self.var_names.index(var)], value)
                       for var, value in data.items()]
        else:  # Should be list or tuple...
            assert len(data) == len(self.save_names)
            to_save = zip(self.save_names, data)

        file_paths = []
        for fn, value in to_save:
            if fn:  # a falsy save name means the dataset is not saved
                fpath = path_out.joinpath(fn)
                _write_to_disk(fpath, value)
                file_paths.append(fpath)
        return file_paths

    @abc.abstractmethod
    def _extract(self):
        """Extract the data; to be overloaded by the child class."""
        pass
114

115

116
class BaseBpodTrialsExtractor(BaseExtractor):
    """
    Base (abstract) extractor class for bpod jsonable data set.

    Wraps the _extract private method: the Bpod trials and settings are stored on the instance
    (and loaded from the session when not provided) before the extraction is run.

    :param session_path: Absolute path of session folder
    :type session_path: str
    :param bpod_trials
    :param settings
    """

    bpod_trials = None
    settings = None
    task_collection = None

    def extract(self, bpod_trials=None, settings=None, **kwargs):
        """
        :param: bpod_trials (optional) bpod trials from jsonable in a dictionary
        :param: settings (optional) bpod iblrig settings json file in a dictionary
        :param: task_collection (optional) session subfolder holding the raw task data,
         defaults to 'raw_behavior_data'
        :param: save (bool) write output ALF files, defaults to False
        :param: path_out (pathlib.Path) output path (defaults to `{session_path}/alf`)
        :return: the output of the parent class `extract` method
        """
        collection = kwargs.pop('task_collection', 'raw_behavior_data')
        self.bpod_trials, self.settings, self.task_collection = bpod_trials, settings, collection
        # Load from the session folder whatever was not provided by the caller
        if self.bpod_trials is None:
            self.bpod_trials = raw.load_data(self.session_path, task_collection=collection)
        if not self.settings:
            self.settings = raw.load_settings(self.session_path, task_collection=collection)
        # Settings without a version tag are given the default one
        if self.settings is None:
            self.settings = {}
        if self.settings.get("IBLRIG_VERSION_TAG", "") == "":
            self.settings["IBLRIG_VERSION_TAG"] = "100.0.0"
        return super().extract(**kwargs)
152

153

154
def run_extractor_classes(classes, session_path=None, **kwargs):
    """
    Run a set of extractors with the same inputs.

    Each class is instantiated with the session path and its `extract` method is called with
    the keyword arguments; the outputs and saved files of all extractors are pooled together.

    :param classes: list of Extractor class (a single class is also accepted)
    :param session_path: the session path, required
    :param kwargs: extractor arguments passed to each `extract` call (save, path_out...)
    :return: dictionary of arrays, list of files
    """
    assert session_path
    all_files = []
    results = OrderedDict()
    # if a single class is passed, convert as a list
    try:
        iter(classes)
    except TypeError:
        classes = [classes]
    for extractor_class in classes:
        extractor = extractor_class(session_path=session_path)
        data, saved = extractor.extract(**kwargs)
        # the saved files may be a list, a single file or None
        if isinstance(saved, list):
            all_files += saved
        elif saved is not None:
            all_files.append(saved)
        # the outputs are keyed by variable name
        if isinstance(data, dict):
            results.update(data)
            continue
        names = extractor.var_names
        if isinstance(names, str):
            results[names] = data
        else:
            for idx, name in enumerate(names):
                results[name] = data[idx]
    return results, all_files
186

187

188
def _get_task_types_json_config():
    """
    Load the task types from the 'extractor_types.json' file located next to this module.

    If the `projects.base` module can be imported and an 'extractor_types.json' file exists in
    its folder, the content of that file is merged into (and takes precedence over) the default.

    :return: the content of the JSON file(s)
    """
    config_name = 'extractor_types.json'
    task_types = json.loads((Path(__file__).parent / config_name).read_text())
    try:
        # look if there are custom extractor types in the personal projects repo
        import projects.base
        custom_file = Path(projects.base.__file__).parent / config_name
        custom_task_types = json.loads(custom_file.read_text())
    except (ModuleNotFoundError, FileNotFoundError):
        # no projects repo, or no custom file within it: keep the default types only
        pass
    else:
        task_types.update(custom_task_types)
    return task_types
201

202

203
def get_task_protocol(session_path, task_collection='raw_behavior_data'):
    """
    Return the 'PYBPOD_PROTOCOL' field of the task settings of a session.

    :param session_path: a path from which the session path is resolved with get_session_path
    :param task_collection: passed to load_settings as the task collection
    :return: the protocol, or None if the settings are empty, can't be decoded or have no
     'PYBPOD_PROTOCOL' key
    """
    try:
        settings = load_settings(get_session_path(session_path), task_collection=task_collection)
    except json.decoder.JSONDecodeError:
        _logger.error(f'Can\'t read settings for {session_path}')
        return None
    return settings.get('PYBPOD_PROTOCOL', None) if settings else None
213

214

215
def get_task_extractor_type(task_name):
    """
    Returns the task type string from the full pybpod task name.

    The task name is first looked up as is in the task types map; failing that, the first map
    key contained in the task name is used (lazy matching). According to the original
    documentation (the map itself is loaded from a JSON file not shown here):
    _iblrig_tasks_biasedChoiceWorld3.7.0 returns "biased"
    _iblrig_tasks_trainingChoiceWorld3.6.0 returns "training"

    :param task_name: the task protocol name (str); if a pathlib.Path is passed instead, the
     protocol is read from the session settings with get_task_protocol
    :return: the task type, e.g. 'biased' or 'training'; None if the task name is None or if no
     type matches (a warning is logged in the latter case)
    """
    if isinstance(task_name, Path):
        task_name = get_task_protocol(task_name)
    if task_name is None:
        # No protocol to match against: the lazy matching below would raise a TypeError
        return None
    task_types = _get_task_types_json_config()

    task_type = task_types.get(task_name, None)
    if task_type is None:  # Try lazy matching of name
        task_type = next((task_types[tt] for tt in task_types if tt in task_name), None)
    if task_type is None:
        _logger.warning(f'No extractor type found for {task_name}')
    return task_type
235

236

237
def get_session_extractor_type(session_path, task_collection='raw_behavior_data'):
    """
    From a session path, loads the settings file, finds the task and checks if extractors exist.

    :param session_path: the session path, passed to load_settings
    :param task_collection: passed to load_settings as the task collection
    :return: the extractor type of the task protocol found in the settings; False if the
     settings could not be loaded (an error is logged) or if no extractor type was found
    """
    settings = load_settings(session_path, task_collection=task_collection)
    if settings is None:
        _logger.error(f'ABORT: No data found in "{task_collection}" folder {session_path}')
        return False
    # any falsy extractor type is reported as False
    return get_task_extractor_type(settings['PYBPOD_PROTOCOL']) or False
253

254

255
def get_pipeline(session_path, task_collection='raw_behavior_data'):
    """
    Get the pre-processing pipeline name from a session path.

    :param session_path: the session path, passed to get_session_extractor_type
    :param task_collection: the task collection, passed to get_session_extractor_type
    :return: the output of _get_pipeline_from_task_type for the session extractor type
    """
    return _get_pipeline_from_task_type(
        get_session_extractor_type(session_path, task_collection=task_collection))
263

264

265
def _get_pipeline_from_task_type(stype):
1✔
266
    """
267
    Returns the pipeline from the task type. Some tasks types directly define the pipeline
268
    :param stype: session_type or task extractor type
269
    :return:
270
    """
271
    if stype in ['ephys_biased_opto', 'ephys', 'ephys_training', 'mock_ephys', 'sync_ephys']:
1✔
272
        return 'ephys'
1✔
273
    elif stype in ['habituation', 'training', 'biased', 'biased_opto']:
1✔
274
        return 'training'
1✔
275
    elif 'widefield' in stype:
1✔
276
        return 'widefield'
×
277
    else:
278
        return stype
1✔
279

280

281
def _get_task_extractor_map():
    """
    Load the task protocol extractor map.

    The map is read from 'task_extractor_map.json', located next to this module. If the
    `projects.base` module can be imported and a file of the same name exists in its folder,
    the content of that file is merged into (and takes precedence over) the default map.

    Returns
    -------
    dict(str, str)
        A map of task protocol to Bpod trials extractor class.
    """
    map_name = 'task_extractor_map.json'
    with (Path(__file__).parent / map_name).open() as stream:
        extractor_map = json.load(stream)
    try:
        # look if there are custom extractor types in the personal projects repo
        import projects.base
        with (Path(projects.base.__file__).parent / map_name).open() as stream:
            overrides = json.load(stream)
    except (ModuleNotFoundError, FileNotFoundError):
        # no projects repo, or no custom map within it: keep the default map only
        pass
    else:
        extractor_map.update(overrides)
    return extractor_map
303

304

305
def get_bpod_extractor_class(session_path, task_collection='raw_behavior_data'):
    """
    Get the Bpod trials extractor class associated with a given Bpod session.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session path containing Bpod behaviour data.
    task_collection : str
        The session_path subfolder containing the Bpod settings file.

    Returns
    -------
    str
        The extractor class name.

    Raises
    ------
    ValueError
        The settings could not be loaded, or they contain no task protocol.
    """
    # Attempt to load settings files
    settings = load_settings(session_path, task_collection=task_collection)
    if settings is None:
        raise ValueError(f'No data found in "{task_collection}" folder {session_path}')
    # Attempt to get task protocol
    protocol = settings.get('PYBPOD_PROTOCOL')
    if not protocol:
        # session_path may be a str: cast to Path so that building the message can't itself fail
        raise ValueError(f'No task protocol found in {Path(session_path) / task_collection}')
    return protocol2extractor(protocol)
330

331

332
def protocol2extractor(protocol):
    """
    Get the Bpod trials extractor class associated with a given Bpod task protocol.

    The Bpod task protocol can be found in the 'PYBPOD_PROTOCOL' field of _iblrig_taskSettings.raw.json.

    Parameters
    ----------
    protocol : str
        A Bpod task protocol name.

    Returns
    -------
    str
        The extractor class name.

    Raises
    ------
    ValueError
        No extractor is associated with the protocol.
    """
    mapping = _get_task_extractor_map()
    # Exact match first
    class_name = mapping.get(protocol)
    if class_name is None:
        # Lazy matching: use the first map key that is contained in the protocol name
        for key, candidate in mapping.items():
            if key in protocol:
                class_name = candidate
                break
    if class_name is None:
        raise ValueError(f'No extractor associated with "{protocol}"')
    return class_name
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc