• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 10568073180

26 Aug 2024 10:13PM UTC coverage: 47.538% (+0.7%) from 46.79%
10568073180

Pull #711

github

eeff82
web-flow
Merge 599c9edfb into ad41db25f
Pull Request #711: 8.23.2

121 of 135 new or added lines in 8 files covered. (89.63%)

1025 existing lines in 22 files now uncovered.

4084 of 8591 relevant lines covered (47.54%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.44
/iblrig/transfer_experiments.py
1
import datetime
2✔
2
import json
2✔
3
import logging
2✔
4
import os
2✔
5
import shutil
2✔
6
import socket
2✔
7
import traceback
2✔
8
import uuid
2✔
9
from enum import IntEnum
2✔
10
from os.path import samestat
2✔
11
from pathlib import Path
2✔
12

13
import ibllib.pipes.misc
2✔
14
import iblrig
2✔
15
import one.alf.files as alfiles
2✔
16
from ibllib.io import raw_data_loaders, session_params
2✔
17
from ibllib.pipes.misc import sleepless
2✔
18
from iblrig.raw_data_loaders import load_task_jsonable
2✔
19
from iblutil.io import hashfile
2✔
20
from one.util import ensure_list
2✔
21

22
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# NOTE(review): these look like the Win32 `SetThreadExecutionState` flags of the
# same names; they are not referenced anywhere in the visible code - confirm use.
ES_CONTINUOUS = 0x80000000
ES_SYSTEM_REQUIRED = 0x00000001
2✔
26

27

28
class CopyState(IntEnum):
    """Progress of one device's session transfer, ordered from least to most advanced."""

    HARD_RESET = -1  # remote session is removed and re-initialized
    NOT_REGISTERED = 0  # no experiment description stub on the remote yet
    PENDING = 1  # stub is on the remote, data still to be copied
    COMPLETE = 2  # this device's data is copied, other devices may be pending
    FINALIZED = 3  # whole experiment finalized on the remote
2✔
34

35

36
@sleepless
def _copy2_checksum(src: str, dst: str, *args, **kwargs) -> str:
    """
    Copy a file with `shutil.copy2` and verify the copy by BLAKE2B hash.

    The hash of the source file is computed first. After copying, the hash of
    the destination file is computed and compared to it.

    Parameters
    ----------
    src : str
        The path to the source file.
    dst : str
        The path to the destination file.
    *args, **kwargs
        Extra positional and keyword arguments forwarded to `shutil.copy2`.

    Returns
    -------
    str
        The path to the copied file (`dst` itself when the copy was skipped).

    Raises
    ------
    OSError
        If the BLAKE2B hashes of source and destination differ after the copy.
    """
    log.info(f'Processing `{src}`:')
    log.info('  - calculating hash of local file')
    local_hash = hashfile.blake2b(src, False)

    # NOTE(review): `samestat` is only true when both paths refer to the very same
    # file, so this shortcut does not trigger for a distinct, identical copy that
    # already sits at the destination - confirm whether that is intended.
    already_present = os.path.exists(dst) and samestat(os.stat(src), os.stat(dst))
    if already_present:
        log.info('  - file already exists at destination')
        log.info('  - calculating hash of remote file')
        if local_hash == hashfile.blake2b(dst, False):
            log.info('  - local and remote BLAKE2B hashes match, skipping copy')
            return dst
        log.info('  - local and remote hashes DO NOT match')

    log.info(f'  - copying file to `{dst}`')
    copied_path = shutil.copy2(src, dst, *args, **kwargs)
    log.info('  - calculating hash of remote file')
    if local_hash != hashfile.blake2b(dst, False):
        raise OSError(f'Error copying {src}: hash mismatch.')
    log.info('  - local and remote hashes match, copy successful')
    return copied_path
2✔
82

83

84
def copy_folders(local_folder: Path, remote_folder: Path, overwrite: bool = False) -> bool:
    """
    Copy a local folder tree to a remote location, verifying each file by checksum.

    The parent of the remote folder is created if needed. Files named
    `transfer_me.flag` are never copied. Any `OSError` raised during the copy
    is logged and reported through the return value instead of being raised.

    Parameters
    ----------
    local_folder : Path
        The path to the local folder to copy from.
    remote_folder : Path
        The path to the remote folder to copy to.
    overwrite : bool, optional
        Passed to `shutil.copytree` as `dirs_exist_ok`: when False (default) the
        copy fails if the remote folder already exists.

    Returns
    -------
    bool
        True if the copying is successful, False otherwise.
    """
    skip_flag_files = shutil.ignore_patterns('transfer_me.flag')
    try:
        remote_folder.parent.mkdir(parents=True, exist_ok=True)
        shutil.copytree(
            local_folder,
            remote_folder,
            dirs_exist_ok=overwrite,
            ignore=skip_flag_files,
            copy_function=_copy2_checksum,
        )
    except OSError:
        log.error(traceback.format_exc())
        log.info(f'Could not copy {local_folder} to {remote_folder}')
        return False
    return True
2✔
121

122

123
class SessionCopier:
    """Initialize and copy session data to a remote server."""

    assert_connect_on_init = True
    """bool: Raise error if unable to write stub file to remote server."""

    _experiment_description = None
    """dict: The experiment description file used for the copy."""

    tag = '_'.join((socket.gethostname(), str(uuid.getnode())))
    """str: The device name (adds this to the experiment description stub file on the remote server)."""

    def __init__(self, session_path, remote_subjects_folder=None, tag=None):
        """
        Store the local session path, the remote subjects folder and the device tag.

        Parameters
        ----------
        session_path : str, pathlib.Path
            The partial or session path to copy.
        remote_subjects_folder : str, pathlib.Path
            The remote server path to which to copy the session data. When
            omitted (or empty), no remote location is set.
        tag : str
            The device name (adds this to the experiment description stub file on the remote server).
            Falls back to the class-level `tag` when omitted.
        """
        self.tag = tag or self.tag
        self.session_path = Path(session_path)
        if remote_subjects_folder:
            self.remote_subjects_folder = Path(remote_subjects_folder)
        else:
            self.remote_subjects_folder = None
2✔
151

152
    def __repr__(self):
        """Return the default representation followed by the local and remote session paths."""
        default_repr = super().__repr__()
        return f'{default_repr} \n local: {self.session_path} \n remote: {self.remote_session_path}'
×
154

155
    @property
    def state(self):
        """The first element of `get_state()`, i.e. the state without its message."""
        state_and_message = self.get_state()
        return state_and_message[0]
2✔
158

159
    def run(self, number_of_expected_devices=None):
        """
        Run the copy of this device experiment.

        Walks through the copy states in order (hard reset, not registered, pending,
        complete, finalized). The state is re-read before each stage, so a stage that
        succeeds lets the next one run in the same call, while a stage that does not
        advance the state stops the progression.

        Parameters
        ----------
        number_of_expected_devices : int, optional
            Forwarded to `finalize_copy`.
        """

        def _hard_reset():
            # not triggered automatically: wipe the remote session and start over
            shutil.rmtree(self.remote_session_path)
            self.initialize_experiment()

        def _finalize():
            self.finalize_copy(number_of_expected_devices=number_of_expected_devices)

        stages = (
            (CopyState.HARD_RESET, _hard_reset),
            (CopyState.NOT_REGISTERED, self.initialize_experiment),  # copy the stub to the remote
            (CopyState.PENDING, self.copy_collections),  # the session is ready for copy
            (CopyState.COMPLETE, _finalize),
            (CopyState.FINALIZED, None),  # nothing left to do, only report
        )
        for stage, action in stages:
            if self.state == stage:
                log.info(f'{self.state}, {self.session_path}')
                if action is not None:
                    action()
2✔
181

182
    def get_state(self) -> tuple[CopyState | None, str]:
        """
        Gets the current copier state.

        State 0: this device experiment has not been initialized for this device
        State 1: this device experiment is initialized (the experiment description stub is present on the remote)
        State 2: this device experiment is copied on the remote server, but other devices copies are still pending
        State 3: the whole experiment is finalized and all data is on the server

        If the remote stub exists but no status file is found next to it, a
        `.status_pending` file is created and the state is reported as pending.

        Returns
        -------
        CopyState or None
            The current state; None when the remote subjects folder is unset or
            unreachable, or when the status file has an unrecognized suffix.
        str
            A human-readable message describing the state.
        """
        if self.remote_subjects_folder is None or not self.remote_subjects_folder.exists():
            return None, f'Remote subjects folder {self.remote_subjects_folder} set to Null or unreachable'
        if not self.file_remote_experiment_description.exists():
            return (
                CopyState.NOT_REGISTERED,
                f'Copy object not registered on server: {self.file_remote_experiment_description} does not exist',
            )
        status_file = self.glob_file_remote_copy_status()
        if status_file is None:
            status_file = self.file_remote_experiment_description.with_suffix('.status_pending')
            status_file.touch()
            log.warning(f'{status_file} not found and created')
        if status_file.name.endswith('pending'):
            return CopyState.PENDING, f'Copy pending {self.file_remote_experiment_description}'
        elif status_file.name.endswith('complete'):
            return CopyState.COMPLETE, f'Copy complete {self.file_remote_experiment_description}'
        elif status_file.name.endswith('final'):
            return CopyState.FINALIZED, f'Copy finalized {self.file_remote_experiment_description}'
        # Always honour the declared tuple return: falling off the end here used to
        # return a bare None, which made the `state` property fail with a TypeError.
        return None, f'Unrecognized copy status file {status_file}'
2✔
210

211
    @property
    def experiment_description(self):
        """The experiment description stored on this copier (None until one is set)."""
        return self._experiment_description
2✔
214

215
    @property
    def remote_session_path(self):
        """The session path under the remote subjects folder, or None if no remote folder is set."""
        if not self.remote_subjects_folder:
            return None
        # padded_sequence ensures session path has zero padded number folder, e.g. 1 -> 001
        padded_path = alfiles.padded_sequence(self.session_path)
        # keep only the last three path components of the padded session path
        return self.remote_subjects_folder.joinpath(*padded_path.parts[-3:])
2✔
221

222
    @property
    def file_experiment_description(self):
        """Returns the local experiment description file, if none found, returns one with the tag."""
        matches = self.session_path.glob('_ibl_experiment.description*')
        tagged_default = self.session_path.joinpath(f'_ibl_experiment.description_{self.tag}.yaml')
        # first match wins; the tagged file name is only a fallback path (it may not exist)
        return next(matches, tagged_default)
229

230
    def glob_file_remote_copy_status(self, status='*'):
        """
        Find the status file that sits next to the remote stub file.

        Parameters
        ----------
        status : str
            The status suffix to look for, e.g. 'pending' or 'complete'; the
            default '*' matches any status.

        Returns
        -------
        pathlib.Path or None
            The first matching status file, or None if there is no remote stub
            path or no match.
        """
        remote_stub = self.file_remote_experiment_description
        if not remote_stub:
            return None
        pattern = f'{remote_stub.stem}.status_{status}'
        return next(remote_stub.parent.glob(pattern), None)
2✔
234

235
    @property
    def file_remote_experiment_description(self):
        """Return the remote path to the remote stub file, or None if no remote folder is set."""
        if not self.remote_subjects_folder:
            return None
        return session_params.get_remote_stub_name(self.remote_session_path, device_id=self.tag)
2✔
240

241
    @property
    def remote_experiment_description_stub(self):
        """The content of the remote stub file, as read by `session_params.read_params`."""
        remote_stub_file = self.file_remote_experiment_description
        return session_params.read_params(remote_stub_file)
2✔
244

245
    def _copy_collections(self):
        """
        Copy collections defined in experiment description file.

        This is the method to subclass for pre- and post- copy routines.

        Each collection pattern from the experiment description is globbed within
        the session path; a pattern without any matching folder is logged and makes
        the transfer fail, but the remaining folders are still copied. A remote
        collection folder that already exists is removed before copying. The
        snapshots folder is copied last.

        Returns
        -------
        bool
            True if transfer successfully completed.
        """
        status = True
        exp_pars = session_params.read_params(self.session_path)

        # Resolve every collection pattern into the set of local folders to transfer
        collections = set()
        for pattern in session_params.get_collections(exp_pars, flat=True):
            matches = [
                path.relative_to(self.session_path).as_posix()
                for path in self.session_path.glob(pattern)
                if path.is_dir()
            ]
            if matches:
                collections.update(matches)
            else:
                log.error(f'No collection(s) matching "{pattern}" found')
                status = False

        # Attempt to copy each folder
        for name in collections:
            local_collection = self.session_path.joinpath(name)
            assert local_collection.exists(), f'local collection "{name}" no longer exists'
            log.info(f'transferring {self.session_path} - {name}')
            remote_collection = self.remote_session_path.joinpath(name)
            if remote_collection.exists():
                # this is far from ideal: we currently recopy all files even if some already copied
                log.warning(f'Collection {remote_collection} already exists, removing')
                shutil.rmtree(remote_collection)
            status &= copy_folders(local_collection, remote_collection)

        # special case: copy snapshots without deleting or overwriting remote files
        status &= self.copy_snapshots()
        return status
2✔
282

283
    def copy_snapshots(self):
        """
        Copy snapshots files from root session path.

        The snapshots folder is optional: when it is absent or holds no files there
        is nothing to do and the transfer counts as successful. Files are copied
        into the remote snapshots folder without removing what is already there;
        if any local file has the same relative name as a remote one, nothing is
        copied and the transfer fails, so that snapshots coming from several
        computers never overwrite one another.

        Returns
        -------
        bool
            True if transfer successfully completed.
        """

        def _relative_files(root):
            """Return the posix paths, relative to root, of all files found below root."""
            return {path.relative_to(root).as_posix() for path in root.rglob('*') if path.is_file()}

        snapshots = self.session_path.joinpath('snapshots')
        if not snapshots.exists():
            log.debug('Snapshots folder %s does not exist', snapshots.as_posix())
            return True
        log.info(f'transferring {self.session_path} - {snapshots.name}')
        remote_snapshots = self.remote_session_path.joinpath(snapshots.name)

        # Get set of local and remote snapshot filenames
        local_files = _relative_files(snapshots)
        remote_files = _relative_files(remote_snapshots)
        if not local_files:
            log.debug('Snapshots folder %s is empty', snapshots.as_posix())
            return True

        duplicates = local_files & remote_files
        if duplicates:
            log.error('The following snapshots already exist in %s\n%s', remote_snapshots.as_posix(), ', '.join(duplicates))
            log.info('Could not copy %s to %s', snapshots.as_posix(), remote_snapshots.as_posix())
            return False

        # 'overwrite' actually means 'don't raise if remote folder exists'.
        # We've already checked that filenames don't conflict.
        return copy_folders(snapshots, remote_snapshots, overwrite=True)
2✔
318

319
    def copy_collections(self):
        """
        Recursively copies the collection folders into the remote session path.

        Do not overload, overload _copy_collections instead.

        Nothing is copied when a 'complete' status file already exists on the
        remote. After a successful copy, the 'pending' status file is renamed to
        'complete' and the local `transfer_me.flag` file is removed if present.

        Returns
        -------
        bool
            True if the copy was already complete or completed successfully.
        """
        if self.glob_file_remote_copy_status('complete'):
            log.warning(
                f'Copy already complete for {self.session_path},'
                f' remove {self.glob_file_remote_copy_status("complete")} to force'
            )
            return True

        status = self._copy_collections()
        if not status:
            return status

        # post copy stuff: rename the pending flag to complete
        pending_file = self.glob_file_remote_copy_status('pending')
        pending_file.rename(pending_file.with_suffix('.status_complete'))
        transfer_flag = self.session_path.joinpath('transfer_me.flag')
        if transfer_flag.exists():
            transfer_flag.unlink()
        return status
2✔
339

340
    def initialize_experiment(self, acquisition_description=None, overwrite=True):
        """
        Copy acquisition description yaml to the server and local transfers folder.

        The description is first written to the remote stub file (when a remote
        folder is set), any existing status file of that stub is removed and a
        fresh 'pending' status file is created. The description is then written
        to the local experiment description file.

        Parameters
        ----------
        acquisition_description : dict
            The data to write to the experiment.description.yaml file. Defaults to
            the `experiment_description` property; must not be empty.
        overwrite : bool
            If true, overwrite any existing file with the new one, otherwise, update the existing file.

        Raises
        ------
        RuntimeError
            If writing to the remote fails and `assert_connect_on_init` is True;
            otherwise the failure is only logged as a warning.
        """
        if acquisition_description is None:
            acquisition_description = self.experiment_description

        assert acquisition_description

        # First attempt to add the remote description stub to the _device folder on the remote session
        if self.remote_subjects_folder:
            remote_stub_file = self.file_remote_experiment_description
            if remote_stub_file.exists() and not overwrite:
                previous_description = session_params.read_params(remote_stub_file)
            else:
                previous_description = {}
            try:
                merged_description = session_params.merge_params(previous_description, acquisition_description)
                session_params.write_yaml(remote_stub_file, merged_description)
                # reset the copy status of this stub to 'pending'
                for stale_status in remote_stub_file.parent.glob(remote_stub_file.stem + '.status_*'):
                    stale_status.unlink()
                remote_stub_file.with_suffix('.status_pending').touch()
                log.info(f'Written data to remote device at: {remote_stub_file}.')
            except Exception as e:
                if self.assert_connect_on_init:
                    raise RuntimeError(f'Failed to write data to remote device at: {remote_stub_file}. \n {e}') from e
                log.warning(f'Failed to write data to remote device at: {remote_stub_file}. \n {e}')
        else:
            log.info('The remote path is unspecified and remote experiment.description stub creation is omitted.')

        # then create on the local machine
        if self.file_experiment_description.exists() and not overwrite:
            previous_description = session_params.read_params(self.file_experiment_description)
        else:
            previous_description = {}
        session_params.write_yaml(
            self.file_experiment_description, session_params.merge_params(previous_description, acquisition_description)
        )
        log.info(f'Written data to local session at : {self.file_experiment_description}.')
2✔
386

387
    def finalize_copy(self, number_of_expected_devices=None):
        """
        At the end of the copy, check if all the files are there and if so, aggregate the device files.

        Every yaml stub found next to this device's remote stub counts as one
        device. When the number of stubs with a 'complete' status file equals the
        number of expected devices, each stub is aggregated into the remote
        `_ibl_experiment.description.yaml`, its status file is renamed to 'final'
        and a `raw_session.flag` file is created in the remote session.

        Parameters
        ----------
        number_of_expected_devices : int, optional
            The number of devices that must be complete; defaults to the number
            of stub files found.
        """
        # List the stub files in _devices folder
        files_stub = list(self.file_remote_experiment_description.parent.glob('*.yaml'))
        if number_of_expected_devices is None:
            number_of_expected_devices = len(files_stub)
        log.debug(f'Number of expected devices is {number_of_expected_devices}')

        ready_to_finalize = 0
        for file_stub in files_stub:
            if file_stub.with_suffix('.status_complete').exists():
                ready_to_finalize += 1
            ad_stub = session_params.read_params(file_stub)
            # here we check the sync field of the device files
            main_sync = next(iter(ad_stub.get('sync', {})), None)
            if main_sync != 'bpod' and number_of_expected_devices == 1:
                log.warning(
                    'Only bpod is supported for single device sessions, it seems you are '
                    'attempting to transfer a session with more than one device.'
                )
                return

        if ready_to_finalize > number_of_expected_devices:
            log.error('More stub files (%i) than expected devices (%i)', ready_to_finalize, number_of_expected_devices)
            return
        log.info(f'{ready_to_finalize}/{number_of_expected_devices} copy completion status')
        if ready_to_finalize != number_of_expected_devices:
            return

        for file_stub in files_stub:
            session_params.aggregate_device(file_stub, self.remote_session_path.joinpath('_ibl_experiment.description.yaml'))
            file_stub.with_suffix('.status_complete').rename(file_stub.with_suffix('.status_final'))
        self.remote_session_path.joinpath('raw_session.flag').touch()
2✔
416

417

418
class VideoCopier(SessionCopier):
    tag = 'video'
    assert_connect_on_init = True

    def create_video_stub(self, config, collection='raw_video_data'):
        """Build the acquisition description stub from a camera config and write it to the session path."""
        stub = self.config2stub(config, collection)
        session_params.write_params(self.session_path, stub)

    @staticmethod
    def config2stub(config: dict, collection: str = 'raw_video_data') -> dict:
        """
        Generate acquisition description stub from a camera config dict.

        The 'BONSAI_WORKFLOW' entry of the config is skipped. For every other
        entry, the settings keys are lower-cased, and the 'INDEX' key as well as
        settings whose value is None are dropped.

        Parameters
        ----------
        config : dict
            A cameras configuration dictionary, found in `device_cameras` of hardware_settings.yaml.
        collection : str
            The video output collection.

        Returns
        -------
        dict
            An acquisition description file stub.
        """
        cameras = {}
        for label, settings in config.items():
            if label == 'BONSAI_WORKFLOW':
                continue
            kept_settings = {key.lower(): value for key, value in settings.items() if value is not None and key != 'INDEX'}
            cameras[label] = dict(collection=collection, **kept_settings)
        return {'devices': {'cameras': cameras}, 'version': '1.0.0'}

    def initialize_experiment(self, acquisition_description=None, **kwargs):
        """
        Initialize the experiment, reading the description from the local file when none is given.

        Raises
        ------
        FileNotFoundError
            If no description is given and the local experiment description file does not exist.
        """
        if not acquisition_description:
            # no description provided: the local stub file must already exist, read it
            description_file = self.file_experiment_description
            if not description_file.exists():
                raise FileNotFoundError(self.file_experiment_description)
            acquisition_description = session_params.read_params(self.file_experiment_description)
        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
2✔
458

459

460
class BehaviorCopier(SessionCopier):
    tag = 'behavior'
    assert_connect_on_init = False

    @property
    def experiment_description(self):
        """The experiment description, read from the local session path on each access."""
        return session_params.read_params(self.session_path)

    def _copy_collections(self):
        """Patch settings files before copy.

        Before copying the collections, this method checks that the behaviour data are valid. The
        following checks are made:

        #. Check at least 1 task collection in experiment description. If not, return.
        #. For each collection, check for task settings. If any are missing, return.
        #. If SESSION_END_TIME is missing, assumes task crashed. If so and task data missing and
           not a chained protocol (i.e. it is the only task collection), assume a dud and remove
           the remote stub file.  Otherwise, patch settings with total trials, end time, etc.

        Returns
        -------
        bool
            True if transfer successfully completed.

        """
        collections = session_params.get_task_collection(self.experiment_description)
        if not collections:
            log.error(f'Skipping: no task collections defined for {self.session_path}')
            return False
        collections = ensure_list(collections)
        for collection in collections:
            task_settings = raw_data_loaders.load_settings(self.session_path, task_collection=collection)
            if task_settings is None:
                log.info(f'Skipping: no task settings found for {self.session_path}')
                return False  # may also want to remove session here if empty
            # here if the session end time has not been labeled we assume that the session crashed, and patch the settings
            # `.get` treats an absent key like an explicit None instead of raising a KeyError
            if task_settings.get('SESSION_END_TIME') is not None:
                continue
            jsonable = self.session_path.joinpath(collection, '_iblrig_taskData.raw.jsonable')
            if not jsonable.exists():
                log.info(f'Skipping: no task data found for {self.session_path}')
                # No local data and only behaviour stub in remote; assume dud and remove entire session
                if (
                    self.remote_session_path.exists()
                    and len(collections) == 1
                    and len(list(self.file_remote_experiment_description.parent.glob('*.yaml'))) <= 1
                ):
                    shutil.rmtree(self.remote_session_path)  # remove likely dud
                return False
            trials, bpod_data = load_task_jsonable(jsonable)
            ntrials = trials.shape[0]
            # We have the case where the session hard crashed.
            # Patch the settings file to wrap the session and continue the copying.
            log.warning(f'Recovering crashed session {self.session_path}')
            settings_file = self.session_path.joinpath(collection, '_iblrig_taskSettings.raw.json')
            with open(settings_file) as fid:
                raw_settings = json.load(fid)
            raw_settings['NTRIALS'] = int(ntrials)
            raw_settings['NTRIALS_CORRECT'] = int(trials['trial_correct'].sum())
            raw_settings['TOTAL_WATER_DELIVERED'] = int(trials['reward_amount'].sum())
            # cast the timestamp in a datetime object and add the session length to it
            end_time = datetime.datetime.strptime(raw_settings['SESSION_START_TIME'], '%Y-%m-%dT%H:%M:%S.%f')
            end_time += datetime.timedelta(seconds=bpod_data[-1]['Trial end timestamp'])
            raw_settings['SESSION_END_TIME'] = end_time.strftime('%Y-%m-%dT%H:%M:%S.%f')
            with open(settings_file, 'w') as fid:
                json.dump(raw_settings, fid)
        # routine progress message: logged at INFO, like the same message in `run`
        log.info(f'{self.state}, {self.session_path}')
        return super()._copy_collections()  # proceed with copy

    def finalize_copy(self, number_of_expected_devices=None):
        """If main sync is bpod, expect a single stub file."""
        if number_of_expected_devices is None and session_params.get_sync(self.remote_experiment_description_stub) == 'bpod':
            number_of_expected_devices = 1
        super().finalize_copy(number_of_expected_devices=number_of_expected_devices)
2✔
533

534

535
class EphysCopier(SessionCopier):
    tag = 'ephys'
    assert_connect_on_init = True

    def initialize_experiment(self, acquisition_description=None, nprobes=None, main_sync=True, **kwargs):
        """
        Initialize the experiment and create the local probe folders.

        Parameters
        ----------
        acquisition_description : dict, optional
            The description to write. When omitted (or empty), one is generated
            with a `probeNN` neuropixel device per probe, plus the content of the
            packaged `nidq.yaml` sync description if `main_sync` is True.
        nprobes : int, optional
            The number of probes. Defaults to the number of `*.ap.bin` files found
            below the session path.
        main_sync : bool
            Whether to add the nidq sync description to a generated description.
        **kwargs
            Forwarded to `SessionCopier.initialize_experiment`.
        """
        # Resolve the probe count up front: it is needed for the folder creation below
        # even when a description is supplied (it used to stay None in that case and
        # `range(None)` raised a TypeError).
        if nprobes is None:
            nprobes = len(list(self.session_path.glob('**/*.ap.bin')))
        if not acquisition_description:
            neuropixel = {
                f'probe{n:02}': {'collection': f'raw_ephys_data/probe{n:02}', 'sync_label': 'imec_sync'}
                for n in range(nprobes)
            }
            # without any probe the devices section is left out altogether
            acquisition_description = {'devices': {'neuropixel': neuropixel}} if neuropixel else {}
            if main_sync:
                sync_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'sync', 'nidq.yaml')
                acquisition_description.update(session_params.read_params(sync_file))

        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
        # once the session folders have been initialized, create the probe folders
        for n in range(nprobes):
            self.session_path.joinpath('raw_ephys_data', f'probe{n:02}').mkdir(exist_ok=True, parents=True)

    def _copy_collections(self):
        """
        Here we overload the copy to be able to rename the probes properly and also create the insertions.

        The ephys files are renamed and moved, wiring files are copied from the
        packaged templates next to each nidq and ap binary file, and the creation
        of the probe insertions is attempted (a failure there is logged and does not
        stop the transfer). The `raw_ephys_data` folder is then copied to the remote.

        Returns
        -------
        bool
            True if the `raw_ephys_data` folder was copied successfully.
        """
        log.info(f'Transferring ephys session: {self.session_path} to {self.remote_session_path}')
        ibllib.pipes.misc.rename_ephys_files(self.session_path)
        ibllib.pipes.misc.move_ephys_files(self.session_path)
        # copy the wiring files from template
        path_wiring = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'neuropixel', 'wirings')
        # the 3B wiring template is used as soon as a nidq file is present, 3A otherwise
        probe_model = '3A'
        for file_nidq_bin in self.session_path.joinpath('raw_ephys_data').glob('*.nidq.bin'):
            probe_model = '3B'
            shutil.copy(path_wiring.joinpath('nidq.wiring.json'), file_nidq_bin.with_suffix('.wiring.json'))
        for file_ap_bin in self.session_path.joinpath('raw_ephys_data').rglob('*.ap.bin'):
            shutil.copy(path_wiring.joinpath(f'{probe_model}.wiring.json'), file_ap_bin.with_suffix('.wiring.json'))
        try:
            ibllib.pipes.misc.create_alyx_probe_insertions(self.session_path)
        except Exception:
            # `format_exc` returns the traceback text; `print_exc` returns None, so the
            # log entry used to read "None"
            log.error(traceback.format_exc())
            log.info('Probe creation failed, please create the probe insertions manually. Continuing transfer...')
        return copy_folders(
            local_folder=self.session_path.joinpath('raw_ephys_data'),
            remote_folder=self.remote_session_path.joinpath('raw_ephys_data'),
            overwrite=True,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc