• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 9031936551

10 May 2024 12:05PM UTC coverage: 48.538% (+1.7%) from 46.79%
9031936551

Pull #643

github

53c3e3
web-flow
Merge 3c8214f78 into ec2d8e4fe
Pull Request #643: 8.19.0

377 of 1073 new or added lines in 38 files covered. (35.14%)

977 existing lines in 19 files now uncovered.

3253 of 6702 relevant lines covered (48.54%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/iblrig/path_helper.py
1
import logging
2✔
2
import os
2✔
3
import re
2✔
4
import shutil
2✔
5
import subprocess
2✔
6
from pathlib import Path
2✔
7
from typing import TypeVar
2✔
8

9
import numpy as np
2✔
10
import yaml
2✔
11
from packaging import version
2✔
12
from pydantic import BaseModel, ValidationError
2✔
13

14
import iblrig
2✔
15
from ibllib.io import session_params
2✔
16
from ibllib.io.raw_data_loaders import load_settings
2✔
17
from iblrig.constants import HARDWARE_SETTINGS_YAML, RIG_SETTINGS_YAML
2✔
18
from iblrig.pydantic_definitions import HardwareSettings, RigSettings
2✔
19
from iblutil.util import Bunch
2✔
20
from one.alf.spec import is_session_path
2✔
21

22
log = logging.getLogger(__name__)
2✔
23
T = TypeVar('T', bound=BaseModel)
2✔
24

25

26
def iterate_previous_sessions(subject_name, task_name, n=1, **kwargs):
    """
    Look up the most recent sessions of a subject that ran a given protocol.

    Both the local and the remote subjects folders are searched for the given
    protocol name; information on up to the last ``n`` matching protocols is
    returned.

    :param subject_name: name of the subject
    :param task_name: name of the protocol to look for in experiment description : '_iblrig_tasks_trainingChoiceWorld'
    :param n: number of maximum protocols to return
    :param kwargs: optional arguments to be passed to iblrig.path_helper.get_local_and_remote_paths
    if not used, will use the arguments from iblrig/settings/iblrig_settings.yaml
    :return:
        list of dictionaries with keys: session_path, experiment_description, task_settings, file_task_data
    """
    paths = get_local_and_remote_paths(**kwargs)
    local_subject_folder = paths.local_subjects_folder.joinpath(subject_name)
    sessions = _iterate_protocols(local_subject_folder, task_name=task_name, n=n)
    if paths.remote_subjects_folder is not None:
        remote_subject_folder = paths.remote_subjects_folder.joinpath(subject_name)
        found_remote = _iterate_protocols(remote_subject_folder, task_name=task_name, n=n)
        if found_remote is not None:
            sessions.extend(found_remote)
        # de-duplicate sessions present both locally and remotely, keyed by their date_number stub
        stubs = [s['session_stub'] for s in sessions]
        _, keep_indices = np.unique(stubs, return_index=True)
        sessions = [sessions[i] for i in keep_indices]
    return sessions
48

49

50
def _iterate_protocols(subject_folder, task_name, n=1, min_trials=43):
    """
    Return information on the last n sessions with matching protocol.

    This function iterates over the sessions of a given subject and searches for a given protocol name.

    Parameters
    ----------
    subject_folder : pathlib.Path
        A subject folder containing dated folders.
    task_name : str
        The task protocol name to look for.
    n : int
        The number of previous protocols to return.
    min_trials : int
        Skips sessions with fewer than this number of trials.

    Returns
    -------
    list of dict
        list of dictionaries with keys: session_stub, session_path, experiment_description,
        task_settings, file_task_data.
    """

    def proc_num(x):
        """Return protocol number.

        Use 'protocol_number' key if present (unlikely), otherwise use collection name.
        """
        # e.g. 'raw_task_data_01' -> ['raw', 'task', 'data', '01']; default '00' keeps a valid split
        i = (x or {}).get('collection', '00').split('_')
        collection_int = int(i[-1]) if i[-1].isnumeric() else 0
        return x.get('protocol_number', collection_int)

    protocols = []
    # missing or non-existent subject folder: nothing to iterate over
    if subject_folder is None or Path(subject_folder).exists() is False:
        return protocols
    sessions = subject_folder.glob('????-??-??/*/_ibl_experiment.description*.yaml')  # seq may be X or XXX
    # Make extra sure to only include valid sessions
    sessions = filter(lambda x: is_session_path(x.relative_to(subject_folder.parent).parent), sessions)
    # sorted(reverse=True) walks the dated session folders newest-first
    for file_experiment in sorted(sessions, reverse=True):
        session_path = file_experiment.parent
        ad = session_params.read_params(file_experiment)
        # reversed: we look for the last task first if the protocol ran twice
        tasks = filter(None, map(lambda x: x.get(task_name), ad.get('tasks', [])))
        for adt in sorted(tasks, key=proc_num, reverse=True):
            # skip protocols whose task settings cannot be loaded from disk
            if not (task_settings := load_settings(session_path, task_collection=adt['collection'])):
                continue
            if task_settings.get('NTRIALS', min_trials + 1) < min_trials:  # ignore sessions with too few trials
                continue
            protocols.append(
                Bunch(
                    {
                        'session_stub': '_'.join(file_experiment.parent.parts[-2:]),  # 2019-01-01_001
                        'session_path': file_experiment.parent,
                        'task_collection': adt['collection'],
                        'experiment_description': ad,
                        'task_settings': task_settings,
                        'file_task_data': session_path.joinpath(adt['collection'], '_iblrig_taskData.raw.jsonable'),
                    }
                )
            )
            # early exit as soon as the requested number of protocols is collected
            if len(protocols) >= n:
                return protocols
    return protocols
114

115

116
def get_local_and_remote_paths(local_path=None, remote_path=None, lab=None, iblrig_settings=None):
    """
    Function used to parse input arguments to transfer commands.

    If the arguments are None, reads in the settings and returns the values from the files.
    local_subjects_path always has a fallback on the home directory / iblrig_data
    remote_subjects_path has no fallback and will return None when all options are exhausted
    :param local_path: optional path to the local data folder
    :param remote_path: optional path to the remote data folder
    :param lab: optional lab name, used to build the local subjects folder path
    :param iblrig_settings: if provided, settings dictionary, otherwise will load the default settings files
    :return: dictionary, with following keys (example output)
       {'local_data_folder': PosixPath('C:/iblrigv8_data'),
        'remote_data_folder': PosixPath('Y:/'),
        'local_subjects_folder': PosixPath('C:/iblrigv8_data/mainenlab/Subjects'),
        'remote_subjects_folder': PosixPath('Y:/Subjects')}
    """
    paths = Bunch({'local_data_folder': local_path, 'remote_data_folder': remote_path})
    # we only want to attempt to load the settings file if necessary
    if (local_path is None) or (remote_path is None) or (lab is None):
        iblrig_settings = load_pydantic_yaml(RigSettings) if iblrig_settings is None else iblrig_settings
    if paths.local_data_folder is None:
        # settings value if set, else fall back on ~/iblrig_data
        paths.local_data_folder = (
            Path(p) if (p := iblrig_settings['iblrig_local_data_path']) else Path.home().joinpath('iblrig_data')
        )
    elif isinstance(paths.local_data_folder, str):
        paths.local_data_folder = Path(paths.local_data_folder)
    if paths.remote_data_folder is None:
        # no fallback here: remote data folder may legitimately be None
        paths.remote_data_folder = Path(p) if (p := iblrig_settings['iblrig_remote_data_path']) else None
    elif isinstance(paths.remote_data_folder, str):
        paths.remote_data_folder = Path(paths.remote_data_folder)

    # Get the subjects folders. If not defined in the settings, assume data path + /Subjects
    paths.local_subjects_folder = (iblrig_settings or {}).get('iblrig_local_subjects_path', None)
    lab = lab or (iblrig_settings or {}).get('ALYX_LAB', None)
    if paths.local_subjects_folder is None:
        # data folder already ends in 'Subjects': use it as-is
        if paths.local_data_folder.name == 'Subjects':
            paths.local_subjects_folder = paths.local_data_folder
        elif lab:  # append lab/Subjects part
            paths.local_subjects_folder = paths.local_data_folder.joinpath(lab, 'Subjects')
        else:  # NB: case is important here. ALF spec expects lab folder before 'Subjects' (capitalized)
            paths.local_subjects_folder = paths.local_data_folder.joinpath('subjects')
    else:
        paths.local_subjects_folder = Path(paths.local_subjects_folder)
    # Same for remote subjects folder
    paths.remote_subjects_folder = (iblrig_settings or {}).get('iblrig_remote_subjects_path', None)
    if paths.remote_subjects_folder is None:
        if paths.remote_data_folder:
            if paths.remote_data_folder.name == 'Subjects':
                paths.remote_subjects_folder = paths.remote_data_folder
            else:
                paths.remote_subjects_folder = paths.remote_data_folder.joinpath('Subjects')
    else:
        paths.remote_subjects_folder = Path(paths.remote_subjects_folder)
    return paths
171

172

173
def _load_settings_yaml(filename: Path | str = RIG_SETTINGS_YAML, do_raise: bool = True) -> Bunch:
    """
    Load an IBLRIG settings YAML file into a Bunch.

    Relative filenames are resolved against the ``settings`` directory that sits
    next to the iblrig package.

    Parameters
    ----------
    filename : Path | str
        Path to the settings YAML file, absolute or relative to the settings folder.
    do_raise : bool
        If True (default), a missing file raises the underlying OSError when opened.
        If False, an error is logged and an empty Bunch is returned instead.

    Returns
    -------
    Bunch
        The loaded settings, patched for compatibility with the latest version.
    """
    filename = Path(filename)
    if not filename.is_absolute():
        filename = Path(iblrig.__file__).parents[1].joinpath('settings', filename)
    if not filename.exists() and not do_raise:
        # fix: the log message previously contained no placeholder, losing the path
        log.error(f'File not found: {filename}')
        return Bunch()
    with open(filename) as fp:
        rs = yaml.safe_load(fp)
    rs = patch_settings(rs, filename.stem)
    return Bunch(rs)
184

185

186
def load_pydantic_yaml(model: type[T], filename: Path | str | None = None, do_raise: bool = True) -> T:
    """
    Load YAML settings data and validate it against a Pydantic model.

    Reads the given YAML file (or the standard IBLRIG settings file deduced
    from the model when no filename is given) and returns a validated model
    instance.

    Parameters
    ----------
    model : Type[T]
        The Pydantic model class to validate the YAML data against.
    filename : Path | str | None, optional
        The path to the YAML file. If None (default), the standard IBLRIG
        settings file corresponding to the model is used.
    do_raise : bool, optional
        If True (default), raise a ValidationError if validation fails.
        If False, log the validation error and construct a model instance
        from the raw data instead.

    Returns
    -------
    T
        An instance of the Pydantic model, validated against the YAML data.

    Raises
    ------
    ValidationError
        If validation fails and do_raise is set to True.
    TypeError
        If filename is None and the model class is neither HardwareSettings
        nor RigSettings.
    """
    if filename is None:
        # deduce the standard settings file from the model class
        if model == HardwareSettings:
            filename = HARDWARE_SETTINGS_YAML
        elif model == RigSettings:
            filename = RIG_SETTINGS_YAML
        else:
            raise TypeError(f'Cannot deduce filename for model `{model.__name__}`.')
    if filename not in (HARDWARE_SETTINGS_YAML, RIG_SETTINGS_YAML):
        # TODO: We currently skip validation of pydantic models if an extra
        #       filename is provided that does NOT correspond to the standard
        #       settings files of IBLRIG. This should be re-evaluated.
        do_raise = False
    loaded = _load_settings_yaml(filename=filename, do_raise=do_raise)
    try:
        return model.model_validate(loaded)
    except ValidationError as e:
        if do_raise:
            raise e
        # best effort: log the failure and return an unvalidated instance
        log.exception(e)
        return model.model_construct(**loaded)
240

241

242
def save_pydantic_yaml(data: T, filename: Path | str | None = None) -> None:
    """
    Serialize a Pydantic settings model to a YAML file.

    Parameters
    ----------
    data : T
        The Pydantic model instance to write out.
    filename : Path | str | None, optional
        Destination file. If None, the standard IBLRIG settings file matching
        the model type is used.

    Raises
    ------
    TypeError
        If filename is None and the model is neither HardwareSettings nor
        RigSettings.
    """
    if filename is not None:
        filename = Path(filename)
    elif isinstance(data, HardwareSettings):
        filename = HARDWARE_SETTINGS_YAML
    elif isinstance(data, RigSettings):
        filename = RIG_SETTINGS_YAML
    else:
        raise TypeError(f'Cannot deduce filename for model `{type(data).__name__}`.')
    dumped = data.model_dump()
    # round-trip through validation so an inconsistent dump fails before writing
    data.model_validate(dumped)
    with open(filename, 'w') as f:
        log.debug(f'Dumping {type(data).__name__} to {filename.name}')
        yaml.dump(dumped, f, sort_keys=False)
257

258

259
def patch_settings(rs: dict, filename: str | Path) -> dict:
    """
    Update loaded settings files to ensure compatibility with latest version.

    Patches are applied sequentially and mutate `rs` in place; each patch
    bumps the 'VERSION' key so later patches see the upgraded structure.

    Parameters
    ----------
    rs : dict
        A loaded settings file.
    filename : str | Path
        The filename of the settings file.

    Returns
    -------
    dict
        The updated settings.
    """
    filename = Path(filename)
    # absent VERSION key is treated as the oldest possible settings file
    settings_version = version.parse(rs.get('VERSION', '0.0.0'))
    if filename.stem.startswith('hardware'):
        # < 1.0.0: single 'device_camera' entry becomes a labelled 'device_cameras' dict
        if settings_version < version.Version('1.0.0') and 'device_camera' in rs:
            log.info('Patching hardware settings; assuming left camera label')
            rs['device_cameras'] = {'left': rs.pop('device_camera')}
            rs['VERSION'] = '1.0.0'
        if 'device_cameras' in rs and rs['device_cameras'] is not None:
            rs['device_cameras'] = {k: v for k, v in rs['device_cameras'].items() if v}  # remove empty keys
            # pre-1.1.0 files with only a 'left' camera and no INDEX need the workflow restructure
            idx_missing = set(rs['device_cameras']) == {'left'} and 'INDEX' not in rs['device_cameras']['left']
            if settings_version < version.Version('1.1.0') and idx_missing:
                log.info('Patching hardware settings; assuming left camera index and training workflow')
                workflow = rs['device_cameras']['left'].pop('BONSAI_WORKFLOW', None)
                bonsai_workflows = {'setup': 'devices/camera_setup/setup_video.bonsai', 'recording': workflow}
                rs['device_cameras'] = {
                    'training': {'BONSAI_WORKFLOW': bonsai_workflows, 'left': {'INDEX': 1, 'SYNC_LABEL': 'audio'}}
                }
                rs['VERSION'] = '1.1.0'
        # normalize an explicit null to an empty mapping
        if rs.get('device_cameras') is None:
            rs['device_cameras'] = {}
    return rs
296

297

298
def get_commit_hash(folder: str):
    """
    Return the hash of the HEAD commit of the git repository in `folder`.

    Parameters
    ----------
    folder : str
        Path to a folder inside a git working tree.

    Returns
    -------
    str
        The HEAD commit hash, or an empty string if git outputs nothing.
    """
    # Run git directly in the target folder via cwd= instead of os.chdir:
    # the previous chdir/chdir-back dance left the process in `folder` if
    # check_output raised, and mutated global state for all threads.
    out = subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=folder).decode().strip()
    if not out:
        log.debug('Commit hash is empty string')
    log.debug(f'Found commit hash {out}')
    return out
307

308

309
def iterate_collection(session_path: str, collection_name='raw_task_data') -> str:
    """
    Given a session path returns the next numbered collection name.

    Parameters
    ----------
    session_path : str
        The session path containing zero or more numbered collections.
    collection_name : str
        The collection name without the _NN suffix.

    Returns
    -------
    str
        The next numbered collection name.

    Examples
    --------
    In a folder where there are no raw task data folders

    >>> iterate_collection('./subject/2020-01-01/001')
    'raw_task_data_00'

    In a folder where there is one raw_imaging_data_00 folder

    >>> iterate_collection('./subject/2020-01-01/001', collection_name='raw_imaging_data')
    'raw_imaging_data_01'
    """
    if not Path(session_path).exists():
        return f'{collection_name}_00'
    collections = filter(Path.is_dir, Path(session_path).iterdir())
    collection_names = map(lambda x: x.name, collections)
    # re.escape so regex metacharacters in the collection name are taken literally
    pattern = re.compile(re.escape(collection_name) + '_[0-9]{2}')
    tasks = sorted(filter(pattern.match, collection_names))
    if len(tasks) == 0:
        return f'{collection_name}_00'
    return f'{collection_name}_{int(tasks[-1][-2:]) + 1:02}'
345

346

347
def create_bonsai_layout_from_template(workflow_file: Path) -> None:
    """
    Ensure a Bonsai layout file exists for the given workflow.

    If ``<workflow>.bonsai.layout`` is missing, it is created by copying
    ``<workflow>.bonsai.layout_template`` when such a template exists;
    otherwise a debug message is logged.

    Parameters
    ----------
    workflow_file : Path
        Path to the Bonsai workflow file.

    Raises
    ------
    FileNotFoundError
        If the workflow file itself does not exist.
    """
    if not workflow_file.exists():
        # fix: the exception was previously constructed but never raised
        raise FileNotFoundError(workflow_file)
    if not (layout_file := workflow_file.with_suffix('.bonsai.layout')).exists():
        template_file = workflow_file.with_suffix('.bonsai.layout_template')
        if template_file.exists():
            log.info(f'Creating default {layout_file.name}')
            shutil.copy(template_file, layout_file)
        else:
            log.debug(f'No template layout for {workflow_file.name}')
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc