• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 14196118657

01 Apr 2025 12:52PM UTC coverage: 47.634% (+0.8%) from 46.79%
14196118657

Pull #795

github

cfb5bd
web-flow
Merge 5ba5d5f25 into 58cf64236
Pull Request #795: fixes for habituation CW

11 of 12 new or added lines in 1 file covered. (91.67%)

1083 existing lines in 22 files now uncovered.

4288 of 9002 relevant lines covered (47.63%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.76
/iblrig/transfer_experiments.py
1
import datetime
2✔
2
import json
2✔
3
import logging
2✔
4
import os
2✔
5
import shutil
2✔
6
import socket
2✔
7
import traceback
2✔
8
import uuid
2✔
9
from collections.abc import Iterable
2✔
10
from enum import IntEnum
2✔
11
from os.path import samestat
2✔
12
from pathlib import Path
2✔
13

14
import numpy as np
2✔
15
import pandas as pd
2✔
16
import pandera
2✔
17

18
import ibllib.pipes.misc
2✔
19
import iblrig
2✔
20
import one.alf.path as alfiles
2✔
21
from ibllib.io import raw_data_loaders, session_params
2✔
22
from ibllib.pipes.misc import sleepless
2✔
23
from iblrig.raw_data_loaders import load_task_jsonable
2✔
24
from iblutil.io import hashfile
2✔
25
from iblutil.util import ensure_list
2✔
26

27
# Module-level logger, named after this module per the standard logging convention.
log = logging.getLogger(__name__)

# NOTE(review): these match the Windows `SetThreadExecutionState` flag values
# (keep-system-awake during long copies). They are not referenced anywhere in
# this module's visible code — presumably consumed elsewhere (e.g. by the
# `sleepless` decorator machinery); confirm before removing.
ES_CONTINUOUS = 0x80000000
ES_SYSTEM_REQUIRED = 0x00000001
31

32

33
class CopyState(IntEnum):
    # Copy-progress states reported by SessionCopier.get_state (see its docstring
    # for the full state-machine description).
    HARD_RESET = -1  # manual reset: remote session is wiped and re-initialized (not triggered automatically)
    NOT_REGISTERED = 0  # the experiment description stub is not yet present on the remote server
    PENDING = 1  # stub registered on the remote; this device's data copy not yet complete
    COMPLETE = 2  # this device's data is copied; other devices' copies may still be pending
    FINALIZED = 3  # the whole experiment is finalized and all data is on the server
39

40

41
@sleepless
def _copy2_checksum(src: str, dst: str, *args, **kwargs) -> str:
    """
    Copy a file from source to destination with checksum verification.

    This function copies a file from the source path to the destination path
    while verifying the BLAKE2B hash of the source and destination files. If the
    BLAKE2B hashes do not match after copying, an OSError is raised.

    Parameters
    ----------
    src : str
        The path to the source file.
    dst : str
        The path to the destination file.
    *args, **kwargs
        Additional arguments and keyword arguments to pass to `shutil.copy2`.

    Returns
    -------
    str
        The path to the copied file.

    Raises
    ------
    OSError
        If the BLAKE2B hashes of the source and destination files do not match.
    """
    log.info(f'Processing `{src}`:')
    log.info('  - calculating hash of local file')
    # renamed from `src_md5`: the digest is BLAKE2B, not MD5 — the old name was misleading
    src_hash = hashfile.blake2b(src, False)
    # samestat(...) is true when src and dst resolve to the very same file
    # (same device and inode), in which case copying can be skipped entirely
    if os.path.exists(dst) and samestat(os.stat(src), os.stat(dst)):
        log.info('  - file already exists at destination')
        log.info('  - calculating hash of remote file')
        if src_hash == hashfile.blake2b(dst, False):
            log.info('  - local and remote BLAKE2B hashes match, skipping copy')
            return dst
        else:
            log.info('  - local and remote hashes DO NOT match')
    log.info(f'  - copying file to `{dst}`')
    return_val = shutil.copy2(src, dst, *args, **kwargs)
    log.info('  - calculating hash of remote file')
    # verify the copy landed intact before reporting success
    if src_hash != hashfile.blake2b(dst, False):
        raise OSError(f'Error copying {src}: hash mismatch.')
    log.info('  - local and remote hashes match, copy successful')
    return return_val
87

88

89
def copy_folders(local_folder: Path, remote_folder: Path, overwrite: bool = False) -> bool:
    """
    Recursively copy a local folder tree to a remote location.

    The destination's parent directory is created if necessary, every file is
    copied with checksum verification (`_copy2_checksum`), and any
    `transfer_me.flag` files are skipped.

    Parameters
    ----------
    local_folder : Path
        The path to the local folder to copy from.
    remote_folder : Path
        The path to the remote folder to copy to.
    overwrite : bool, optional
        If True, overwrite existing files in the remote folder. Default is False.

    Returns
    -------
    bool
        True if the copying is successful, False otherwise.
    """
    try:
        remote_folder.parent.mkdir(parents=True, exist_ok=True)
        shutil.copytree(
            local_folder,
            remote_folder,
            dirs_exist_ok=overwrite,
            ignore=shutil.ignore_patterns('transfer_me.flag'),
            copy_function=_copy2_checksum,
        )
    except OSError:
        # log the full traceback and report failure rather than raising
        log.error(traceback.format_exc())
        log.info(f'Could not copy {local_folder} to {remote_folder}')
        return False
    return True
126

127

128
class SessionCopier:
    """Initialize and copy session data to a remote server."""

    assert_connect_on_init = True
    """bool: Raise error if unable to write stub file to remote server."""

    _experiment_description = None
    """dict: The experiment description file used for the copy."""

    tag = f'{socket.gethostname()}_{uuid.getnode()}'
    """str: The device name (adds this to the experiment description stub file on the remote server)."""

    def __init__(self, session_path, remote_subjects_folder=None, tag=None):
        """
        Initialize and copy session data to a remote server.

        Parameters
        ----------
        session_path : str, pathlib.Path
            The partial or session path to copy.
        remote_subjects_folder : str, pathlib.Path
            The remote server path to which to copy the session data.
        tag : str
            The device name (adds this to the experiment description stub file on the remote server).
        """
        # fall back to the class-level tag (hostname_macaddress or subclass override) when none given
        self.tag = tag or self.tag
        self.session_path = Path(session_path)
        # None (rather than Path(None)) when no remote folder is configured
        self.remote_subjects_folder = Path(remote_subjects_folder) if remote_subjects_folder else None

    def __repr__(self):
        return f'{super().__repr__()} \n local: {self.session_path} \n remote: {self.remote_session_path}'

    @property
    def state(self):
        """CopyState | None: The current copy state (first element of :meth:`get_state`)."""
        return self.get_state()[0]

    def run(self, number_of_expected_devices=None):
        """
        Run the copy of this device experiment.

        Will try to get as far as possible in the copy process (from states 0 init experiment to state 3 finalize experiment)
        if possible, and return earlier if the process can't be completed.
        """
        # NB: each `if` re-queries self.state, so a successful step falls through to the next one
        if self.state == CopyState.HARD_RESET:  # this case is not implemented automatically and corresponds to a hard reset
            log.info(f'{self.state}, {self.session_path}')
            shutil.rmtree(self.remote_session_path)
            self.initialize_experiment()
        if self.state == CopyState.NOT_REGISTERED:  # the session hasn't even been initialized: copy the stub to the remote
            log.info(f'{self.state}, {self.session_path}')
            self.initialize_experiment()
        if self.state == CopyState.PENDING:  # the session is ready for copy
            log.info(f'{self.state}, {self.session_path}')
            self.copy_collections()
        if self.state == CopyState.COMPLETE:
            log.info(f'{self.state}, {self.session_path}')
            self.finalize_copy(number_of_expected_devices=number_of_expected_devices)
        if self.state == CopyState.FINALIZED:
            log.info(f'{self.state}, {self.session_path}')

    def get_state(self) -> tuple[CopyState | None, str]:
        """
        Gets the current copier state.

        State 0: this device experiment has not been initialized for this device
        State 1: this device experiment is initialized (the experiment description stub is present on the remote)
        State 2: this device experiment is copied on the remote server, but other devices copies are still pending
        State 3: the whole experiment is finalized and all data is on the server

        Returns
        -------
        tuple[CopyState | None, str]
            The state (None when the remote folder is unset/unreachable) and a
            human-readable message describing it.
        """
        if self.remote_subjects_folder is None or not self.remote_subjects_folder.exists():
            return None, f'Remote subjects folder {self.remote_subjects_folder} set to Null or unreachable'
        if not self.file_remote_experiment_description.exists():
            return (
                CopyState.NOT_REGISTERED,
                f'Copy object not registered on server: {self.file_remote_experiment_description} does not exist',
            )
        # a `.status_*` file alongside the remote stub records copy progress
        status_file = self.glob_file_remote_copy_status()
        if status_file is None:
            # self-heal: a registered stub without any status file gets a fresh 'pending' marker
            status_file = self.file_remote_experiment_description.with_suffix('.status_pending')
            status_file.touch()
            log.warning(f'{status_file} not found and created')
        if status_file.name.endswith('pending'):
            return CopyState.PENDING, f'Copy pending {self.file_remote_experiment_description}'
        elif status_file.name.endswith('complete'):
            return CopyState.COMPLETE, f'Copy complete {self.file_remote_experiment_description}'
        elif status_file.name.endswith('final'):
            return CopyState.FINALIZED, f'Copy finalized {self.file_remote_experiment_description}'
        # NOTE(review): an unrecognized status suffix falls through and implicitly
        # returns None (not a tuple), which would break `self.state` — confirm this
        # cannot occur in practice or add an explicit error return.

    @property
    def experiment_description(self):
        """dict: The experiment description used for the copy (set by initialize_experiment)."""
        return self._experiment_description

    @property
    def remote_session_path(self):
        """pathlib.Path | None: The session path on the remote server (None if no remote folder set)."""
        if self.remote_subjects_folder:
            # padded_sequence ensures session path has zero padded number folder, e.g. 1 -> 001
            session_parts = alfiles.padded_sequence(self.session_path).parts[-3:]
            return self.remote_subjects_folder.joinpath(*session_parts)

    @property
    def file_experiment_description(self):
        """Returns the local experiment description file, if none found, returns one with the tag."""
        return next(
            self.session_path.glob('_ibl_experiment.description*'),
            self.session_path.joinpath(f'_ibl_experiment.description_{self.tag}.yaml'),
        )

    def glob_file_remote_copy_status(self, status='*'):
        """status: pending / complete"""
        fr = self.file_remote_experiment_description
        # returns the first matching `.status_<status>` file next to the remote stub, or None
        return next(fr.parent.glob(f'{fr.stem}.status_{status}'), None) if fr else None

    @property
    def file_remote_experiment_description(self):
        """Return the remote path to the remote stub file."""
        if self.remote_subjects_folder:
            return session_params.get_remote_stub_name(self.remote_session_path, device_id=self.tag)

    @property
    def remote_experiment_description_stub(self):
        """dict: The parsed contents of this device's remote experiment description stub."""
        return session_params.read_params(self.file_remote_experiment_description)

    def _copy_collections(self):
        """
        Copy collections defined in experiment description file.

        This is the method to subclass for pre- and post- copy routines.

        Returns
        -------
        bool
            True if transfer successfully completed.
        """
        status = True
        exp_pars = session_params.read_params(self.session_path)
        collections = set()
        # First glob on each collection pattern to find all folders to transfer
        for collection in session_params.get_collections(exp_pars, flat=True):
            folders = filter(Path.is_dir, self.session_path.glob(collection))
            _collections = list(map(lambda x: x.relative_to(self.session_path).as_posix(), folders))
            if not _collections:
                # a missing collection fails the overall copy but the remaining ones are still attempted
                log.error(f'No collection(s) matching "{collection}" found')
                status = False
                continue
            collections.update(_collections)

        # Attempt to copy each folder
        for collection in collections:
            local_collection = self.session_path.joinpath(collection)
            assert local_collection.exists(), f'local collection "{collection}" no longer exists'
            log.info(f'transferring {self.session_path} - {collection}')
            remote_collection = self.remote_session_path.joinpath(collection)
            if remote_collection.exists():
                # this is far from ideal: we currently recopy all files even if some already copied
                log.warning(f'Collection {remote_collection} already exists, removing')
                shutil.rmtree(remote_collection)
            status &= copy_folders(local_collection, remote_collection)
        status &= self.copy_snapshots()  # special case: copy snapshots without deleting or overwriting remote files
        return status

    def copy_snapshots(self):
        """
        Copy snapshots files from root session path.

        Unlike the collection folders defined in the experiment description folder, the snapshots folder is optional and
        may exist on multiple acquisition PCs.  This method will copy any files over if the snapshots folder exists,
        however it will fail if a file of the same name already exists. This ensures snapshots from multiple computers
        don't get overwritten.

        Returns
        -------
        bool
            True if transfer successfully completed.
        """
        snapshots = self.session_path.joinpath('snapshots')
        if not snapshots.exists():
            log.debug('Snapshots folder %s does not exist', snapshots.as_posix())
            return True
        log.info(f'transferring {self.session_path} - {snapshots.name}')
        remote_snapshots = self.remote_session_path.joinpath(snapshots.name)
        # Get set of local and remote snapshot filenames
        # (rglob on a non-existent remote folder simply yields nothing)
        local_files, remote_files = (
            set(map(lambda x: x.relative_to(p).as_posix(), filter(Path.is_file, p.rglob('*'))))
            for p in (snapshots, remote_snapshots)
        )
        if not any(local_files):
            log.debug('Snapshots folder %s is empty', snapshots.as_posix())
            return True
        if any(duplicates := local_files.intersection(remote_files)):
            # refuse to overwrite: snapshots may originate from multiple acquisition PCs
            log.error('The following snapshots already exist in %s\n%s', remote_snapshots.as_posix(), ', '.join(duplicates))
            log.info('Could not copy %s to %s', snapshots.as_posix(), remote_snapshots.as_posix())
            return False
        # 'overwrite' actually means 'don't raise if remote folder exists'.
        # We've already checked that filenames don't conflict.
        return copy_folders(snapshots, remote_snapshots, overwrite=True)

    def copy_collections(self, *args, **kwargs):
        """
        Recursively copies the collection folders into the remote session path.

        Do not overload, overload _copy_collections instead.
        """
        if self.glob_file_remote_copy_status('complete'):
            log.warning(
                f'Copy already complete for {self.session_path}, remove {self.glob_file_remote_copy_status("complete")} to force'
            )
            return True
        status = self._copy_collections(*args, **kwargs)
        # post copy stuff: rename the pending flag to complete
        if status:
            pending_file = self.glob_file_remote_copy_status('pending')
            pending_file.rename(pending_file.with_suffix('.status_complete'))
            if self.session_path.joinpath('transfer_me.flag').exists():
                self.session_path.joinpath('transfer_me.flag').unlink()
        return status

    def initialize_experiment(self, acquisition_description=None, overwrite=True):
        """
        Copy acquisition description yaml to the server and local transfers folder.

        Parameters
        ----------
        acquisition_description : dict
            The data to write to the experiment.description.yaml file.
        overwrite : bool
            If true, overwrite any existing file with the new one, otherwise, update the existing file.

        Raises
        ------
        RuntimeError
            If writing the remote stub fails and ``assert_connect_on_init`` is True.
        """
        if acquisition_description is None:
            acquisition_description = self.experiment_description

        assert acquisition_description

        # First attempt to add the remote description stub to the _device folder on the remote session
        if not self.remote_subjects_folder:
            log.info('The remote path is unspecified and remote experiment.description stub creation is omitted.')
        else:
            remote_stub_file = self.file_remote_experiment_description
            previous_description = (
                session_params.read_params(remote_stub_file) if remote_stub_file.exists() and not overwrite else {}
            )
            try:
                merged_description = session_params.merge_params(previous_description, acquisition_description)
                session_params.write_yaml(remote_stub_file, merged_description)
                # reset copy progress: remove any stale status files before marking pending
                for f in remote_stub_file.parent.glob(remote_stub_file.stem + '.status_*'):
                    f.unlink()
                remote_stub_file.with_suffix('.status_pending').touch()
                log.info(f'Written data to remote device at: {remote_stub_file}.')
            except Exception as e:
                # remote may be unreachable; fatal or not depending on assert_connect_on_init
                if self.assert_connect_on_init:
                    raise RuntimeError(f'Failed to write data to remote device at: {remote_stub_file}. \n {e}') from e
                log.warning(f'Failed to write data to remote device at: {remote_stub_file}. \n {e}')

        # then create on the local machine
        previous_description = (
            session_params.read_params(self.file_experiment_description)
            if self.file_experiment_description.exists() and not overwrite
            else {}
        )
        session_params.write_yaml(
            self.file_experiment_description, session_params.merge_params(previous_description, acquisition_description)
        )
        log.info(f'Written data to local session at : {self.file_experiment_description}.')

    def finalize_copy(self, number_of_expected_devices=None):
        """At the end of the copy, check if all the files are there and if so, aggregate the device files."""
        ready_to_finalize = 0
        # List the stub files in _devices folder
        files_stub = list(self.file_remote_experiment_description.parent.glob('*.yaml'))
        if number_of_expected_devices is None:
            # default: assume every registered stub is an expected device
            number_of_expected_devices = len(files_stub)
        log.debug(f'Number of expected devices is {number_of_expected_devices}')

        for file_stub in files_stub:
            ready_to_finalize += int(file_stub.with_suffix('.status_complete').exists())
            ad_stub = session_params.read_params(file_stub)
            # here we check the sync field of the device files
            # NOTE(review): this aborts when a single-device session's stub has a
            # non-bpod (or missing) sync entry — confirm this is the intended guard
            if next(iter(ad_stub.get('sync', {})), None) != 'bpod' and number_of_expected_devices == 1:
                log.warning(
                    'Only bpod is supported for single device sessions, it seems you are '
                    'attempting to transfer a session with more than one device.'
                )
                return

        if ready_to_finalize > number_of_expected_devices:
            log.error('More stub files (%i) than expected devices (%i)', ready_to_finalize, number_of_expected_devices)
            return
        log.info(f'{ready_to_finalize}/{number_of_expected_devices} copy completion status')
        if ready_to_finalize == number_of_expected_devices:
            # all devices complete: merge every stub into the final description and flag the session
            for file_stub in files_stub:
                session_params.aggregate_device(file_stub, self.remote_session_path.joinpath('_ibl_experiment.description.yaml'))
                file_stub.with_suffix('.status_complete').rename(file_stub.with_suffix('.status_final'))
            self.remote_session_path.joinpath('raw_session.flag').touch()
420

421

422
class VideoCopier(SessionCopier):
    """Session copier for video acquisition PCs (experiment description tag ``video``)."""

    tag = 'video'
    assert_connect_on_init = True

    def create_video_stub(self, config, collection='raw_video_data'):
        """Build an acquisition description from a camera config and write it to the local session."""
        stub = self.config2stub(config, collection)
        session_params.write_params(self.session_path, stub)

    @staticmethod
    def config2stub(config: dict, collection: str = 'raw_video_data') -> dict:
        """
        Generate acquisition description stub from a camera config dict.

        Parameters
        ----------
        config : dict
            A cameras configuration dictionary, found in `device_cameras` of hardware_settings.yaml.
        collection : str
            The video output collection.

        Returns
        -------
        dict
            An acquisition description file stub.
        """
        cameras = {}
        for label, settings in config.items():
            # the BONSAI_WORKFLOW entry is workflow config, not a camera
            if label == 'BONSAI_WORKFLOW':
                continue
            # keep non-null settings, lower-cased, excluding the INDEX field
            opts = {key.lower(): value for key, value in settings.items() if value is not None and key != 'INDEX'}
            cameras[label] = dict(collection=collection, **opts)
        return {'devices': {'cameras': cameras}, 'version': '1.0.0'}

    def initialize_experiment(self, acquisition_description=None, **kwargs):
        """Load the local description stub (which must exist) before registering the experiment."""
        if not acquisition_description:
            # creates the acquisition description stub if not found, and then read it
            description_file = self.file_experiment_description
            if not description_file.exists():
                raise FileNotFoundError(description_file)
            acquisition_description = session_params.read_params(description_file)
        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
462

463

464
class BehaviorCopier(SessionCopier):
    # Session copier for the behaviour acquisition PC; the remote stub is tagged 'behavior'
    # and failure to reach the remote server on init is non-fatal.
    tag = 'behavior'
    assert_connect_on_init = False

    @property
    def experiment_description(self):
        # unlike the base class, the description is always re-read from the local session folder
        return session_params.read_params(self.session_path)

    def _copy_collections(self):
        """Patch settings files before copy.

        Before copying the collections, this method checks that the behaviour data are valid. The
        following checks are made:

        #. Check at least 1 task collection in experiment description. If not, return.
        #. For each collection, check for task settings. If any are missing, return.
        #. If SESSION_END_TIME is missing, assumes task crashed. If so and task data missing and
           not a chained protocol (i.e. it is the only task collection), assume a dud and remove
           the remote stub file.  Otherwise, patch settings with total trials, end time, etc.

        Returns
        -------
        bool
            True if transfer successfully completed.

        """
        collections = session_params.get_task_collection(self.experiment_description)
        if not collections:
            log.error(f'Skipping: no task collections defined for {self.session_path}')
            return False
        for collection in (collections := ensure_list(collections)):
            task_settings = raw_data_loaders.load_settings(self.session_path, task_collection=collection)
            if task_settings is None:
                log.info(f'Skipping: no task settings found for {self.session_path}')
                return False  # may also want to remove session here if empty
            # here if the session end time has not been labeled we assume that the session crashed, and patch the settings
            if task_settings['SESSION_END_TIME'] is None:
                jsonable = self.session_path.joinpath(collection, '_iblrig_taskData.raw.jsonable')
                if not jsonable.exists():
                    log.info(f'Skipping: no task data found for {self.session_path}')
                    # No local data and only behaviour stub in remote; assume dud and remove entire session
                    if (
                        self.remote_session_path.exists()
                        and len(collections) == 1
                        and len(list(self.file_remote_experiment_description.parent.glob('*.yaml'))) <= 1
                    ):
                        shutil.rmtree(self.remote_session_path)  # remove likely dud
                    return False
                trials, bpod_data = load_task_jsonable(jsonable)
                ntrials = trials.shape[0]
                # We have the case where the session hard crashed.
                # Patch the settings file to wrap the session and continue the copying.
                log.warning(f'Recovering crashed session {self.session_path}')
                settings_file = self.session_path.joinpath(collection, '_iblrig_taskSettings.raw.json')
                with open(settings_file) as fid:
                    raw_settings = json.load(fid)
                raw_settings['NTRIALS'] = int(ntrials)
                raw_settings['NTRIALS_CORRECT'] = int(trials['trial_correct'].sum())
                # NOTE(review): int() truncates any fractional reward volume — confirm
                # TOTAL_WATER_DELIVERED is meant to be an integer here
                raw_settings['TOTAL_WATER_DELIVERED'] = int(trials['reward_amount'].sum())
                # cast the timestamp in a datetime object and add the session length to it
                end_time = datetime.datetime.strptime(raw_settings['SESSION_START_TIME'], '%Y-%m-%dT%H:%M:%S.%f')
                end_time += datetime.timedelta(seconds=bpod_data[-1]['Trial end timestamp'])
                raw_settings['SESSION_END_TIME'] = end_time.strftime('%Y-%m-%dT%H:%M:%S.%f')
                with open(settings_file, 'w') as fid:
                    json.dump(raw_settings, fid)
        # NOTE(review): critical severity for a routine progress message looks unintentional
        log.critical(f'{self.state}, {self.session_path}')
        return super()._copy_collections()  # proceed with copy

    def finalize_copy(self, number_of_expected_devices=None):
        """If main sync is bpod, expect a single stub file."""
        if number_of_expected_devices is None and session_params.get_sync(self.remote_experiment_description_stub) == 'bpod':
            number_of_expected_devices = 1
        super().finalize_copy(number_of_expected_devices=number_of_expected_devices)
537

538

539
class EphysCopier(SessionCopier):
    """Session copier for electrophysiology acquisition PCs (experiment description tag ``ephys``)."""

    tag = 'ephys'
    assert_connect_on_init = True

    def initialize_experiment(self, acquisition_description=None, nprobes=None, main_sync=True, **kwargs):
        """
        Register the ephys experiment and create the local probe folders.

        Parameters
        ----------
        acquisition_description : dict, optional
            The experiment description to register. If None, one is built from the probes
            found on disk, optionally merged with the nidq sync device description.
        nprobes : int, optional
            Number of probes. If None, inferred from the number of ``*.ap.bin`` files
            found under the session path.
        main_sync : bool
            If True, include the nidq sync device description in the generated stub.
        """
        # Determine the probe count up-front. Previously this was only computed inside
        # the fallback branch below, so callers passing their own acquisition_description
        # without nprobes hit `range(None)` (TypeError) in the folder-creation loop.
        if nprobes is None:
            nprobes = len(list(self.session_path.glob('**/*.ap.bin')))
        if not acquisition_description:
            acquisition_description = {'devices': {'neuropixel': {}}}
            neuropixel = acquisition_description['devices']['neuropixel']
            for n in range(nprobes):
                name = f'probe{n:02}'
                neuropixel[name] = {'collection': f'raw_ephys_data/{name}', 'sync_label': 'imec_sync'}
            sync_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'sync', 'nidq.yaml')
            # no probes found: drop the empty neuropixel device description
            acquisition_description = acquisition_description if neuropixel else {}
            if main_sync:
                acquisition_description.update(session_params.read_params(sync_file))

        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
        # once the session folders have been initialized, create the probe folders
        (ephys_path := self.session_path.joinpath('raw_ephys_data')).mkdir(exist_ok=True)
        for n in range(nprobes):
            ephys_path.joinpath(f'probe{n:02}').mkdir(exist_ok=True)

    def _copy_collections(self):
        """Here we overload the copy to be able to rename the probes properly and also create the insertions."""
        log.info(f'Transferring ephys session: {self.session_path} to {self.remote_session_path}')
        ibllib.pipes.misc.rename_ephys_files(self.session_path)
        ibllib.pipes.misc.move_ephys_files(self.session_path)
        # copy the wiring files from template
        path_wiring = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'neuropixel', 'wirings')
        probe_model = '3A'
        for file_nidq_bin in self.session_path.joinpath('raw_ephys_data').glob('*.nidq.bin'):
            # the presence of a nidq binary indicates a 3B probe setup
            probe_model = '3B'
            shutil.copy(path_wiring.joinpath('nidq.wiring.json'), file_nidq_bin.with_suffix('.wiring.json'))
        for file_ap_bin in self.session_path.joinpath('raw_ephys_data').rglob('*.ap.bin'):
            shutil.copy(path_wiring.joinpath(f'{probe_model}.wiring.json'), file_ap_bin.with_suffix('.wiring.json'))
        try:
            ibllib.pipes.misc.create_alyx_probe_insertions(self.session_path)
        except Exception:
            # bugfix: traceback.print_exc() writes to stderr and returns None, so the
            # previous log.error(traceback.print_exc()) logged the literal string 'None'.
            # format_exc() returns the traceback text for the logger (as in copy_folders).
            log.error(traceback.format_exc())
            log.info('Probe creation failed, please create the probe insertions manually. Continuing transfer...')
        return copy_folders(
            local_folder=self.session_path.joinpath('raw_ephys_data'),
            remote_folder=self.remote_session_path.joinpath('raw_ephys_data'),
            overwrite=True,
        )
587

588

589
class NeurophotometricsCopier(SessionCopier):
    """Session copier for neurophotometrics (fiber photometry) data acquired with Bonsai."""

    tag = 'neurophotometrics'
    assert_connect_on_init = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # if an experiment description file already exists on disk and none was loaded yet,
        # read it back so the copier can resume a previously initialized session
        if self.file_experiment_description.exists() and self.experiment_description is None:
            self._experiment_description = session_params.read_params(self.file_experiment_description)

    def initialize_experiment(self, acquisition_description=None, **kwargs):
        """
        Write the experiment description and flag the session for transfer.

        Parameters
        ----------
        acquisition_description : dict
            The acquisition description for this device (required, asserted non-None).
        **kwargs
            Passed through to :meth:`SessionCopier.initialize_experiment`.
        """
        assert acquisition_description is not None, 'No acquisition description provided'
        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
        # signal to the transfer daemon that this session is ready to be copied
        self.session_path.joinpath('transfer_me.flag').touch()

    @staticmethod
    def neurophotometrics_description(
        rois: Iterable[str],
        locations: Iterable[str],
        sync_channel: int,
        start_time: datetime.datetime | None = None,
        sync_label: str | None = None,
        collection: str = 'raw_photometry_data',
    ) -> dict:
        """
        Create the `neurophotometrics` description part for the specified parameters.

        Parameters
        ----------
        rois: list of strings
            List of ROIs as acquired by the photometry system (e.g. 'G0', 'G1')
        locations: list of strings
            List of brain regions, one per ROI, in the same order as `rois`
        sync_channel: int
            Channel number for sync
        start_time: datetime.datetime, optional
            Date and time of the recording; defaults to the current time
        sync_label: str, optional
            Label for the sync channel
        collection: str
            Collection name for the raw photometry data

        Returns
        -------
        dict
            Description of the neurophotometrics data
            {'neurophotometrics': ...}, see below for the yaml rendition of dictionaries

        Example where bpod sends sync to the neurophotometrics:
        -------
            neurophotometrics:
                fibers:
                    G0:
                        location: VTA
                    G1:
                        location: DR
                collection: raw_photometry_data
                sync_label: bnc1out
                sync_channel: 1
                datetime: 2024-09-19T14:13:18.749259
            sync:
                bpod

        Here MAIN_SYNC=True on behaviour

        Example where a DAQ records frame times and sync:
        -------
            neurophotometrics:
                fibers:
                    G0:
                        location: VTA
                    G1:
                        location: DR
                collection: raw_photometry_data
                sync_channel: 5
                datetime: 2024-09-19T14:13:18.749259
            sync:
                daqami:
                    acquisition_software: daqami
                    collection: raw_sync_data
                    extension: bin
        """
        date_time = datetime.datetime.now() if start_time is None else start_time
        description = {
            'sync_channel': sync_channel,
            'datetime': date_time.isoformat(),
            'collection': collection,
        }
        if sync_label is not None:
            description['sync_label'] = sync_label
        # NOTE(review): zip(strict=False) silently truncates if rois and locations differ in
        # length — confirm whether mismatched inputs should raise instead
        description['fibers'] = {roi: {'location': location} for roi, location in zip(rois, locations, strict=False)}
        return {'neurophotometrics': description}

    def _copy_collections(self, folder_neurophotometric: Path) -> bool:
        """
        Copy and convert the neurophotometrics acquisition to the remote server.

        Locates the Bonsai acquisition folder matching the session start time, validates the
        raw photometry and digital-input CSV files against their expected schemas, and writes
        them as parquet files into the remote session's photometry collection.

        Parameters
        ----------
        folder_neurophotometric : Path
            Root of the neurophotometrics acquisitions: one yyyy-mm-dd folder per day, each
            containing one or more THHMMSS acquisition folders.

        Returns
        -------
        bool
            True once the copy has completed.
        """
        ed = self.experiment_description['neurophotometrics']
        dt = datetime.datetime.fromisoformat(ed['datetime'])
        # Here we find the last photometry folder started at or before the session start time.
        # In case this is failing we can feed a custom start_time to go to the desired folder,
        # or just rename the folder
        folder_day = next(folder_neurophotometric.glob(ed['datetime'][:10]), None)
        assert folder_day is not None, f"Neurophotometrics folder {folder_neurophotometric} doesn't contain data"
        folder_times = list(folder_day.glob('T*'))
        assert len(folder_times) >= 1, f'No neurophotometrics acquisition files found in {folder_day}'
        # sort the acquisition folders by their HHMMSS value, keeping the original Path objects:
        # the previous int round-trip (f'T{int(stem[1:])}') stripped leading zeros and could
        # rebuild a folder name (e.g. T93000) that does not exist on disk (T093000)
        folders_sorted = sorted(folder_times, key=lambda f: int(f.stem[1:]))
        hhmmss = [int(f.stem[1:]) for f in folders_sorted]
        # pick the last acquisition started at or before the session start time; clamp to the
        # first folder in case the session start precedes all acquisitions (previously index -1
        # silently selected the last folder of the day)
        i = max(int(np.searchsorted(hhmmss, int(dt.strftime('%H%M%S')))) - 1, 0)
        folder_acquisition = folders_sorted[i]
        csv_raw_photometry = folder_acquisition.joinpath('raw_photometry.csv')
        csv_digital_inputs = folder_acquisition.joinpath('digital_inputs.csv')
        assert csv_raw_photometry.exists(), f'Raw photometry file {csv_raw_photometry} not found'
        assert csv_digital_inputs.exists(), f'Digital inputs file {csv_digital_inputs} not found'
        # Copy the raw and digital inputs files to the server
        # TODO move this into a data loader ? Especially the schemas will apply to both the csv and parquet format
        df_raw_photometry = pd.read_csv(csv_raw_photometry)
        df_digital_inputs = pd.read_csv(csv_digital_inputs, header=None)
        df_digital_inputs.columns = ['ChannelName', 'Channel', 'AlwaysTrue', 'SystemTimestamp', 'ComputerTimestamp']
        # this will ensure the columns are present, and that there was no magic new format on a new Bonsai version
        schema_raw_data = pandera.DataFrameSchema(
            columns=dict(
                FrameCounter=pandera.Column(pandera.Int64),
                SystemTimestamp=pandera.Column(pandera.Float64),
                LedState=pandera.Column(pandera.Int16, coerce=True),
                ComputerTimestamp=pandera.Column(pandera.Float64),
                **{k: pandera.Column(pandera.Float64) for k in ed['fibers']},
            )
        )
        schema_digital_inputs = pandera.DataFrameSchema(
            columns=dict(
                ChannelName=pandera.Column(str, coerce=True),
                Channel=pandera.Column(pandera.Int8, coerce=True),
                AlwaysTrue=pandera.Column(bool, coerce=True),
                SystemTimestamp=pandera.Column(pandera.Float64),
                ComputerTimestamp=pandera.Column(pandera.Float64),
            )
        )
        df_raw_photometry = schema_raw_data.validate(df_raw_photometry)
        df_digital_inputs = schema_digital_inputs.validate(df_digital_inputs)
        remote_photometry_path = self.remote_session_path.joinpath(ed['collection'])
        remote_photometry_path.mkdir(parents=True, exist_ok=True)
        df_raw_photometry.to_parquet(remote_photometry_path.joinpath('_neurophotometrics_fpData.raw.pqt'))
        # NOTE(review): the 'digitalIntputs' typo is preserved deliberately — downstream
        # extractors presumably expect this exact filename; confirm before renaming
        df_digital_inputs.to_parquet(remote_photometry_path.joinpath('_neurophotometrics_fpData.digitalIntputs.pqt'))
        shutil.copy(
            Path(iblrig.__file__).parents[1].joinpath('devices', 'neurophotometrics', '_neurophotometrics_fpData.channels.csv'),
            remote_photometry_path.joinpath('_neurophotometrics_fpData.channels.csv'),
        )
        return True
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc