• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / ibllib / 7961675356254463

pending completion
7961675356254463

Pull #557

continuous-integration/UCL

olivier
add test
Pull Request #557: Chained protocols

718 of 718 new or added lines in 27 files covered. (100.0%)

12554 of 18072 relevant lines covered (69.47%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.46
/ibllib/oneibl/registration.py
1
from pathlib import Path
1✔
2
import json
1✔
3
import datetime
1✔
4
import logging
1✔
5
import itertools
1✔
6

7
from pkg_resources import parse_version
1✔
8
from one.alf.files import get_session_path, folder_parts, get_alf_path
1✔
9
from one.registration import RegistrationClient, get_dataset_type
1✔
10
from one.remote.globus import get_local_endpoint_id
1✔
11
import one.alf.exceptions as alferr
1✔
12
from one.util import datasets2records
1✔
13

14
import ibllib
1✔
15
import ibllib.io.extractors.base
1✔
16
from ibllib.time import isostr2date
1✔
17
import ibllib.io.raw_data_loaders as raw
1✔
18
from ibllib.io import session_params
1✔
19

20
_logger = logging.getLogger(__name__)
# Files with these extensions are never registered to Alyx
EXCLUDED_EXTENSIONS = ['.flag', '.error', '.avi']
# Glob patterns, relative to the session path, of files that are candidates for registration.
# NB: every entry must end with a comma, otherwise Python silently concatenates adjacent strings
# into a single (never matching) pattern.
REGISTRATION_GLOB_PATTERNS = ['alf/**/*.*',
                              'raw_behavior_data/**/_iblrig_*.*',
                              'raw_task_data_*/**/_iblrig_*.*',
                              'raw_passive_data/**/_iblrig_*.*',
                              'raw_behavior_data/**/_iblmic_*.*',
                              'raw_video_data/**/_iblrig_*.*',
                              'raw_video_data/**/_ibl_*.*',
                              'raw_ephys_data/**/_iblrig_*.*',
                              'raw_ephys_data/**/_spikeglx_*.*',
                              'raw_ephys_data/**/_iblqc_*.*',
                              'spikesorters/**/_kilosort_*.*',
                              'raw_widefield_data/**/_ibl_*.*',
                              'raw_photometry_data/**/_neurophotometrics_*.*',
                              ]
37

38

39
def register_dataset(file_list, one=None, exists=False, versions=None, **kwargs):
    """
    Registers a set of files belonging to a session only on the server.

    Parameters
    ----------
    file_list : list, str, pathlib.Path
        A filepath (or list/iterable thereof) of ALF datasets to register to Alyx.
    one : one.api.OneAlyx
        An instance of ONE.
    exists : bool
        Whether files exist in the repository. May be set to False when registering files
        before copying to the repository.
    versions : str, list of str
        Optional version tags, defaults to the current ibllib version.
    kwargs
        Optional keyword arguments for one.registration.RegistrationClient.register_files.

    Returns
    -------
    list of dicts, dict, None
        A list of newly created Alyx dataset records or the registration data if dry.
        None if file_list is empty.

    Raises
    ------
    AssertionError
        The files do not all belong to the same session, or one or more files do not exist
        locally.

    Notes
    -----
    - If a repository is passed, server_only will be set to True.

    See Also
    --------
    one.registration.RegistrationClient.register_files
    """
    if not file_list:
        return
    # Materialize any iterable (e.g. a generator) into a list: it is traversed by both validation
    # checks below and then again by the registration client.
    file_list = [file_list] if isinstance(file_list, (str, Path)) else list(file_list)
    if not file_list:  # an empty iterator is truthy, so re-check once materialized
        return

    assert len(set(get_session_path(f) for f in file_list)) == 1
    assert all(Path(f).exists() for f in file_list)

    client = IBLRegistrationClient(one)
    # If the repository is specified then for the registration client we want server_only=True to
    # make sure we don't make any other repositories for the lab
    if kwargs.get('repository') and not kwargs.get('server_only', False):
        kwargs['server_only'] = True

    return client.register_files(file_list, versions=versions or ibllib.__version__, exists=exists, **kwargs)
1✔
85

86

87
def register_session_raw_data(session_path, one=None, overwrite=False, **kwargs):
    """
    Registers all files corresponding to raw data files to Alyx. It will select files that
    match Alyx registration patterns.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The local session path.
    one : one.api.OneAlyx
        An instance of ONE. If None, the ONE instance held by the registration client is used.
    overwrite : bool
        If set to True, will patch the datasets. It will take very long. If set to False (default)
        will skip all already registered data.
    **kwargs
        Optional keyword arguments for one.registration.RegistrationClient.register_files.
        Note that 'repository' (the local data repository) and 'server_only' (True) are always
        overwritten.

    Returns
    -------
    list of pathlib.Path
        A list of raw dataset paths.
    list of dicts, dict
        A list of newly created Alyx dataset records or the registration data if dry.

    Raises
    ------
    one.alf.exceptions.ALFError
        The session does not exist on Alyx.
    """
    client = IBLRegistrationClient(one)
    # Use the client's ONE instance rather than the argument so that the default one=None does
    # not fail below. NOTE(review): assumes RegistrationClient instantiates a default ONE when
    # passed None; when `one` is given, client.one is that same instance.
    one = client.one
    session_path = Path(session_path)
    eid = one.path2eid(session_path, query_type='remote')  # needs to make sure we're up to date
    if not eid:
        raise alferr.ALFError(f'Session does not exist on Alyx: {get_alf_path(session_path)}')
    # find all files that are in a raw data collection
    file_list = [f for f in client.find_files(session_path)
                 if f.relative_to(session_path).as_posix().startswith('raw')]
    # unless overwrite is True, filter out the datasets that already exist
    if not overwrite:
        # query the database for existing datasets on the session and allowed dataset types
        dsets = datasets2records(one.alyx.rest('datasets', 'list', session=eid))
        # a set gives constant-time membership tests for sessions with many datasets
        already_registered = set(map(session_path.joinpath, dsets['rel_path']))
        file_list = [f for f in file_list if f not in already_registered]

    kwargs['repository'] = get_local_data_repository(one.alyx)
    kwargs['server_only'] = True

    response = client.register_files(file_list, versions=ibllib.__version__, exists=False, **kwargs)
    return file_list, response
1✔
131

132

133
class IBLRegistrationClient(RegistrationClient):
    """
    Object that keeps the ONE instance and provides method to create sessions and register data.

    Extends one.registration.RegistrationClient with IBL session creation from Bpod task settings
    (supporting several chained task protocols per session) and IBL-specific file discovery.
    """

    def register_session(self, ses_path, file_list=True, projects=None, procedures=None):
        """
        Register an IBL Bpod session in Alyx.

        Parameters
        ----------
        ses_path : str, pathlib.Path
            The local session path.
        file_list : bool, list
            An optional list of file paths to register.  If True, all valid files within the
            session folder are registered.  If False, no files are registered.
        projects: str, list
            The project(s) to which the experiment belongs (optional).
        procedures : str, list
            An optional list of procedures, e.g. 'Behavior training/tasks'.

        Returns
        -------
        dict
            An Alyx session record.
        list of dicts, None
            The newly registered dataset records, or None if file_list is False.

        Raises
        ------
        ValueError
            The task settings file was not found for one or more task collections.
        AssertionError
            The task settings disagree with each other or with the session path.

        Notes
        -----
        For a list of available projects:
        >>> sorted(proj['name'] for proj in one.alyx.rest('projects', 'list'))
        For a list of available procedures:
        >>> sorted(proc['name'] for proc in one.alyx.rest('procedures', 'list'))
        """
        if isinstance(ses_path, str):
            ses_path = Path(ses_path)

        # Read in the experiment description file if it exists and get projects and procedures from here
        experiment_description_file = session_params.read_params(ses_path)
        if experiment_description_file is None:
            collections = ['raw_behavior_data']
        else:
            projects = experiment_description_file.get('projects', projects)
            procedures = experiment_description_file.get('procedures', procedures)
            collections = session_params.get_task_collection(experiment_description_file)
            # NOTE(review): defensive - should a single collection ever be returned as a bare str,
            # sorted() below would split it into characters
            if isinstance(collections, str):
                collections = [collections]

        # read meta data from the rig for the session from the task settings file
        task_data = (raw.load_bpod(ses_path, collection) for collection in sorted(collections))
        # Filter collections where settings file was not found
        if not (task_data := list(zip(*filter(lambda x: x[0] is not None, task_data)))):
            raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path} Abort.')
        settings, task_data = task_data
        # every task collection must have provided a settings file
        if len(settings) != len(collections):
            raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path} Abort.')

        # Do some validation: all protocols must agree with each other and with the session path
        _, subject, date, number, *_ = folder_parts(ses_path)
        assert len({x['SUBJECT_NAME'] for x in settings}) == 1 and settings[0]['SUBJECT_NAME'] == subject
        assert len({x['SESSION_DATE'] for x in settings}) == 1 and settings[0]['SESSION_DATE'] == date
        assert len({x['SESSION_NUMBER'] for x in settings}) == 1 and settings[0]['SESSION_NUMBER'] == number
        assert len({x['IS_MOCK'] for x in settings}) == 1
        assert len({md['PYBPOD_BOARD'] for md in settings}) == 1
        assert len({md.get('IBLRIG_VERSION') for md in settings}) == 1
        assert len({md['IBLRIG_VERSION_TAG'] for md in settings}) == 1

        # query Alyx endpoints for subject, error if not found
        subject = self.assert_exists(subject, 'subjects')

        # look for a session from the same subject, same number on the same day
        session_id, session = self.one.search(subject=subject['nickname'],
                                              date_range=date,
                                              number=number,
                                              details=True, query_type='remote')
        users = []
        for user in filter(None, map(lambda x: x.get('PYBPOD_CREATOR'), settings)):
            user = self.assert_exists(user[0], 'users')  # user is list of [username, uuid]
            users.append(user['username'])

        # extract information about session duration and performance
        start_time, end_time = _get_session_times(str(ses_path), settings, task_data)
        n_trials, n_correct_trials = _get_session_performance(settings, task_data)

        # TODO Add task_protocols to Alyx sessions endpoint
        task_protocols = [md['PYBPOD_PROTOCOL'] + md['IBLRIG_VERSION_TAG'] for md in settings]
        # unless specified label the session projects with subject projects
        projects = subject['projects'] if projects is None else projects
        # makes sure projects is a list
        projects = [projects] if isinstance(projects, str) else projects

        # unless specified label the session procedures with task protocol lookup
        procedures = procedures or list(set(filter(None, map(self._alyx_procedure_from_task, task_protocols))))
        procedures = [procedures] if isinstance(procedures, str) else procedures
        json_fields_names = ['IS_MOCK', 'IBLRIG_VERSION']
        json_field = {k: settings[0].get(k) for k in json_fields_names}
        # The poo count field is only updated if the field is defined in at least one of the settings
        poo_counts = [md.get('POOP_COUNT') for md in settings if md.get('POOP_COUNT') is not None]
        if poo_counts:
            json_field['POOP_COUNT'] = int(sum(poo_counts))

        if not session:  # Create session and weighings
            ses_ = {'subject': subject['nickname'],
                    'users': users or [subject['responsible_user']],
                    'location': settings[0]['PYBPOD_BOARD'],
                    'procedures': procedures,
                    'lab': subject['lab'],
                    'projects': projects,
                    'type': 'Experiment',
                    'task_protocol': '/'.join(task_protocols),
                    'number': number,
                    'start_time': self.ensure_ISO8601(start_time),
                    'end_time': self.ensure_ISO8601(end_time) if end_time else None,
                    'n_correct_trials': n_correct_trials,
                    'n_trials': n_trials,
                    'json': json_field
                    }
            session = self.one.alyx.rest('sessions', 'create', data=ses_)
            # Submit weights
            for md in filter(lambda md: md.get('SUBJECT_WEIGHT') is not None, settings):
                user = self._session_user(md, users)
                self.register_weight(subject['nickname'], md['SUBJECT_WEIGHT'],
                                     date_time=md['SESSION_DATETIME'], user=user)
        else:  # if session exists update the JSON field
            session = self.one.alyx.rest('sessions', 'read', id=session_id[0], no_cache=True)
            self.one.alyx.json_field_update('sessions', session['id'], data=json_field)

        _logger.info(session['url'] + ' ')
        # create associated water administration if not found
        if not session['wateradmin_session_related'] and any(task_data):
            for md, d in zip(settings, task_data):
                if not d:  # this protocol has no trial data, so there is no delivered volume to log
                    continue
                _, _end_time = _get_session_times(ses_path, md, d)
                user = self._session_user(md, users)
                volume = d[-1]['water_delivered'] / 1000
                self.register_water_administration(
                    subject['nickname'], volume, date_time=_end_time or end_time, user=user,
                    session=session['id'], water_type=md.get('REWARD_TYPE') or 'Water')
        # at this point the session has been created. If create only, exit
        if not file_list:
            return session, None

        # register all files that match the Alyx patterns and file_list
        rename_files_compatibility(ses_path, settings[0]['IBLRIG_VERSION_TAG'])
        to_register = [f for f in self.find_files(ses_path) if self._register_bool(f.name, file_list)]
        recs = self.register_files(to_register, created_by=users[0] if users else None,
                                   versions=ibllib.__version__)
        return session, recs

    def _session_user(self, md, users):
        """
        Return the username to attribute a weighing or water administration to.

        Parameters
        ----------
        md : dict
            A task settings dictionary.
        users : list of str
            The usernames validated against Alyx for this session.

        Returns
        -------
        str
            The settings' PYBPOD_CREATOR username if it is one of `users`, otherwise the
            current Alyx user.
        """
        creator = md.get('PYBPOD_CREATOR')  # [username, uuid], or None/empty when not set
        return creator[0] if creator and creator[0] in users else self.one.alyx.user

    @staticmethod
    def _register_bool(fn, file_list):
        """
        Whether a file name should be registered given the `file_list` filter.

        A boolean `file_list` is returned as-is; otherwise True if any item of `file_list`
        (str or list) is a substring of `fn`.
        """
        if isinstance(file_list, bool):
            return file_list
        if isinstance(file_list, str):
            file_list = [file_list]
        return any(str(fil) in fn for fil in file_list)

    @staticmethod
    def _alyx_procedure_from_task(task_protocol):
        """Return the Alyx procedure name for a task protocol string, or [] if none is known."""
        task_type = ibllib.io.extractors.base.get_task_extractor_type(task_protocol)
        procedure = _alyx_procedure_from_task_type(task_type)
        return procedure or []

    def find_files(self, session_path):
        """Similar to base class method but further filters by name and extension.

        Yields files under the session path that match one of REGISTRATION_GLOB_PATTERNS,
        excluding files whose extension is in EXCLUDED_EXTENSIONS. Files for which no Alyx
        dataset type can be resolved (get_dataset_type raises ValueError) are logged as errors
        and skipped. Each file is yielded at most once, even if several patterns match it.

        Parameters
        ----------
        session_path : str, pathlib.Path
            The session path to search.

        Yields
        -------
        pathlib.Path
            File paths that match the dataset type patterns in Alyx and registration glob patterns.
        """
        session_path = Path(session_path)
        files = itertools.chain.from_iterable(session_path.glob(x) for x in REGISTRATION_GLOB_PATTERNS)
        seen = set()
        for file in files:
            if file.suffix in EXCLUDED_EXTENSIONS or file in seen:
                continue
            seen.add(file)
            try:
                get_dataset_type(file, self.dtypes)
            except ValueError as ex:
                _logger.error(ex)
                continue
            yield file
1✔
317

318

319
def _alyx_procedure_from_task_type(task_type):
1✔
320
    lookup = {'biased': 'Behavior training/tasks',
1✔
321
              'biased_opto': 'Behavior training/tasks',
322
              'habituation': 'Behavior training/tasks',
323
              'training': 'Behavior training/tasks',
324
              'ephys': 'Ephys recording with acute probe(s)',
325
              'ephys_biased_opto': 'Ephys recording with acute probe(s)',
326
              'ephys_passive_opto': 'Ephys recording with acute probe(s)',
327
              'ephys_replay': 'Ephys recording with acute probe(s)',
328
              'ephys_training': 'Ephys recording with acute probe(s)',
329
              'mock_ephys': 'Ephys recording with acute probe(s)',
330
              'sync_ephys': 'Ephys recording with acute probe(s)'}
331
    try:
1✔
332
        # look if there are tasks in the personal projects repo with procedures
333
        import projects.base
1✔
334
        custom_tasks = Path(projects.base.__file__).parent.joinpath('task_type_procedures.json')
1✔
335
        with open(custom_tasks) as fp:
1✔
336
            lookup.update(json.load(fp))
×
337
    except (ModuleNotFoundError, FileNotFoundError):
1✔
338
        pass
1✔
339
    if task_type in lookup:
1✔
340
        return lookup[task_type]
1✔
341

342

343
def rename_files_compatibility(ses_path, version_tag):
    """
    Rename legacy dataset files within a session folder to their current names.

    Parameters
    ----------
    ses_path : str, pathlib.Path
        The local session path.
    version_tag : str
        The iblrig version tag of the session. If empty or None, nothing is renamed.

    Notes
    -----
    - For version tags <= 3.2.3, '_ibl_trials.iti_duration.npy' files are renamed to
      '_ibl_trials.itiDuration.npy'.
    - '_iblrig_taskCodeFiles.raw.zip' files are renamed to '_iblrig_codeFiles.raw.zip'.
    - Files are moved with pathlib.Path.replace, so an existing target file is overwritten.
    """
    if not version_tag:
        return
    ses_path = Path(ses_path)  # accept str paths as well as Path objects
    # The glob results are materialized before renaming so the directory tree is not modified
    # while it is still being walked.
    if parse_version(version_tag) <= parse_version('3.2.3'):
        for fn in list(ses_path.glob('**/_ibl_trials.iti_duration.npy')):
            fn.replace(fn.parent.joinpath('_ibl_trials.itiDuration.npy'))
    for fn in list(ses_path.glob('**/_iblrig_taskCodeFiles.raw.zip')):
        fn.replace(fn.parent.joinpath('_iblrig_codeFiles.raw.zip'))
×
353

354

355
def _get_session_times(fn, md, ses_data):
    """
    Derive the start and end datetimes of a session from its Bpod settings and trial data.

    The start time is taken from the first protocol's settings. The end time is the last
    protocol's start time plus the duration of its last plausible trial. Trailing trials whose
    end timestamp lies 6 hours or more after the Bpod start are treated as corrupt and skipped,
    with a warning.

    Parameters
    ----------
    fn : str, pathlib.Path
        Session/task identifier. Only used in warning logs.
    md : dict, list of dict
        A session parameters dictionary or list thereof.
    ses_data : dict, list of dict
        A session data dictionary or list thereof.

    Returns
    -------
    datetime.datetime
        The datetime of the start of the session.
    datetime.datetime
        The datetime of the end of the session, or None if ses_data is None or empty.
    """
    if isinstance(md, dict):
        start_time = last_start = isostr2date(md['SESSION_DATETIME'])
    else:
        start_time = isostr2date(md[0]['SESSION_DATETIME'])
        last_start = isostr2date(md[-1]['SESSION_DATETIME'])
        assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md)
        assert len(md) == 1 or start_time < last_start
        # only the final protocol's trials determine when the session ended
        ses_data = ses_data[-1]
    if not ses_data:
        return start_time, None

    max_duration_secs = 6 * 3600
    n_corrupt = 0
    duration_secs = 0
    # walk backwards from the last trial until a plausible end timestamp is found
    for trial in reversed(ses_data):
        bpod = trial['behavior_data']
        duration_secs = bpod['Trial end timestamp'] - bpod['Bpod start timestamp']
        if duration_secs < max_duration_secs:
            break
        n_corrupt += 1
    if n_corrupt:
        _logger.warning(('Trial end timestamps of last %i trials above 6 hours '
                         '(most likely corrupt): %s'), n_corrupt, str(fn))
    return start_time, last_start + datetime.timedelta(seconds=duration_secs)
1✔
397

398

399
def _get_session_performance(md, ses_data):
1✔
400
    """
401
    Get performance about the session from Bpod data.
402
    Note: This does not support custom protocols.
403

404
    Parameters
405
    ----------
406
    md : dict, list of dict
407
        A session parameters dictionary or list thereof.
408
    ses_data : dict, list of dict
409
        A session data dictionary or list thereof.
410

411
    Returns
412
    -------
413
    int
414
        The total number of trials across protocols.
415
    int
416
        The total number of correct trials across protocols.
417
    """
418
    if not any(filter(None, ses_data or None)):
1✔
419
        return None, None
1✔
420

421
    if isinstance(md, dict):
1✔
422
        ses_data = [ses_data]
1✔
423
        md = [md]
1✔
424
    else:
425
        assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md)
1✔
426

427
    n_trials = [x[-1]['trial_num'] for x in ses_data]
1✔
428
    # checks that the number of actual trials and labeled number of trials check out
429
    assert all(len(x) == n for x, n in zip(ses_data, n_trials))
1✔
430
    # task specific logic
431
    n_correct_trials = []
1✔
432
    for data, proc in zip(ses_data, map(lambda x: x.get('PYBPOD_PROTOCOL', ''), md)):
1✔
433
        if 'habituationChoiceWorld' in proc:
1✔
434
            n_correct_trials.append(0)
1✔
435
        else:
436
            n_correct_trials.append(data[-1]['ntrials_correct'])
1✔
437

438
    return sum(n_trials), sum(n_correct_trials)
1✔
439

440

441
def get_local_data_repository(ac):
    """
    Get local data repo name from Globus client.

    Parameters
    ----------
    ac : one.webclient.AlyxClient
        An AlyxClient instance for querying data repositories.

    Returns
    -------
    str, None
        The (first) data repository associated with the local Globus endpoint ID, or None if
        `ac` is falsy, if the local endpoint ID lookup raises AssertionError, or if no
        repository matches the endpoint.
    """
    # An explicit check rather than `assert ac`: assertions are stripped under `python -O`,
    # which would let a None client through to `ac.rest` below.
    if not ac:
        return None
    try:
        globus_id = get_local_endpoint_id()
    except AssertionError:
        # NOTE(review): presumably raised when no local Globus endpoint is configured - confirm
        return None

    data_repo = ac.rest('data-repository', 'list', globus_endpoint_id=globus_id)
    return next((da['name'] for da in data_repo), None)
1✔
463

464

465
def get_lab(ac):
    """
    Get list of associated labs from Globus client ID.

    Reads the local Globus endpoint ID and queries Alyx for the labs whose data repositories
    use that endpoint.

    Parameters
    ----------
    ac : one.webclient.AlyxClient
        An AlyxClient instance for querying data repositories.

    Returns
    -------
    list
        The lab names associated with the local Globus endpoint ID.
    """
    endpoint_id = get_local_endpoint_id()
    django_query = f'repositories__globus_endpoint_id,{endpoint_id}'
    labs = ac.rest('labs', 'list', django=django_query)
    return [lab['name'] for lab in labs]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc