• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 14196118657

01 Apr 2025 12:52PM UTC coverage: 47.634% (+0.8%) from 46.79%
14196118657

Pull #795

github

cfb5bd
web-flow
Merge 5ba5d5f25 into 58cf64236
Pull Request #795: fixes for habituation CW

11 of 12 new or added lines in 1 file covered. (91.67%)

1083 existing lines in 22 files now uncovered.

4288 of 9002 relevant lines covered (47.63%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.73
/iblrig/commands.py
1
import argparse
2✔
2
import datetime
2✔
3
import logging
2✔
4
import shutil
2✔
5
import warnings
2✔
6
from collections.abc import Iterable
2✔
7
from pathlib import Path
2✔
8

9
import yaml
2✔
10

11
import iblrig
2✔
12
from iblrig.hardware import Bpod
2✔
13
from iblrig.online_plots import OnlinePlots
2✔
14
from iblrig.path_helper import get_local_and_remote_paths
2✔
15
from iblrig.transfer_experiments import BehaviorCopier, EphysCopier, NeurophotometricsCopier, SessionCopier, VideoCopier
2✔
16
from iblutil.util import setup_logger
2✔
17

18
logger = logging.getLogger(__name__)
2✔
19

20

21
# Lookup table from a (lower-case) acquisition tag to the copier class handling it.
# Consumers in this module look tags up with `.get(tag.lower(), SessionCopier)`,
# so tags that are not listed here fall back to the generic SessionCopier.
tag2copier = dict(
    behavior=BehaviorCopier,
    video=VideoCopier,
    ephys=EphysCopier,
    neurophotometrics=NeurophotometricsCopier,
)
27

28

29
def _transfer_parser(description: str) -> argparse.ArgumentParser:
    """
    Create an ArgumentParser for transfer scripts.

    The parser defines the options shared by the data transfer entry points:
    the data tag, the local and remote data paths, the dry-run switch, the
    cleanup age and the subject/date filters. Options without an explicit
    default are omitted from the parsed namespace when not given on the
    command line (`argument_default=argparse.SUPPRESS`).

    Parameters
    ----------
    description : str
        A brief description of the script's purpose.

    Returns
    -------
    argparse.ArgumentParser
        An ArgumentParser object with pre-defined arguments.
    """
    parser = argparse.ArgumentParser(
        description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter, argument_default=argparse.SUPPRESS
    )
    parser.add_argument('-t', '--tag', default='behavior', type=str, help='data type to transfer, e.g. "behavior", "video"')
    parser.add_argument('-l', '--local', action='store', type=dir_path, dest='local_path', help='define local data path')
    parser.add_argument('-r', '--remote', action='store', type=dir_path, dest='remote_path', help='define remote data path')
    # The help text states what `transfer_data` does with `dry=True`: the copiers are
    # not run at all and no local session is deleted.
    parser.add_argument('-d', '--dry', action='store_true', dest='dry', help='dry run: do not copy or remove local data')
    parser.add_argument(
        '-c', '--cleanup-weeks', type=int, help='cleanup data older than this many weeks (-1 for no cleanup)', default=2
    )
    parser.add_argument(
        '-s', '--subject', type=str, help='an optional subject name to filter sessions by. Wildcards accepted.', default='*'
    )
    parser.add_argument(
        '--date', type=str, help='an optional date pattern to filter sessions by. Wildcards accepted.', default='*-*-*'
    )
    return parser
×
64

65

66
def dir_path(directory: str) -> Path:
    """
    Convert a string to a Path object and check that it is an existing directory.

    This function is intended for use as a type conversion function with argparse.
    It takes a string argument representing a directory path, converts it into
    a Path object, and checks that it points to an existing directory. If so,
    it returns the Path object; otherwise, it raises an argparse.ArgumentError
    with an appropriate error message.

    Parameters
    ----------
    directory : str
        A string representing a directory path.

    Returns
    -------
    pathlib.Path
        A Path object representing the directory.

    Raises
    ------
    argparse.ArgumentError
        If the path does not exist or is not a directory, an
        argparse.ArgumentError is raised with an error message indicating that
        the directory was not found.
    """
    path = Path(directory)
    # `is_dir()` rather than `exists()`: a regular file is not a valid data directory
    if path.is_dir():
        return path
    raise argparse.ArgumentError(None, f'Directory `{path}` not found')
×
96

97

98
def transfer_data_cli():
    """Parse the command line and run `transfer_data` interactively with the parsed options."""
    setup_logger('iblrig', level='INFO')
    parser = _transfer_parser('Copy data to the local server.')
    cli_options = vars(parser.parse_args())
    transfer_data(interactive=True, **cli_options)
×
103

104

105
def transfer_video_data_cli():
    """Deprecated command-line entry point: run `transfer_data` interactively with the tag forced to 'video'."""
    setup_logger('iblrig', level='INFO')
    deprecation_message = 'transfer_video_data will be removed in the future. Use "transfer_data video" instead.'
    warnings.warn(deprecation_message, FutureWarning, stacklevel=2)
    parser = _transfer_parser('Copy video data to the local server.')
    # whatever tag was parsed from the command line is overridden here
    cli_options = vars(parser.parse_args()) | {'tag': 'video'}
    transfer_data(interactive=True, **cli_options)
×
113

114

115
def transfer_ephys_data_cli():
    """Deprecated command-line entry point: run `transfer_data` interactively with the tag forced to 'ephys'."""
    setup_logger('iblrig', level='INFO')
    deprecation_message = 'transfer_ephys_data will be removed in the future. Use "transfer_data ephys" instead.'
    warnings.warn(deprecation_message, FutureWarning, stacklevel=2)
    parser = _transfer_parser('Copy ephys data to the local server.')
    # whatever tag was parsed from the command line is overridden here
    cli_options = vars(parser.parse_args()) | {'tag': 'ephys'}
    transfer_data(interactive=True, **cli_options)
×
123

124

125
def _get_subjects_folders(local_path: Path, remote_path: Path) -> tuple[Path, Path]:
    """
    Resolve the local and remote subjects folders.

    Parameters
    ----------
    local_path : Path
        Local data path, forwarded to `get_local_and_remote_paths`.
    remote_path : Path
        Remote data path, forwarded to `get_local_and_remote_paths`.

    Returns
    -------
    tuple of Path
        The local subjects folder and the remote subjects folder.

    Raises
    ------
    Exception
        If no remote subjects folder could be resolved.
    """
    resolved = get_local_and_remote_paths(local_path, remote_path)
    local_subjects = resolved.local_subjects_folder
    remote_subjects = resolved.remote_subjects_folder
    assert isinstance(local_subjects, Path)
    if remote_subjects is not None:
        return local_subjects, remote_subjects
    raise Exception('Remote Path is not defined.')
2✔
133

134

135
def _get_copiers(
    copier: type[SessionCopier],
    local_folder: Path,
    remote_folder: Path,
    lab: str = None,
    glob_pattern: str = '*/????-??-??/*/transfer_me.flag',
    interactive: bool = False,
    **kwargs,
) -> list[SessionCopier]:
    """
    Instantiate one copier per local session matched by a glob pattern.

    Parameters
    ----------
    copier : type of SessionCopier
        The SessionCopier class to instantiate for each session.
    local_folder : Path
        The local data directory (the copy root source). If falsy, the local
        subjects folder returned by `get_local_and_remote_paths` is used.
    remote_folder : Path
        The remote data directory (the copy root destination). If falsy, the
        remote subjects folder returned by `get_local_and_remote_paths` is used.
    lab : str
        The name of the lab, forwarded to `get_local_and_remote_paths`.
    glob_pattern : str
        The pattern globbed within the local folder; the parent folder of every
        match is treated as a session to copy.
    interactive : bool
        If true, the session states are printed and the user is asked to
        confirm before proceeding. Declining yields an empty list.
    kwargs
        Extra arguments such as `tag` to pass to the copier constructor.

    Returns
    -------
    list of SessionCopier
        A list of SessionCopier objects.

    Raises
    ------
    Exception
        If no remote folder is defined.
    """
    # explicit folders take precedence over the ones resolved from the rig settings
    resolved = get_local_and_remote_paths(local_path=local_folder, remote_path=remote_folder, lab=lab)
    source_root = local_folder if local_folder else resolved.local_subjects_folder
    target_root = remote_folder if remote_folder else resolved.remote_subjects_folder
    assert isinstance(source_root, Path)
    if target_root is None:
        raise Exception('Remote Path is not defined.')

    # the paths are only worth an INFO message when a user is watching
    log_level = logging.DEBUG if not interactive else logging.INFO
    logger.log(log_level, 'Local Path: %s', source_root)
    logger.log(log_level, 'Remote Path: %s', target_root)

    # one copier per flag file found, rooted at the flag file's folder
    session_copiers = []
    for flag_file in source_root.glob(glob_pattern):
        session_copiers.append(copier(flag_file.parent, target_root, **kwargs))

    if not session_copiers:
        print('Could not find any sessions to copy to the local server.')
        return session_copiers
    if interactive:
        _print_status(session_copiers, 'Session states prior to transfer operation:')
        answer = input('\nDo you want to continue? [Y/n]  ').lower()
        if answer not in ('y', ''):
            return []
    return session_copiers
2✔
192

193

194
def _print_status(copiers: Iterable[SessionCopier], heading: str = '') -> None:
    """
    Print a heading followed by one line per copier with its session path and state.

    Parameters
    ----------
    copiers : iterable of SessionCopier
        The copiers to report on; `state` and `session_path` are read from each.
    heading : str
        A line printed before the list of sessions.
    """
    # human-readable labels for the known copier states; anything else is 'undefined'
    state_labels = {
        0: 'not registered on server',
        1: 'copy pending',
        2: 'copy complete',
        3: 'copy finalized',
    }
    print(heading)
    for session_copier in copiers:
        label = state_labels.get(session_copier.state, 'undefined')
        print(f' * {session_copier.session_path}: {label}')
×
209

210

211
def _build_glob_pattern(subject='*', date='*-*-*', number='*', flag_file='transfer_me.flag', **kwargs):
2✔
212
    """
213
    Build the copier glob pattern from filter keyword arguments.
214

215
    Parameters
216
    ----------
217
    subject : str
218
        A subject folder filter pattern.
219
    date : str
220
        A date folder pattern.
221
    number : str
222
        A number (i.e. experiment sequence) folder pattern.
223
    flag_file : str
224
        A flag filename pattern.
225
    glob_pattern : str
226
        The full glob pattern string (if defined, overrides all other arguments).
227

228
    Returns
229
    -------
230
    str
231
        The full glob pattern.
232
    """
233
    return kwargs.get('glob_pattern', '/'.join((subject, date, number, flag_file)))
2✔
234

235

236
def transfer_data(
    tag=None,
    local_path: Path = None,
    remote_path: Path = None,
    dry: bool = False,
    interactive: bool = False,
    cleanup_weeks=2,
    **kwargs,
) -> list[SessionCopier]:
    """
    Copies data from the rig to the local server.

    Parameters
    ----------
    tag : str
        The acquisition PC tag to transfer, e.g. 'behavior', 'video', 'ephys', 'timeline', etc.
    local_path : Path
        Path to local subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    remote_path : Path
        Path to remote subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    dry : bool
        Do not copy or remove local data.
    interactive : bool
        If true, users are prompted to review the sessions to copy before proceeding.
    cleanup_weeks : int, bool
        Remove local data older than this number of weeks. If False or a
        negative number, do not remove.
    kwargs
        Session filters (`subject`, `date`, `number`, `flag_file` or a full
        `glob_pattern`), `number_of_expected_devices` (passed to each copier's
        `run` method) and optional arguments to pass to the SessionCopier
        constructor.

    Returns
    -------
    list of SessionCopier
        A list of the copier objects that were run.

    Raises
    ------
    ValueError
        If no tag is provided.
    """
    if not tag:
        raise ValueError('Tag required.')
    # Build glob pattern based on subject/date/number/flag_file filter
    kwargs['glob_pattern'] = _build_glob_pattern(**kwargs)
    kwargs = {k: v for k, v in kwargs.items() if k not in ('subject', 'date', 'number', 'flag_file')}
    local_subject_folder, remote_subject_folder = _get_subjects_folders(local_path, remote_path)
    copier_class = tag2copier.get(tag.lower(), SessionCopier)
    logger.info('Searching for %s sessions using %s class', tag.lower(), copier_class.__name__)
    expected_devices = kwargs.pop('number_of_expected_devices', None)
    copiers = _get_copiers(
        copier_class, local_subject_folder, remote_subject_folder, interactive=interactive, tag=tag, **kwargs
    )

    for session_copier in copiers:
        # routine progress information, hence INFO rather than CRITICAL
        logger.info('%s, %s', session_copier.state, session_copier.session_path)
        if not dry:
            session_copier.run(number_of_expected_devices=expected_devices)

    if interactive:
        _print_status(copiers, 'States after transfer operation:')

    # once we copied the data, remove older session for which the data was successfully uploaded
    # `False` must be excluded explicitly: bool is a subclass of int, so it would otherwise be
    # treated as 0 weeks and every finalized session would be removed regardless of its age
    if cleanup_weeks is not False and isinstance(cleanup_weeks, int) and cleanup_weeks > -1:
        remove_local_sessions(
            weeks=cleanup_weeks, dry=dry, local_path=local_subject_folder, remote_path=remote_subject_folder, tag=tag
        )
    return copiers
2✔
295

296

297
def remove_local_sessions(weeks=2, local_path=None, remote_path=None, dry=False, tag='behavior'):
    """
    Remove local sessions older than N weeks.

    Sessions are located through their `_ibl_experiment.description_{tag}.yaml`
    file. A session is only removed if it is old enough and its copier reports
    state 3.

    Parameters
    ----------
    weeks : int
        Remove local sessions older than this number of weeks.
    local_path : Path
        Path to local subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    remote_path : Path
        Path to remote subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    dry : bool
        Do not remove local data if True.
    tag : str
        The acquisition PC tag to transfer, e.g. 'behavior', 'video', 'ephys', 'timeline', etc.

    Returns
    -------
    list of Path
        A list of removed session paths. In dry mode, the sessions that would
        have been removed.
    """
    local_subject_folder, remote_subject_folder = _get_subjects_folders(local_path, remote_path)
    size = 0
    copier = tag2copier.get(tag.lower(), SessionCopier)
    removed = []
    # a single reference time so that all sessions are judged against the same cut-off
    now = datetime.datetime.now()
    for flag in sorted(local_subject_folder.rglob(f'_ibl_experiment.description_{tag}.yaml'), reverse=True):
        session_path = flag.parent
        # the session date is taken from the name of the session's parent folder
        # NOTE(review): strptime raises ValueError if that folder name is not a YYYY-MM-DD date
        days_elapsed = (now - datetime.datetime.strptime(session_path.parts[-2], '%Y-%m-%d')).days
        if days_elapsed < (weeks * 7):
            continue
        # only the generic SessionCopier is given the tag; the specialised copiers are not
        if copier is SessionCopier:
            sc = copier(session_path, remote_subjects_folder=remote_subject_folder, tag=tag)
        else:
            sc = copier(session_path, remote_subjects_folder=remote_subject_folder)
        if sc.state == 3:
            # size of all files of the session in GiB
            session_size = sum(f.stat().st_size for f in session_path.rglob('*') if f.is_file()) / 1024**3
            logger.info(f'{sc.session_path}, {session_size:0.02f} Go')
            size += session_size
            if not dry:
                shutil.rmtree(session_path)
            removed.append(session_path)
    logger.info(f'Cleanup size {size:0.02f} Go')
    return removed
2✔
341

342

343
def view_session():
    """
    Command-line entry point that runs `OnlinePlots` on a task data file.

    Usage::

        view_session /full/path/to/jsonable/_iblrig_taskData.raw.jsonable [/full/path/to/settings_file]

    The settings file is optional; when omitted, `OnlinePlots` is created with
    `settings_file=None`.

    Returns
    -------
    None
    """
    # `setup_logger` is already imported at module level, no local import needed
    setup_logger('iblrig', level='INFO')
    parser = argparse.ArgumentParser()
    parser.add_argument('file_jsonable', help='full file path to jsonable file')
    parser.add_argument('file_settings', help='full file path to settings file', nargs='?', default=None)
    args = parser.parse_args()

    online_plots = OnlinePlots(settings_file=args.file_settings)
    online_plots.run(file_jsonable=args.file_jsonable)
×
359

360

361
def flush():
    """
    Flush the valve until the user hits enter.

    The Bpod serial port is read from the `device_bpod` / `COM_BPOD` entry of
    `settings/hardware_settings.yaml`, located next to the iblrig package
    folder. The connection is closed even if flushing fails or is interrupted.
    """
    file_settings = Path(iblrig.__file__).parents[1].joinpath('settings', 'hardware_settings.yaml')
    hardware_settings = yaml.safe_load(file_settings.read_text())
    bpod = Bpod(hardware_settings['device_bpod']['COM_BPOD'])
    try:
        bpod.flush()
    finally:
        # always release the device, e.g. when the user aborts with Ctrl+C
        bpod.close()
×
368

369

370
def remove_bonsai_layouts():
    """
    Interactively delete Bonsai `.layout` files that have a `.layout_template` backup.

    All `*.bonsai.layout` files below `BASE_PATH` with a sibling
    `.layout_template` file are listed, and deleted once the user confirms.
    """
    from iblrig.constants import BASE_PATH

    # only layouts that can be restored from a template are candidates for deletion
    candidates = []
    for layout in BASE_PATH.glob('**/*.bonsai.layout'):
        if layout.with_suffix('.layout_template').exists():
            candidates.append(layout)
    if not candidates:
        print('No layout files found.')
        return

    print('The following files will be deleted:')
    for layout in candidates:
        print(f'- {layout.name}')

    # an empty answer counts as a yes
    reply = input('\nContinue? [Y/n] ').lower()
    if reply not in ('y', ''):
        print('No files deleted.')
        return
    for layout in candidates:
        layout.unlink()
    plural = 's' if len(candidates) > 1 else ''
    print(f'{len(candidates)} file{plural} deleted.')
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc