• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 12279337432

11 Dec 2024 03:15PM UTC coverage: 47.031% (+0.2%) from 46.79%
12279337432

Pull #751

github

d4edef
web-flow
Merge eea51f2f7 into 2f9d65d86
Pull Request #751: Fiber trajectory GUI

0 of 114 new or added lines in 1 file covered. (0.0%)

1076 existing lines in 22 files now uncovered.

4246 of 9028 relevant lines covered (47.03%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.76
/iblrig/transfer_experiments.py
1
import datetime
2✔
2
import json
2✔
3
import logging
2✔
4
import os
2✔
5
import shutil
2✔
6
import socket
2✔
7
import traceback
2✔
8
import uuid
2✔
9
from collections.abc import Iterable
2✔
10
from enum import IntEnum
2✔
11
from os.path import samestat
2✔
12
from pathlib import Path
2✔
13

14
import numpy as np
2✔
15
import pandas as pd
2✔
16
import pandera
2✔
17

18
import ibllib.pipes.misc
2✔
19
import iblrig
2✔
20
import one.alf.path as alfiles
2✔
21
from ibllib.io import raw_data_loaders, session_params
2✔
22
from ibllib.pipes.misc import sleepless
2✔
23
from iblrig.raw_data_loaders import load_task_jsonable
2✔
24
from iblutil.io import hashfile
2✔
25
from iblutil.util import ensure_list
2✔
26

27
log = logging.getLogger(__name__)
2✔
28

29
ES_CONTINUOUS = 0x80000000
2✔
30
ES_SYSTEM_REQUIRED = 0x00000001
2✔
31

32

33
class CopyState(IntEnum):
    """Integer state of a session copy, ordered by progress through the copy pipeline.

    Used by ``SessionCopier.get_state``/``run`` to decide which step to perform next.
    """

    HARD_RESET = -1  # manual reset: remote session folder is removed and re-initialized (see SessionCopier.run)
    NOT_REGISTERED = 0  # no experiment description stub on the remote server yet
    PENDING = 1  # stub registered on remote, data not yet copied (".status_pending" flag present)
    COMPLETE = 2  # this device's data copied; other devices may still be pending (".status_complete" flag)
    FINALIZED = 3  # all devices copied and stubs aggregated on the server (".status_final" flag)
39

40

41
@sleepless
def _copy2_checksum(src: str, dst: str, *args, **kwargs) -> str:
    """
    Copy a single file with BLAKE2B checksum verification.

    The source file's BLAKE2B hash is computed first. If the destination already
    refers to the same file (per ``os.path.samestat``) and its hash matches, the
    copy is skipped entirely. Otherwise the file is copied with ``shutil.copy2``
    and the destination hash is verified against the source hash.

    Parameters
    ----------
    src : str
        Path of the file to copy.
    dst : str
        Destination path.
    *args, **kwargs
        Forwarded verbatim to `shutil.copy2`.

    Returns
    -------
    str
        The destination path of the copied (or already-present) file.

    Raises
    ------
    OSError
        If the destination hash does not match the source hash after copying.
    """
    log.info(f'Processing `{src}`:')
    log.info('  - calculating hash of local file')
    source_hash = hashfile.blake2b(src, False)
    # skip the copy when src and dst are already the same file and hashes agree
    if os.path.exists(dst) and samestat(os.stat(src), os.stat(dst)):
        log.info('  - file already exists at destination')
        log.info('  - calculating hash of remote file')
        if source_hash == hashfile.blake2b(dst, False):
            log.info('  - local and remote BLAKE2B hashes match, skipping copy')
            return dst
        log.info('  - local and remote hashes DO NOT match')
    log.info(f'  - copying file to `{dst}`')
    copied_path = shutil.copy2(src, dst, *args, **kwargs)
    log.info('  - calculating hash of remote file')
    if hashfile.blake2b(dst, False) != source_hash:
        raise OSError(f'Error copying {src}: hash mismatch.')
    log.info('  - local and remote hashes match, copy successful')
    return copied_path
87

88

89
def copy_folders(local_folder: Path, remote_folder: Path, overwrite: bool = False) -> bool:
    """
    Recursively copy a local folder tree to a remote location with checksum verification.

    The entire tree under ``local_folder`` is copied via ``shutil.copytree`` using
    `_copy2_checksum` for each file, skipping any ``transfer_me.flag`` files. Any
    ``OSError`` (including a checksum mismatch raised by the copy function) is
    logged and reported as failure rather than propagated.

    Parameters
    ----------
    local_folder : Path
        Folder to copy from.
    remote_folder : Path
        Folder to copy to (its parent is created if necessary).
    overwrite : bool, optional
        When True, an existing remote folder is merged into rather than raising.
        Default is False.

    Returns
    -------
    bool
        True on success, False if any OSError occurred.
    """
    try:
        # make sure the destination's parent exists before copytree creates the leaf
        remote_folder.parent.mkdir(parents=True, exist_ok=True)
        shutil.copytree(
            local_folder,
            remote_folder,
            dirs_exist_ok=overwrite,
            ignore=shutil.ignore_patterns('transfer_me.flag'),
            copy_function=_copy2_checksum,
        )
    except OSError:
        log.error(traceback.format_exc())
        log.info(f'Could not copy {local_folder} to {remote_folder}')
        return False
    return True
126

127

128
class SessionCopier:
    """Initialize and copy session data to a remote server."""

    assert_connect_on_init = True
    """bool: Raise error if unable to write stub file to remote server."""

    _experiment_description = None
    """dict: The experiment description file used for the copy."""

    tag = f'{socket.gethostname()}_{uuid.getnode()}'
    """str: The device name (adds this to the experiment description stub file on the remote server)."""

    def __init__(self, session_path, remote_subjects_folder=None, tag=None):
        """
        Initialize and copy session data to a remote server.

        Parameters
        ----------
        session_path : str, pathlib.Path
            The partial or session path to copy.
        remote_subjects_folder : str, pathlib.Path
            The remote server path to which to copy the session data.
        tag : str
            The device name (adds this to the experiment description stub file on the remote server).
        """
        self.tag = tag or self.tag
        self.session_path = Path(session_path)
        # remote_subjects_folder stays None when no remote is configured; get_state then returns (None, message)
        self.remote_subjects_folder = Path(remote_subjects_folder) if remote_subjects_folder else None

    def __repr__(self):
        """Default repr augmented with the local and remote session paths."""
        return f'{super().__repr__()} \n local: {self.session_path} \n remote: {self.remote_session_path}'

    @property
    def state(self):
        """CopyState or None: the current copy state (first element of :meth:`get_state`)."""
        return self.get_state()[0]

    def run(self, number_of_expected_devices=None):
        """
        Run the copy of this device experiment.

        Will try to get as far as possible in the copy process (from states 0 init experiment to state 3 finalize experiment)
        if possible, and return earlier if the process can't be completed.
        """
        # NB: each `if` re-reads self.state, so a successful step falls through to the next one in the same call
        if self.state == CopyState.HARD_RESET:  # this case is not implemented automatically and corresponds to a hard reset
            log.info(f'{self.state}, {self.session_path}')
            shutil.rmtree(self.remote_session_path)
            self.initialize_experiment()
        if self.state == CopyState.NOT_REGISTERED:  # the session hasn't even been initialized: copy the stub to the remote
            log.info(f'{self.state}, {self.session_path}')
            self.initialize_experiment()
        if self.state == CopyState.PENDING:  # the session is ready for copy
            log.info(f'{self.state}, {self.session_path}')
            self.copy_collections()
        if self.state == CopyState.COMPLETE:
            log.info(f'{self.state}, {self.session_path}')
            self.finalize_copy(number_of_expected_devices=number_of_expected_devices)
        if self.state == CopyState.FINALIZED:
            log.info(f'{self.state}, {self.session_path}')

    def get_state(self) -> tuple[CopyState | None, str]:
        """
        Gets the current copier state.

        State 0: this device experiment has not been initialized for this device
        State 1: this device experiment is initialized (the experiment description stub is present on the remote)
        State 2: this device experiment is copied on the remote server, but other devices copies are still pending
        State 3: the whole experiment is finalized and all data is on the server
        :return:
        """
        if self.remote_subjects_folder is None or not self.remote_subjects_folder.exists():
            return None, f'Remote subjects folder {self.remote_subjects_folder} set to Null or unreachable'
        if not self.file_remote_experiment_description.exists():
            return (
                CopyState.NOT_REGISTERED,
                f'Copy object not registered on server: {self.file_remote_experiment_description} does not exist',
            )
        status_file = self.glob_file_remote_copy_status()
        if status_file is None:
            # stub exists but no status flag: self-heal by (re)creating a pending flag
            status_file = self.file_remote_experiment_description.with_suffix('.status_pending')
            status_file.touch()
            log.warning(f'{status_file} not found and created')
        if status_file.name.endswith('pending'):
            return CopyState.PENDING, f'Copy pending {self.file_remote_experiment_description}'
        elif status_file.name.endswith('complete'):
            return CopyState.COMPLETE, f'Copy complete {self.file_remote_experiment_description}'
        elif status_file.name.endswith('final'):
            return CopyState.FINALIZED, f'Copy finalized {self.file_remote_experiment_description}'
        # NOTE(review): an unrecognized status suffix falls through and returns None implicitly — confirm intended

    @property
    def experiment_description(self):
        """dict: the experiment description used for the copy (set by subclasses / initialize_experiment)."""
        return self._experiment_description

    @property
    def remote_session_path(self):
        """pathlib.Path or None: subject/date/number path under the remote subjects folder (None if no remote set)."""
        if self.remote_subjects_folder:
            # padded_sequence ensures session path has zero padded number folder, e.g. 1 -> 001
            session_parts = alfiles.padded_sequence(self.session_path).parts[-3:]
            return self.remote_subjects_folder.joinpath(*session_parts)

    @property
    def file_experiment_description(self):
        """Returns the local experiment description file, if none found, returns one with the tag."""
        return next(
            self.session_path.glob('_ibl_experiment.description*'),
            self.session_path.joinpath(f'_ibl_experiment.description_{self.tag}.yaml'),
        )

    def glob_file_remote_copy_status(self, status='*'):
        """status: pending / complete"""
        fr = self.file_remote_experiment_description
        # returns None when there is no remote stub or no matching status flag file
        return next(fr.parent.glob(f'{fr.stem}.status_{status}'), None) if fr else None

    @property
    def file_remote_experiment_description(self):
        """Return the remote path to the remote stub file."""
        if self.remote_subjects_folder:
            return session_params.get_remote_stub_name(self.remote_session_path, device_id=self.tag)

    @property
    def remote_experiment_description_stub(self):
        """dict: the parsed contents of this device's remote experiment description stub."""
        return session_params.read_params(self.file_remote_experiment_description)

    def _copy_collections(self):
        """
        Copy collections defined in experiment description file.

        This is the method to subclass for pre- and post- copy routines.

        Returns
        -------
        bool
            True if transfer successfully completed.
        """
        status = True
        exp_pars = session_params.read_params(self.session_path)
        collections = set()
        # First glob on each collection pattern to find all folders to transfer
        for collection in session_params.get_collections(exp_pars, flat=True):
            folders = filter(Path.is_dir, self.session_path.glob(collection))
            _collections = list(map(lambda x: x.relative_to(self.session_path).as_posix(), folders))
            if not _collections:
                # a missing collection marks the whole transfer failed, but we keep trying the others
                log.error(f'No collection(s) matching "{collection}" found')
                status = False
                continue
            collections.update(_collections)

        # Attempt to copy each folder
        for collection in collections:
            local_collection = self.session_path.joinpath(collection)
            assert local_collection.exists(), f'local collection "{collection}" no longer exists'
            log.info(f'transferring {self.session_path} - {collection}')
            remote_collection = self.remote_session_path.joinpath(collection)
            if remote_collection.exists():
                # this is far from ideal: we currently recopy all files even if some already copied
                log.warning(f'Collection {remote_collection} already exists, removing')
                shutil.rmtree(remote_collection)
            status &= copy_folders(local_collection, remote_collection)
        status &= self.copy_snapshots()  # special case: copy snapshots without deleting or overwriting remote files
        return status

    def copy_snapshots(self):
        """
        Copy snapshots files from root session path.

        Unlike the collection folders defined in the experiment description folder, the snapshots folder is optional and
        may exist on multiple acquisition PCs.  This method will copy any files over if the snapshots folder exists,
        however it will fail if a file of the same name already exists. This ensures snapshots from multiple computers
        don't get overwritten.

        Returns
        -------
        bool
            True if transfer successfully completed.
        """
        snapshots = self.session_path.joinpath('snapshots')
        if not snapshots.exists():
            log.debug('Snapshots folder %s does not exist', snapshots.as_posix())
            return True
        log.info(f'transferring {self.session_path} - {snapshots.name}')
        remote_snapshots = self.remote_session_path.joinpath(snapshots.name)
        # Get set of local and remote snapshot filenames
        local_files, remote_files = (
            set(map(lambda x: x.relative_to(p).as_posix(), filter(Path.is_file, p.rglob('*'))))
            for p in (snapshots, remote_snapshots)
        )
        if not any(local_files):
            log.debug('Snapshots folder %s is empty', snapshots.as_posix())
            return True
        if any(duplicates := local_files.intersection(remote_files)):
            log.error('The following snapshots already exist in %s\n%s', remote_snapshots.as_posix(), ', '.join(duplicates))
            log.info('Could not copy %s to %s', snapshots.as_posix(), remote_snapshots.as_posix())
            return False
        # 'overwrite' actually means 'don't raise if remote folder exists'.
        # We've already checked that filenames don't conflict.
        return copy_folders(snapshots, remote_snapshots, overwrite=True)

    def copy_collections(self, *args, **kwargs):
        """
        Recursively copies the collection folders into the remote session path.

        Do not overload, overload _copy_collections instead.
        """
        if self.glob_file_remote_copy_status('complete'):
            log.warning(
                f'Copy already complete for {self.session_path},'
                f' remove {self.glob_file_remote_copy_status("complete")} to force'
            )
            return True
        status = self._copy_collections(*args, **kwargs)
        # post copy stuff: rename the pending flag to complete
        if status:
            pending_file = self.glob_file_remote_copy_status('pending')
            pending_file.rename(pending_file.with_suffix('.status_complete'))
            if self.session_path.joinpath('transfer_me.flag').exists():
                self.session_path.joinpath('transfer_me.flag').unlink()
        return status

    def initialize_experiment(self, acquisition_description=None, overwrite=True):
        """
        Copy acquisition description yaml to the server and local transfers folder.

        Parameters
        ----------
        acquisition_description : dict
            The data to write to the experiment.description.yaml file.
        overwrite : bool
            If true, overwrite any existing file with the new one, otherwise, update the existing file.
        """
        if acquisition_description is None:
            acquisition_description = self.experiment_description

        assert acquisition_description

        # First attempt to add the remote description stub to the _device folder on the remote session
        if not self.remote_subjects_folder:
            log.info('The remote path is unspecified and remote experiment.description stub creation is omitted.')
        else:
            remote_stub_file = self.file_remote_experiment_description
            previous_description = (
                session_params.read_params(remote_stub_file) if remote_stub_file.exists() and not overwrite else {}
            )
            try:
                merged_description = session_params.merge_params(previous_description, acquisition_description)
                session_params.write_yaml(remote_stub_file, merged_description)
                # writing a fresh stub resets any existing copy status: clear flags, then mark pending
                for f in remote_stub_file.parent.glob(remote_stub_file.stem + '.status_*'):
                    f.unlink()
                remote_stub_file.with_suffix('.status_pending').touch()
                log.info(f'Written data to remote device at: {remote_stub_file}.')
            except Exception as e:
                # assert_connect_on_init decides whether a remote failure is fatal (see class attribute)
                if self.assert_connect_on_init:
                    raise RuntimeError(f'Failed to write data to remote device at: {remote_stub_file}. \n {e}') from e
                log.warning(f'Failed to write data to remote device at: {remote_stub_file}. \n {e}')

        # then create on the local machine
        previous_description = (
            session_params.read_params(self.file_experiment_description)
            if self.file_experiment_description.exists() and not overwrite
            else {}
        )
        session_params.write_yaml(
            self.file_experiment_description, session_params.merge_params(previous_description, acquisition_description)
        )
        log.info(f'Written data to local session at : {self.file_experiment_description}.')

    def finalize_copy(self, number_of_expected_devices=None):
        """At the end of the copy, check if all the files are there and if so, aggregate the device files."""
        ready_to_finalize = 0
        # List the stub files in _devices folder
        files_stub = list(self.file_remote_experiment_description.parent.glob('*.yaml'))
        if number_of_expected_devices is None:
            # presumably all devices have registered their stubs by now — TODO confirm this assumption holds for stragglers
            number_of_expected_devices = len(files_stub)
        log.debug(f'Number of expected devices is {number_of_expected_devices}')

        for file_stub in files_stub:
            ready_to_finalize += int(file_stub.with_suffix('.status_complete').exists())
            ad_stub = session_params.read_params(file_stub)
            # here we check the sync field of the device files
            if next(iter(ad_stub.get('sync', {})), None) != 'bpod' and number_of_expected_devices == 1:
                log.warning(
                    'Only bpod is supported for single device sessions, it seems you are '
                    'attempting to transfer a session with more than one device.'
                )
                return

        if ready_to_finalize > number_of_expected_devices:
            log.error('More stub files (%i) than expected devices (%i)', ready_to_finalize, number_of_expected_devices)
            return
        log.info(f'{ready_to_finalize}/{number_of_expected_devices} copy completion status')
        if ready_to_finalize == number_of_expected_devices:
            for file_stub in files_stub:
                # merge each device stub into the main experiment description and mark it final
                session_params.aggregate_device(file_stub, self.remote_session_path.joinpath('_ibl_experiment.description.yaml'))
                file_stub.with_suffix('.status_complete').rename(file_stub.with_suffix('.status_final'))
            self.remote_session_path.joinpath('raw_session.flag').touch()
421

422

423
class VideoCopier(SessionCopier):
    """Session copier for the video acquisition computer."""

    tag = 'video'
    assert_connect_on_init = True

    def create_video_stub(self, config, collection='raw_video_data'):
        """Build an acquisition description stub from *config* and write it to the local session folder."""
        stub = self.config2stub(config, collection)
        session_params.write_params(self.session_path, stub)

    @staticmethod
    def config2stub(config: dict, collection: str = 'raw_video_data') -> dict:
        """
        Generate acquisition description stub from a camera config dict.

        Parameters
        ----------
        config : dict
            A cameras configuration dictionary, found in `device_cameras` of hardware_settings.yaml.
        collection : str
            The video output collection.

        Returns
        -------
        dict
            An acquisition description file stub.
        """
        cameras = {}
        for label, settings in config.items():
            # the BONSAI_WORKFLOW entry is workflow metadata, not a camera
            if label == 'BONSAI_WORKFLOW':
                continue
            # drop null values and the INDEX key; lower-case the remaining keys
            settings_mod = {key.lower(): value for key, value in settings.items() if value is not None and key != 'INDEX'}
            cameras[label] = dict(collection=collection, **settings_mod)
        return {'devices': {'cameras': cameras}, 'version': '1.0.0'}

    def initialize_experiment(self, acquisition_description=None, **kwargs):
        """Load the local acquisition description stub (which must exist) and initialize as usual."""
        if not acquisition_description:
            # the stub must have been written beforehand (e.g. via create_video_stub)
            if not self.file_experiment_description.exists():
                raise FileNotFoundError(self.file_experiment_description)
            acquisition_description = session_params.read_params(self.file_experiment_description)
        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
463

464

465
class BehaviorCopier(SessionCopier):
    """Session copier for the behaviour (Bpod) acquisition computer."""

    tag = 'behavior'
    # a missing remote is not fatal for the behaviour rig (it may run stand-alone)
    assert_connect_on_init = False

    @property
    def experiment_description(self):
        """dict: the experiment description read from the local session path (not the cached stub)."""
        return session_params.read_params(self.session_path)

    def _copy_collections(self):
        """Patch settings files before copy.

        Before copying the collections, this method checks that the behaviour data are valid. The
        following checks are made:

        #. Check at least 1 task collection in experiment description. If not, return.
        #. For each collection, check for task settings. If any are missing, return.
        #. If SESSION_END_TIME is missing, assumes task crashed. If so and task data missing and
           not a chained protocol (i.e. it is the only task collection), assume a dud and remove
           the remote stub file.  Otherwise, patch settings with total trials, end time, etc.

        Returns
        -------
        bool
            True if transfer successfully completed.

        """
        collections = session_params.get_task_collection(self.experiment_description)
        if not collections:
            log.error(f'Skipping: no task collections defined for {self.session_path}')
            return False
        for collection in (collections := ensure_list(collections)):
            task_settings = raw_data_loaders.load_settings(self.session_path, task_collection=collection)
            if task_settings is None:
                log.info(f'Skipping: no task settings found for {self.session_path}')
                return False  # may also want to remove session here if empty
            # here if the session end time has not been labeled we assume that the session crashed, and patch the settings
            if task_settings['SESSION_END_TIME'] is None:
                jsonable = self.session_path.joinpath(collection, '_iblrig_taskData.raw.jsonable')
                if not jsonable.exists():
                    log.info(f'Skipping: no task data found for {self.session_path}')
                    # No local data and only behaviour stub in remote; assume dud and remove entire session
                    if (
                        self.remote_session_path.exists()
                        and len(collections) == 1
                        and len(list(self.file_remote_experiment_description.parent.glob('*.yaml'))) <= 1
                    ):
                        shutil.rmtree(self.remote_session_path)  # remove likely dud
                    return False
                trials, bpod_data = load_task_jsonable(jsonable)
                ntrials = trials.shape[0]
                # We have the case where the session hard crashed.
                # Patch the settings file to wrap the session and continue the copying.
                log.warning(f'Recovering crashed session {self.session_path}')
                settings_file = self.session_path.joinpath(collection, '_iblrig_taskSettings.raw.json')
                with open(settings_file) as fid:
                    raw_settings = json.load(fid)
                raw_settings['NTRIALS'] = int(ntrials)
                raw_settings['NTRIALS_CORRECT'] = int(trials['trial_correct'].sum())
                # NOTE(review): int() truncates any fractional total — confirm reward_amount sums are integral
                raw_settings['TOTAL_WATER_DELIVERED'] = int(trials['reward_amount'].sum())
                # cast the timestamp in a datetime object and add the session length to it
                end_time = datetime.datetime.strptime(raw_settings['SESSION_START_TIME'], '%Y-%m-%dT%H:%M:%S.%f')
                end_time += datetime.timedelta(seconds=bpod_data[-1]['Trial end timestamp'])
                raw_settings['SESSION_END_TIME'] = end_time.strftime('%Y-%m-%dT%H:%M:%S.%f')
                with open(settings_file, 'w') as fid:
                    json.dump(raw_settings, fid)
        log.critical(f'{self.state}, {self.session_path}')
        return super()._copy_collections()  # proceed with copy

    def finalize_copy(self, number_of_expected_devices=None):
        """If main sync is bpod, expect a single stub file."""
        if number_of_expected_devices is None and session_params.get_sync(self.remote_experiment_description_stub) == 'bpod':
            number_of_expected_devices = 1
        super().finalize_copy(number_of_expected_devices=number_of_expected_devices)
538

539

540
class EphysCopier(SessionCopier):
    """Session copier for the electrophysiology acquisition computer."""

    tag = 'ephys'
    assert_connect_on_init = True

    def initialize_experiment(self, acquisition_description=None, nprobes=None, main_sync=True, **kwargs):
        """
        Initialize the ephys experiment and create the local probe folders.

        Parameters
        ----------
        acquisition_description : dict, optional
            The data to write to the experiment.description.yaml file. When None, a description is
            generated from the detected probes and the nidq sync template.
        nprobes : int, optional
            Number of probes. When None, inferred from the number of `*.ap.bin` files under the
            session path (only when no acquisition_description is supplied).
        main_sync : bool
            When True, merge the nidq sync description template into the acquisition description.
        """
        if not acquisition_description:
            acquisition_description = {'devices': {'neuropixel': {}}}
            neuropixel = acquisition_description['devices']['neuropixel']
            if nprobes is None:
                nprobes = len(list(self.session_path.glob('**/*.ap.bin')))
            for n in range(nprobes):
                name = f'probe{n:02}'
                neuropixel[name] = {'collection': f'raw_ephys_data/{name}', 'sync_label': 'imec_sync'}
            sync_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'sync', 'nidq.yaml')
            # without any probe, the neuropixel device entry is dropped entirely
            acquisition_description = acquisition_description if neuropixel else {}
            if main_sync:
                acquisition_description.update(session_params.read_params(sync_file))

        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
        # once the session folders have been initialized, create the probe folders
        (ephys_path := self.session_path.joinpath('raw_ephys_data')).mkdir(exist_ok=True)
        # bugfix: nprobes may still be None when an explicit acquisition_description was passed;
        # previously `range(None)` raised a TypeError here — now no probe folders are created instead
        for n in range(nprobes or 0):
            ephys_path.joinpath(f'probe{n:02}').mkdir(exist_ok=True)

    def _copy_collections(self):
        """Here we overload the copy to be able to rename the probes properly and also create the insertions."""
        log.info(f'Transferring ephys session: {self.session_path} to {self.remote_session_path}')
        ibllib.pipes.misc.rename_ephys_files(self.session_path)
        ibllib.pipes.misc.move_ephys_files(self.session_path)
        # copy the wiring files from template
        path_wiring = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'neuropixel', 'wirings')
        probe_model = '3A'
        for file_nidq_bin in self.session_path.joinpath('raw_ephys_data').glob('*.nidq.bin'):
            # the presence of a nidq file switches the wiring template to the 3B model
            probe_model = '3B'
            shutil.copy(path_wiring.joinpath('nidq.wiring.json'), file_nidq_bin.with_suffix('.wiring.json'))
        for file_ap_bin in self.session_path.joinpath('raw_ephys_data').rglob('*.ap.bin'):
            shutil.copy(path_wiring.joinpath(f'{probe_model}.wiring.json'), file_ap_bin.with_suffix('.wiring.json'))
        try:
            ibllib.pipes.misc.create_alyx_probe_insertions(self.session_path)
        except Exception:
            # bugfix: traceback.print_exc() writes to stderr and returns None, so this line used to
            # log the string 'None'; format_exc() returns the traceback text (as in copy_folders)
            log.error(traceback.format_exc())
            log.info('Probe creation failed, please create the probe insertions manually. Continuing transfer...')
        return copy_folders(
            local_folder=self.session_path.joinpath('raw_ephys_data'),
            remote_folder=self.remote_session_path.joinpath('raw_ephys_data'),
            overwrite=True,
        )

589

590
class NeurophotometricsCopier(SessionCopier):
    """
    Copier for neurophotometrics (fiber photometry) sessions.

    Locates the Bonsai acquisition folder matching the session start time, validates the
    raw photometry / digital input CSV files against fixed schemas, and transfers them to
    the remote session folder as parquet files.
    """

    tag = 'neurophotometrics'
    assert_connect_on_init = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # when resuming a transfer, recover the experiment description from disk rather
        # than requiring the caller to provide the acquisition description again
        if self.file_experiment_description.exists() and self.experiment_description is None:
            self._experiment_description = session_params.read_params(self.file_experiment_description)

    def initialize_experiment(self, acquisition_description=None, **kwargs):
        """
        Write the experiment description file and flag the session for transfer.

        Parameters
        ----------
        acquisition_description : dict
            The acquisition description, e.g. the output of
            :meth:`neurophotometrics_description`. This argument is mandatory.
        """
        assert acquisition_description is not None, 'No acquisition description provided'
        self._experiment_description = acquisition_description
        super().initialize_experiment(acquisition_description=acquisition_description, **kwargs)
        self.session_path.joinpath('transfer_me.flag').touch()

    @staticmethod
    def neurophotometrics_description(
        rois: Iterable[str],
        locations: Iterable[str],
        sync_channel: int,
        start_time: datetime.datetime | None = None,
        sync_label: str | None = None,
        collection: str = 'raw_photometry_data',
    ) -> dict:
        """
        Create the `neurophotometrics` description part for the specified parameters.

        Parameters
        ----------
        rois: list of strings
            List of ROIs
        locations: list of strings
            List of brain regions
        sync_channel: int
            Channel number for sync
        start_time: datetime.datetime, optional
            Date and time of the recording; defaults to the current time
        sync_label: str, optional
            Label for the sync channel
        collection: str, optional
            Collection name for the raw photometry data

        Returns
        -------
        dict
            Description of the neurophotometrics data
            {'neurophotometrics': ...}, see below for the yaml rendition of dictionaries


        Example where bpod sends sync to the neurophotometrics:
        -------
            neurophotometrics:
                fibers:
                - roi: G0
                  location: VTA
                - roi: G1
                  location: DR
                collection: raw_photometry_data
                sync_label: bnc1out
                sync_channel: 1
                datetime: 2024-09-19T14:13:18.749259
            sync:
                bpod

        Here MAIN_SYNC=True on behaviour

        Example where a DAQ records frame times and sync:
        -------
            neurophotometrics:
                fibers:
                - roi: G0
                  location: VTA
                - roi: G1
                  location: DR
                collection: raw_photometry_data
                sync_channel: 5
                datetime: 2024-09-19T14:13:18.749259
            sync:
                daqami:
                    acquisition_software: daqami
                    collection: raw_sync_data
                    extension: bin
        """
        date_time = datetime.datetime.now() if start_time is None else start_time
        description = {
            'sync_channel': sync_channel,
            'datetime': date_time.isoformat(),
            'collection': collection,
        }
        if sync_label is not None:
            description['sync_label'] = sync_label
        description['fibers'] = {roi: {'location': location} for roi, location in zip(rois, locations, strict=False)}
        return {'neurophotometrics': description}

    def _copy_collections(self, folder_neurophotometric: Path) -> bool:
        """
        Copy the photometry data matching this session to the remote server.

        Finds the acquisition folder (``<date>/T<HHMMSS>``) whose start time immediately
        precedes the session start time, validates the raw CSV files against fixed schemas
        and writes them as parquet files into the remote collection folder.

        Parameters
        ----------
        folder_neurophotometric : Path
            Root folder of the neurophotometrics acquisitions.

        Returns
        -------
        bool
            True on success (an AssertionError is raised otherwise).
        """
        ed = self.experiment_description['neurophotometrics']
        dt = datetime.datetime.fromisoformat(ed['datetime'])
        # Here we find the first photometry folder after the start_time. In case this is failing
        # we can feed a custom start_time to go to the desired folder, or just rename the folder
        folder_day = next(folder_neurophotometric.glob(ed['datetime'][:10]), None)
        assert folder_day is not None, f"Neurophotometrics folder {folder_neurophotometric} doesn't contain data"
        # sort the T* folders chronologically but keep the Path objects themselves:
        # reconstructing the folder name from int(stem[1:]) would drop leading zeros
        # for acquisitions started before 10:00 (e.g. T091530 -> T91530)
        folder_times = sorted(folder_day.glob('T*'), key=lambda f: int(f.stem[1:]))
        assert len(folder_times) >= 1, f'No neurophotometrics acquisition files found in {folder_day}'
        hhmmss = [int(f.stem[1:]) for f in folder_times]
        i = int(np.searchsorted(hhmmss, int(dt.strftime('%H%M%S')))) - 1
        if i < 0:
            # negative index wraps to the last acquisition of the day; flag it, as this
            # means the session started before any photometry acquisition was recorded
            log.warning('Session start time precedes all photometry acquisitions; defaulting to the last folder')
        folder_acquisition = folder_times[i]
        csv_raw_photometry = folder_acquisition.joinpath('raw_photometry.csv')
        csv_digital_inputs = folder_acquisition.joinpath('digital_inputs.csv')
        assert csv_raw_photometry.exists(), f'Raw photometry file {csv_raw_photometry} not found'
        assert csv_digital_inputs.exists(), f'Digital inputs file {csv_digital_inputs} not found'
        # Copy the raw and digital inputs files to the server
        # TODO move this into a data loader ? Especially the schemas will apply to both the csv and parquet format
        df_raw_photometry = pd.read_csv(csv_raw_photometry)
        df_digital_inputs = pd.read_csv(csv_digital_inputs, header=None)
        df_digital_inputs.columns = ['ChannelName', 'Channel', 'AlwaysTrue', 'SystemTimestamp', 'ComputerTimestamp']
        # this will ensure the columns are present, and that there was no magic new format on a new Bonsai version
        schema_raw_data = pandera.DataFrameSchema(
            columns=dict(
                FrameCounter=pandera.Column(pandera.Int64),
                SystemTimestamp=pandera.Column(pandera.Float64),
                LedState=pandera.Column(pandera.Int16, coerce=True),
                ComputerTimestamp=pandera.Column(pandera.Float64),
                **{k: pandera.Column(pandera.Float64) for k in ed['fibers']},
            )
        )
        schema_digital_inputs = pandera.DataFrameSchema(
            columns=dict(
                ChannelName=pandera.Column(str, coerce=True),
                Channel=pandera.Column(pandera.Int8, coerce=True),
                AlwaysTrue=pandera.Column(bool, coerce=True),
                SystemTimestamp=pandera.Column(pandera.Float64),
                ComputerTimestamp=pandera.Column(pandera.Float64),
            )
        )
        df_raw_photometry = schema_raw_data.validate(df_raw_photometry)
        df_digital_inputs = schema_digital_inputs.validate(df_digital_inputs)
        remote_photometry_path = self.remote_session_path.joinpath(ed['collection'])
        remote_photometry_path.mkdir(parents=True, exist_ok=True)
        df_raw_photometry.to_parquet(remote_photometry_path.joinpath('_neurophotometrics_fpData.raw.pqt'))
        df_digital_inputs.to_parquet(remote_photometry_path.joinpath('_neurophotometrics_fpData.digitalIntputs.pqt'))
        shutil.copy(
            Path(iblrig.__file__).parents[1].joinpath('devices', 'neurophotometrics', '_neurophotometrics_fpData.channels.csv'),
            remote_photometry_path.joinpath('_neurophotometrics_fpData.channels.csv'),
        )
        return True
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc