• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 9032957364

10 May 2024 01:25PM UTC coverage: 48.538% (+1.7%) from 46.79%
9032957364

Pull #643

github

74d2ec
web-flow
Merge aebf2c9af into ec2d8e4fe
Pull Request #643: 8.19.0

377 of 1074 new or added lines in 38 files covered. (35.1%)

977 existing lines in 19 files now uncovered.

3253 of 6702 relevant lines covered (48.54%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/iblrig/transfer_experiments.py
1
import datetime
2✔
2
import json
2✔
3
import logging
2✔
4
import os
2✔
5
import shutil
2✔
6
import socket
2✔
7
import traceback
2✔
8
import uuid
2✔
9
from os.path import samestat
2✔
10
from pathlib import Path
2✔
11

12
import ibllib.pipes.misc
2✔
13
import iblrig
2✔
14
import one.alf.files as alfiles
2✔
15
from ibllib.io import raw_data_loaders, session_params
2✔
16
from ibllib.pipes.misc import sleepless
2✔
17
from iblrig.raw_data_loaders import load_task_jsonable
2✔
18
from iblutil.io import hashfile
2✔
19
from one.util import ensure_list
2✔
20

21
# Module-level logger, named after this module, used by every function and class below.
log = logging.getLogger(__name__)
2✔
22

23
# NOTE(review): presumably a Windows `SetThreadExecutionState` flag value - confirm.
# Not referenced anywhere else in the visible part of this module.
ES_CONTINUOUS = 0x80000000
2✔
24
# NOTE(review): presumably a Windows `SetThreadExecutionState` flag value - confirm.
# Not referenced anywhere else in the visible part of this module.
ES_SYSTEM_REQUIRED = 0x00000001
2✔
25

26

27
@sleepless
def _copy2_checksum(src: str, dst: str, *args, **kwargs) -> str:
    """
    Copy a single file with `shutil.copy2` and verify the copy by checksum.

    The BLAKE2B hash of the source is computed up front. When the destination
    already exists and refers to the very same file as the source, the copy is
    skipped if the hashes agree. Otherwise the file is copied and the hash of
    the destination is compared against the hash of the source.

    Parameters
    ----------
    src : str
        The path to the source file.
    dst : str
        The path to the destination file.
    *args, **kwargs
        Additional arguments and keyword arguments forwarded to `shutil.copy2`.

    Returns
    -------
    str
        The path to the copied file (`dst` itself when the copy was skipped).

    Raises
    ------
    OSError
        If the BLAKE2B hashes of the source and destination differ after copying.
    """
    log.info(f'Processing `{src}`:')
    log.info('  - calculating hash of local file')
    local_hash = hashfile.blake2b(src, False)

    # NOTE(review): `samestat` is only true when both paths point at the same underlying
    # file, so this branch does not trigger for a distinct pre-existing remote copy - confirm
    # whether that is the intended condition.
    already_present = os.path.exists(dst) and samestat(os.stat(src), os.stat(dst))
    if already_present:
        log.info('  - file already exists at destination')
        log.info('  - calculating hash of remote file')
        if local_hash == hashfile.blake2b(dst, False):
            log.info('  - local and remote BLAKE2B hashes match, skipping copy')
            return dst
        log.info('  - local and remote hashes DO NOT match')

    log.info(f'  - copying file to `{dst}`')
    copied_path = shutil.copy2(src, dst, *args, **kwargs)

    log.info('  - calculating hash of remote file')
    remote_hash = hashfile.blake2b(dst, False)
    if local_hash != remote_hash:
        raise OSError(f'Error copying {src}: hash mismatch.')
    log.info('  - local and remote hashes match, copy successful')
    return copied_path
2✔
73

74

75
def copy_folders(local_folder: Path, remote_folder: Path, overwrite: bool = False) -> bool:
    """
    Recursively copy a local folder to a remote location, verifying each file.

    The parent of the remote folder is created if needed, then the tree is
    copied with `shutil.copytree` using `_copy2_checksum` as the copy function.
    Files named ``transfer_me.flag`` are never copied.

    Parameters
    ----------
    local_folder : Path
        The path to the local folder to copy from.
    remote_folder : Path
        The path to the remote folder to copy to.
    overwrite : bool, optional
        Passed to `shutil.copytree` as `dirs_exist_ok`: if True, copying into an
        already existing remote folder is allowed. Default is False.

    Returns
    -------
    bool
        True if the copy succeeded, False if an OSError was raised (the traceback
        is logged in that case).
    """
    skip_flag_files = shutil.ignore_patterns('transfer_me.flag')
    try:
        remote_folder.parent.mkdir(parents=True, exist_ok=True)
        shutil.copytree(
            local_folder,
            remote_folder,
            dirs_exist_ok=overwrite,
            ignore=skip_flag_files,
            copy_function=_copy2_checksum,
        )
    except OSError:
        # includes checksum mismatches raised by `_copy2_checksum`
        log.error(traceback.format_exc())
        log.info(f'Could not copy {local_folder} to {remote_folder}')
        return False
    return True
2✔
112

113

114
class SessionCopier:
    """
    Initialize and copy session data to a remote server.

    The progress of the copy is tracked on the remote through a status file that sits
    next to the remote experiment description stub and whose suffix is one of
    ``.status_pending``, ``.status_complete`` or ``.status_final`` (see `get_state`).
    """

    assert_connect_on_init = True
    """bool: Raise error if unable to write stub file to remote server."""

    _experiment_description = None
    """dict: The experiment description file used for the copy."""

    tag = f'{socket.gethostname()}_{uuid.getnode()}'
    """str: The device name (adds this to the experiment description stub file on the remote server)."""

    def __init__(self, session_path, remote_subjects_folder=None, tag=None):
        """
        Initialize and copy session data to a remote server.

        Parameters
        ----------
        session_path : str, pathlib.Path
            The partial or session path to copy.
        remote_subjects_folder : str, pathlib.Path
            The remote server path to which to copy the session data.
        tag : str
            The device name (adds this to the experiment description stub file on the remote server).
        """
        self.tag = tag or self.tag
        self.session_path = Path(session_path)
        self.remote_subjects_folder = Path(remote_subjects_folder) if remote_subjects_folder else None

    def __repr__(self):
        """Return the default representation followed by the local and remote session paths."""
        return f'{super().__repr__()} \n local: {self.session_path} \n remote: {self.remote_session_path}'

    @property
    def state(self):
        """int or None: The state code returned by `get_state` (first element of its tuple)."""
        return self.get_state()[0]

    def run(self, number_of_expected_devices=None):
        """
        Run the copy of this device experiment.

        It will try to get as far as possible in the copy process (from states 0 init
        experiment to state 3 finalize experiment) if possible, and return earlier if the
        process can't be completed. The state is re-evaluated before each step.

        Parameters
        ----------
        number_of_expected_devices : int, optional
            Forwarded to `finalize_copy`.
        """
        if self.state == -1:  # this case is not implemented automatically and corresponds to a hard reset
            log.info(f'{self.state}, {self.session_path}')
            shutil.rmtree(self.remote_session_path)
            self.initialize_experiment()
        if self.state == 0:  # the session hasn't even been initialized: copy the stub to the remote
            log.info(f'{self.state}, {self.session_path}')
            self.initialize_experiment()
        if self.state == 1:  # the session is ready for copy
            log.info(f'{self.state}, {self.session_path}')
            self.copy_collections()
        if self.state == 2:
            log.info(f'{self.state}, {self.session_path}')
            self.finalize_copy(number_of_expected_devices=number_of_expected_devices)
        if self.state == 3:
            log.info(f'{self.state}, {self.session_path}')

    def get_state(self):
        """
        Get the current copier state.

        State None: the remote subjects folder is unset or unreachable, or the status file is not recognized
        State 0: this device experiment has not been initialized for this device
        State 1: this device experiment is initialized (the experiment description stub is present on the remote)
        State 2: this device experiment is copied on the remote server, but other devices copies are still pending
        State 3: the whole experiment is finalized and all data is on the server

        If the remote stub exists but no status file is found, a ``.status_pending`` file is
        created next to the stub.

        Returns
        -------
        int or None
            The state code.
        str
            A human readable message describing the state.
        """
        if self.remote_subjects_folder is None or not self.remote_subjects_folder.exists():
            return None, f'Remote subjects folder {self.remote_subjects_folder} set to Null or unreachable'
        if not self.file_remote_experiment_description.exists():
            return 0, f'Copy object not registered on server: {self.file_remote_experiment_description} does not exist'
        status_file = self.glob_file_remote_copy_status()
        if status_file is None:
            status_file = self.file_remote_experiment_description.with_suffix('.status_pending')
            status_file.touch()
            log.warning(f'{status_file} not found and created')
        if status_file.name.endswith('pending'):
            return 1, f'Copy pending {self.file_remote_experiment_description}'
        elif status_file.name.endswith('complete'):
            return 2, f'Copy complete {self.file_remote_experiment_description}'
        elif status_file.name.endswith('final'):
            return 3, f'Copy finalized {self.file_remote_experiment_description}'
        # Always return a (state, message) tuple: falling off the end here would return a bare
        # None and make the `state` property fail with a TypeError when indexing it.
        return None, f'Unrecognized copy status file {status_file}'

    @property
    def experiment_description(self):
        """dict or None: The experiment description stored on this copier."""
        return self._experiment_description

    @property
    def remote_session_path(self):
        """pathlib.Path or None: The session path on the remote, None if no remote subjects folder is set."""
        if self.remote_subjects_folder:
            # padded_sequence ensures session path has zero padded number folder, e.g. 1 -> 001
            session_parts = alfiles.padded_sequence(self.session_path).parts[-3:]
            return self.remote_subjects_folder.joinpath(*session_parts)

    @property
    def file_experiment_description(self):
        """Returns the local experiment description file, if none found, returns one with the tag."""
        return next(
            self.session_path.glob('_ibl_experiment.description*'),
            self.session_path.joinpath(f'_ibl_experiment.description_{self.tag}.yaml'),
        )

    def glob_file_remote_copy_status(self, status='*'):
        """
        Find the remote copy status file located next to the remote stub file.

        Parameters
        ----------
        status : str
            The status to look for, e.g. 'pending', 'complete' or 'final'. The default '*'
            matches any status.

        Returns
        -------
        pathlib.Path or None
            The first matching status file, or None if there is none or no remote is set.
        """
        fr = self.file_remote_experiment_description
        return next(fr.parent.glob(f'{fr.stem}.status_{status}'), None) if fr else None

    @property
    def file_remote_experiment_description(self):
        """Return the remote path to the remote stub file."""
        if self.remote_subjects_folder:
            return session_params.get_remote_stub_name(self.remote_session_path, device_id=self.tag)

    @property
    def remote_experiment_description_stub(self):
        """The content of the remote stub file, as read by `session_params.read_params`."""
        return session_params.read_params(self.file_remote_experiment_description)

    def _copy_collections(self):
        """
        Copy collections defined in experiment description file.

        This is the method to subclass for pre- and post- copy routines.

        Returns
        -------
        bool
            True if transfer successfully completed.
        """
        status = True
        exp_pars = session_params.read_params(self.session_path)
        collections = set()
        # First glob on each collection pattern to find all folders to transfer
        for collection in session_params.get_collections(exp_pars, flat=True):
            folders = filter(Path.is_dir, self.session_path.glob(collection))
            _collections = list(map(lambda x: x.relative_to(self.session_path).as_posix(), folders))
            if not _collections:
                log.error(f'No collection(s) matching "{collection}" found')
                status = False
                continue
            collections.update(_collections)

        # Attempt to copy each folder
        for collection in collections:
            local_collection = self.session_path.joinpath(collection)
            assert local_collection.exists(), f'local collection "{collection}" no longer exists'
            log.info(f'transferring {self.session_path} - {collection}')
            remote_collection = self.remote_session_path.joinpath(collection)
            if remote_collection.exists():
                # this is far from ideal: we currently recopy all files even if some already copied
                log.warning(f'Collection {remote_collection} already exists, removing')
                shutil.rmtree(remote_collection)
            status &= copy_folders(local_collection, remote_collection)
        return status

    def copy_collections(self):
        """
        Recursively copies the collection folders into the remote session path.

        On success the remote pending status file is renamed to complete and the local
        ``transfer_me.flag`` file, if any, is removed.

        Do not overload, overload _copy_collections instead.

        Returns
        -------
        bool
            True if the copy was already complete or completed successfully.
        """
        if self.glob_file_remote_copy_status('complete'):
            log.warning(
                f'Copy already complete for {self.session_path},'
                f' remove {self.glob_file_remote_copy_status("complete")} to force'
            )
            return True
        status = self._copy_collections()
        # post copy stuff: rename the pending flag to complete
        if status:
            pending_file = self.glob_file_remote_copy_status('pending')
            pending_file.rename(pending_file.with_suffix('.status_complete'))
            if self.session_path.joinpath('transfer_me.flag').exists():
                self.session_path.joinpath('transfer_me.flag').unlink()
        return status

    def initialize_experiment(self, acquisition_description=None, overwrite=True):
        """
        Copy acquisition description yaml to the server and local transfers folder.

        Parameters
        ----------
        acquisition_description : dict
            The data to write to the experiment.description.yaml file. Defaults to the
            `experiment_description` property when None.
        overwrite : bool
            If true, overwrite any existing file with the new one, otherwise, update the existing file.

        Raises
        ------
        RuntimeError
            If writing to the remote fails and `assert_connect_on_init` is True.
        """
        if acquisition_description is None:
            acquisition_description = self.experiment_description

        assert acquisition_description

        # First attempt to add the remote description stub to the _device folder on the remote session
        if not self.remote_subjects_folder:
            log.info('The remote path is unspecified and remote experiment.description stub creation is omitted.')
        else:
            remote_stub_file = self.file_remote_experiment_description
            previous_description = (
                session_params.read_params(remote_stub_file) if remote_stub_file.exists() and not overwrite else {}
            )
            try:
                merged_description = session_params.merge_params(previous_description, acquisition_description)
                session_params.write_yaml(remote_stub_file, merged_description)
                # reset the copy status: remove any previous status file and mark as pending
                for f in remote_stub_file.parent.glob(remote_stub_file.stem + '.status_*'):
                    f.unlink()
                remote_stub_file.with_suffix('.status_pending').touch()
                log.info(f'Written data to remote device at: {remote_stub_file}.')
            except Exception as e:
                if self.assert_connect_on_init:
                    raise RuntimeError(f'Failed to write data to remote device at: {remote_stub_file}. \n {e}') from e
                log.warning(f'Failed to write data to remote device at: {remote_stub_file}. \n {e}')

        # then create on the local machine
        previous_description = (
            session_params.read_params(self.file_experiment_description)
            if self.file_experiment_description.exists() and not overwrite
            else {}
        )
        session_params.write_yaml(
            self.file_experiment_description, session_params.merge_params(previous_description, acquisition_description)
        )
        log.info(f'Written data to local session at : {self.file_experiment_description}.')

    def finalize_copy(self, number_of_expected_devices=None):
        """
        At the end of the copy, check if all the files are there and if so, aggregate the device files.

        Parameters
        ----------
        number_of_expected_devices : int, optional
            The number of device stubs expected to be complete before finalizing. Defaults
            to the number of yaml stub files found next to the remote stub file.
        """
        ready_to_finalize = 0
        # List the stub files in _devices folder
        files_stub = list(self.file_remote_experiment_description.parent.glob('*.yaml'))
        if number_of_expected_devices is None:
            number_of_expected_devices = len(files_stub)
        log.debug(f'Number of expected devices is {number_of_expected_devices}')

        for file_stub in files_stub:
            ready_to_finalize += int(file_stub.with_suffix('.status_complete').exists())
            ad_stub = session_params.read_params(file_stub)
            # here we check the sync field of the device files
            if next(iter(ad_stub.get('sync', {})), None) != 'bpod' and number_of_expected_devices == 1:
                log.warning(
                    'Only bpod is supported for single device sessions, it seems you are '
                    'attempting to transfer a session with more than one device.'
                )
                return

        if ready_to_finalize > number_of_expected_devices:
            log.error('More stub files (%i) than expected devices (%i)', ready_to_finalize, number_of_expected_devices)
            return
        log.info(f'{ready_to_finalize}/{number_of_expected_devices} copy completion status')
        if ready_to_finalize == number_of_expected_devices:
            for file_stub in files_stub:
                session_params.aggregate_device(file_stub, self.remote_session_path.joinpath('_ibl_experiment.description.yaml'))
                file_stub.with_suffix('.status_complete').rename(file_stub.with_suffix('.status_final'))
            self.remote_session_path.joinpath('raw_session.flag').touch()
2✔
366

367

368
class VideoCopier(SessionCopier):
    """Session copier for the video device: stub files are tagged ``'video'``."""

    tag = 'video'
    assert_connect_on_init = True

    def create_video_stub(self, config, collection='raw_video_data'):
        """
        Write the acquisition description derived from a camera config to the local session.

        Parameters
        ----------
        config : dict
            A cameras configuration dictionary, see `config2stub`.
        collection : str
            The video output collection.
        """
        session_params.write_params(self.session_path, self.config2stub(config, collection))

    @staticmethod
    def config2stub(config: dict, collection: str = 'raw_video_data') -> dict:
        """
        Build an acquisition description stub from a camera config dict.

        The 'BONSAI_WORKFLOW' entry of the config is skipped. For every other entry, the
        settings keys are lower-cased, and settings that are None or keyed 'INDEX' are dropped.

        Parameters
        ----------
        config : dict
            A cameras configuration dictionary, found in `device_cameras` of hardware_settings.yaml.
        collection : str
            The video output collection.

        Returns
        -------
        dict
            An acquisition description file stub.
        """
        cameras = {}
        for label, settings in config.items():
            if label == 'BONSAI_WORKFLOW':
                continue
            kept = {}
            for key, value in settings.items():
                if value is None or key == 'INDEX':
                    continue
                kept[key.lower()] = value
            cameras[label] = dict(collection=collection, **kept)
        return {'devices': {'cameras': cameras}, 'version': '1.0.0'}

    def initialize_experiment(self, acquisition_description=None, **kwargs):
        """
        Initialize the experiment, reading the local description file when none is given.

        Parameters
        ----------
        acquisition_description : dict, optional
            The acquisition description. When empty or None, it is read from the local
            experiment description file.
        **kwargs
            Forwarded to `SessionCopier.initialize_experiment`.

        Raises
        ------
        FileNotFoundError
            If no description is given and the local experiment description file is missing.
        """
        if not acquisition_description:
            description_file = self.file_experiment_description
            if not description_file.exists():
                raise FileNotFoundError(description_file)
            acquisition_description = session_params.read_params(description_file)
        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
2✔
408

409

410
class BehaviorCopier(SessionCopier):
    """Session copier for the behaviour device: stub files are tagged ``'behavior'``."""

    tag = 'behavior'
    assert_connect_on_init = False

    @property
    def experiment_description(self):
        """The experiment description read from the local session path."""
        return session_params.read_params(self.session_path)

    def _copy_collections(self):
        """Patch settings files before copy.

        Before copying the collections, this method checks that the behaviour data are valid. The
        following checks are made:

        #. Check at least 1 task collection in experiment description. If not, return.
        #. For each collection, check for task settings. If any are missing, return.
        #. If SESSION_END_TIME is missing, assumes task crashed. If so and task data missing and
           not a chained protocol (i.e. it is the only task collection), assume a dud and remove
           the remote stub file.  Otherwise, patch settings with total trials, end time, etc.

        Returns
        -------
        bool
            True if transfer successfully completed.

        """
        collections = session_params.get_task_collection(self.experiment_description)
        if not collections:
            log.error(f'Skipping: no task collections defined for {self.session_path}')
            return False
        for collection in (collections := ensure_list(collections)):
            task_settings = raw_data_loaders.load_settings(self.session_path, task_collection=collection)
            if task_settings is None:
                log.info(f'Skipping: no task settings found for {self.session_path}')
                return False  # may also want to remove session here if empty
            # here if the session end time has not been labeled we assume that the session crashed, and patch the settings
            # `.get` so that an absent key is handled like a None value instead of raising a KeyError
            if task_settings.get('SESSION_END_TIME') is None:
                jsonable = self.session_path.joinpath(collection, '_iblrig_taskData.raw.jsonable')
                if not jsonable.exists():
                    log.info(f'Skipping: no task data found for {self.session_path}')
                    # No local data and only behaviour stub in remote; assume dud and remove entire session
                    if (
                        self.remote_session_path.exists()
                        and len(collections) == 1
                        and len(list(self.file_remote_experiment_description.parent.glob('*.yaml'))) <= 1
                    ):
                        shutil.rmtree(self.remote_session_path)  # remove likely dud
                    return False
                trials, bpod_data = load_task_jsonable(jsonable)
                ntrials = trials.shape[0]
                # We have the case where the session hard crashed.
                # Patch the settings file to wrap the session and continue the copying.
                log.warning(f'Recovering crashed session {self.session_path}')
                settings_file = self.session_path.joinpath(collection, '_iblrig_taskSettings.raw.json')
                with open(settings_file) as fid:
                    raw_settings = json.load(fid)
                raw_settings['NTRIALS'] = int(ntrials)
                raw_settings['NTRIALS_CORRECT'] = int(trials['trial_correct'].sum())
                raw_settings['TOTAL_WATER_DELIVERED'] = int(trials['reward_amount'].sum())
                # cast the timestamp in a datetime object and add the session length to it
                end_time = datetime.datetime.strptime(raw_settings['SESSION_START_TIME'], '%Y-%m-%dT%H:%M:%S.%f')
                end_time += datetime.timedelta(seconds=bpod_data[-1]['Trial end timestamp'])
                raw_settings['SESSION_END_TIME'] = end_time.strftime('%Y-%m-%dT%H:%M:%S.%f')
                with open(settings_file, 'w') as fid:
                    json.dump(raw_settings, fid)
        # routine progress message: same level as the state messages logged by `SessionCopier.run`
        log.info(f'{self.state}, {self.session_path}')
        return super()._copy_collections()  # proceed with copy

    def finalize_copy(self, number_of_expected_devices=None):
        """If main sync is bpod, expect a single stub file."""
        if number_of_expected_devices is None and session_params.get_sync(self.remote_experiment_description_stub) == 'bpod':
            number_of_expected_devices = 1
        super().finalize_copy(number_of_expected_devices=number_of_expected_devices)
2✔
483

484

485
class EphysCopier(SessionCopier):
    """Session copier for the ephys device: stub files are tagged ``'ephys'``."""

    tag = 'ephys'
    assert_connect_on_init = True

    def initialize_experiment(self, acquisition_description=None, nprobes=None, main_sync=True, **kwargs):
        """
        Initialize the ephys experiment and create the local probe folders.

        Parameters
        ----------
        acquisition_description : dict, optional
            The acquisition description. When empty or None, one is generated with one
            neuropixel entry per probe (``probe00``, ``probe01``, ...).
        nprobes : int, optional
            The number of probes. Defaults to the number of ``*.ap.bin`` files found
            recursively in the local session path.
        main_sync : bool
            When generating the description, if True, update it with the content of the
            packaged ``device_descriptions/sync/nidq.yaml`` file.
        **kwargs
            Forwarded to `SessionCopier.initialize_experiment`.
        """
        # Resolve the probe count up front: it is needed for the folder creation below even
        # when the acquisition description is supplied by the caller (range(None) would fail).
        if nprobes is None:
            nprobes = len(list(self.session_path.glob('**/*.ap.bin')))
        if not acquisition_description:
            acquisition_description = {'devices': {'neuropixel': {}}}
            neuropixel = acquisition_description['devices']['neuropixel']
            for n in range(nprobes):
                name = f'probe{n:02}'
                neuropixel[name] = {'collection': f'raw_ephys_data/{name}', 'sync_label': 'imec_sync'}
            sync_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'sync', 'nidq.yaml')
            # without any probe, drop the empty devices entry altogether
            acquisition_description = acquisition_description if neuropixel else {}
            if main_sync:
                acquisition_description.update(session_params.read_params(sync_file))

        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
        # once the session folders have been initialized, create the probe folders
        for n in range(nprobes):
            self.session_path.joinpath('raw_ephys_data', f'probe{n:02}').mkdir(exist_ok=True, parents=True)

    def _copy_collections(self):
        """
        Here we overload the copy to be able to rename the probes properly and also create the insertions.

        Wiring files are copied from the packaged templates next to each ``*.nidq.bin`` and
        ``*.ap.bin`` file. A failure to create the probe insertions is logged and does not
        interrupt the transfer.

        Returns
        -------
        bool
            True if the ``raw_ephys_data`` folder was copied successfully.
        """
        log.info(f'Transferring ephys session: {self.session_path} to {self.remote_session_path}')
        ibllib.pipes.misc.rename_ephys_files(self.session_path)
        ibllib.pipes.misc.move_ephys_files(self.session_path)
        # copy the wiring files from template
        path_wiring = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'neuropixel', 'wirings')
        # the probe model is set to 3B as soon as a nidq file is found, 3A otherwise
        probe_model = '3A'
        for file_nidq_bin in self.session_path.joinpath('raw_ephys_data').glob('*.nidq.bin'):
            probe_model = '3B'
            shutil.copy(path_wiring.joinpath('nidq.wiring.json'), file_nidq_bin.with_suffix('.wiring.json'))
        for file_ap_bin in self.session_path.joinpath('raw_ephys_data').rglob('*.ap.bin'):
            shutil.copy(path_wiring.joinpath(f'{probe_model}.wiring.json'), file_ap_bin.with_suffix('.wiring.json'))
        try:
            ibllib.pipes.misc.create_alyx_probe_insertions(self.session_path)
        except Exception:
            # format_exc returns the traceback as a string; print_exc returns None and
            # would log the literal text "None"
            log.error(traceback.format_exc())
            log.info('Probe creation failed, please create the probe insertions manually. Continuing transfer...')
        return copy_folders(
            local_folder=self.session_path.joinpath('raw_ephys_data'),
            remote_folder=self.remote_session_path.joinpath('raw_ephys_data'),
            overwrite=True,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc