• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 9031936551

10 May 2024 12:05PM UTC coverage: 48.538% (+1.7%) from 46.79%
9031936551

Pull #643

github

53c3e3
web-flow
Merge 3c8214f78 into ec2d8e4fe
Pull Request #643: 8.19.0

377 of 1073 new or added lines in 38 files covered. (35.14%)

977 existing lines in 19 files now uncovered.

3253 of 6702 relevant lines covered (48.54%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.29
/iblrig/commands.py
1
import argparse
2✔
2
import datetime
2✔
3
import logging
2✔
4
import shutil
2✔
5
import warnings
2✔
6
from collections.abc import Iterable
2✔
7
from pathlib import Path
2✔
8

9
import yaml
2✔
10

11
import iblrig
2✔
12
from iblrig.hardware import Bpod
2✔
13
from iblrig.online_plots import OnlinePlots
2✔
14
from iblrig.path_helper import get_local_and_remote_paths
2✔
15
from iblrig.transfer_experiments import BehaviorCopier, EphysCopier, SessionCopier, VideoCopier
2✔
16
from iblutil.util import setup_logger
2✔
17

18
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
2✔
19

20

21
# Maps a transfer tag to its dedicated copier class. The lookups in this module use
# `tag2copier.get(tag.lower(), SessionCopier)`, so any other tag falls back to SessionCopier.
tag2copier = {'behavior': BehaviorCopier, 'video': VideoCopier, 'ephys': EphysCopier}
2✔
22

23

24
def _transfer_parser(description: str) -> argparse.ArgumentParser:
    """
    Create an ArgumentParser for transfer scripts.

    The parser defines options for the data tag, the local and remote data paths, the dry-run
    switch, the cleanup age and the subject / date session filters.

    The parser is created with ``argument_default=argparse.SUPPRESS``: options without an
    explicit default (``--local``, ``--remote`` and ``--dry``) are therefore absent from the
    parsed namespace unless they were given on the command line.

    Parameters
    ----------
    description : str
        A brief description of the script's purpose.

    Returns
    -------
    argparse.ArgumentParser
        An ArgumentParser object with pre-defined arguments.
    """
    parser = argparse.ArgumentParser(
        description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter, argument_default=argparse.SUPPRESS
    )
    parser.add_argument('-t', '--tag', default='behavior', type=str, help='data type to transfer, e.g. "behavior", "video"')
    parser.add_argument('-l', '--local', action='store', type=dir_path, dest='local_path', help='define local data path')
    parser.add_argument('-r', '--remote', action='store', type=dir_path, dest='remote_path', help='define remote data path')
    # In dry mode the transfer routine neither runs the copiers nor removes local sessions,
    # so the help text must not suggest that data is still being copied.
    parser.add_argument('-d', '--dry', action='store_true', dest='dry', help='dry run: do not copy or remove local data')
    parser.add_argument(
        '-c', '--cleanup-weeks', type=int, help='cleanup data older than this many weeks (-1 for no cleanup)', default=2
    )
    parser.add_argument(
        '-s', '--subject', type=str, help='an optional subject name to filter sessions by. Wildcards accepted.', default='*'
    )
    parser.add_argument(
        '--date', type=str, help='an optional date pattern to filter sessions by. Wildcards accepted.', default='*-*-*'
    )
    return parser
×
59

60

61
def dir_path(directory: str) -> Path:
    """
    Convert a string to a Path object and check that it is an existing directory.

    This function is intended for use as a type conversion function with argparse.
    It takes a string argument representing a directory path, converts it into
    a Path object, and checks that it points to an existing directory. If so,
    it returns the Path object; otherwise, it raises an argparse.ArgumentError
    with an appropriate error message.

    Parameters
    ----------
    directory : str
        A string representing a directory path.

    Returns
    -------
    pathlib.Path
        A Path object representing the directory.

    Raises
    ------
    argparse.ArgumentError
        If the path does not exist or is not a directory, an argparse.ArgumentError
        is raised with an error message indicating that the directory was not found.
    """
    directory = Path(directory)
    # `is_dir()` rather than `exists()`: an existing regular file is not a valid data directory
    if directory.is_dir():
        return directory
    raise argparse.ArgumentError(None, f'Directory `{directory}` not found')
×
91

92

93
def transfer_data_cli():
    """
    Command-line entry point: parse the transfer arguments and run `transfer_data` interactively.

    Sets up the 'iblrig' logger at INFO level, parses the command line with the shared
    transfer parser and forwards every parsed option as a keyword argument.
    """
    setup_logger('iblrig', level='INFO')
    cli_parser = _transfer_parser('Copy data to the local server.')
    cli_options = vars(cli_parser.parse_args())
    transfer_data(**cli_options, interactive=True)
×
100

101

102
def transfer_video_data_cli():
    """
    Command-line interface for transferring video data to the local server.

    Deprecated entry point: emits a FutureWarning and then runs `transfer_data` with the tag
    forced to 'video', regardless of any `--tag` value given on the command line.
    """
    setup_logger('iblrig', level='INFO')
    # The transfer parser only accepts the tag through the `-t/--tag` option (there is no
    # positional tag argument), so the suggested replacement command must spell it out.
    warnings.warn(
        'transfer_video_data will be removed in the future. Use "transfer_data --tag video" instead.',
        FutureWarning,
        stacklevel=2,
    )
    args = _transfer_parser('Copy video data to the local server.').parse_args()
    transfer_data(**{**vars(args), 'tag': 'video'}, interactive=True)
×
112

113

114
def transfer_ephys_data_cli():
    """
    Command-line interface for transferring ephys data to the local server.

    Deprecated entry point: emits a FutureWarning and then runs `transfer_data` with the tag
    forced to 'ephys', regardless of any `--tag` value given on the command line.
    """
    setup_logger('iblrig', level='INFO')
    # The transfer parser only accepts the tag through the `-t/--tag` option (there is no
    # positional tag argument), so the suggested replacement command must spell it out.
    warnings.warn(
        'transfer_ephys_data will be removed in the future. Use "transfer_data --tag ephys" instead.',
        FutureWarning,
        stacklevel=2,
    )
    args = _transfer_parser('Copy ephys data to the local server.').parse_args()
    transfer_data(**{**vars(args), 'tag': 'ephys'}, interactive=True)
×
124

125

126
def _get_subjects_folders(local_path: Path, remote_path: Path) -> tuple[Path, Path]:
    """
    Resolve the local and remote subjects folders.

    Parameters
    ----------
    local_path : Path
        Local data path, passed on to `get_local_and_remote_paths`.
    remote_path : Path
        Remote data path, passed on to `get_local_and_remote_paths`.

    Returns
    -------
    tuple of Path
        The local subjects folder and the remote subjects folder.

    Raises
    ------
    AssertionError
        If the resolved local subjects folder is not a Path.
    Exception
        If the resolved remote subjects folder is None.
    """
    resolved = get_local_and_remote_paths(local_path, remote_path)
    local_subjects, remote_subjects = resolved.local_subjects_folder, resolved.remote_subjects_folder
    assert isinstance(local_subjects, Path)
    if remote_subjects is None:
        raise Exception('Remote Path is not defined.')
    return local_subjects, remote_subjects
2✔
134

135

136
def _get_copiers(
    copier: type[SessionCopier],
    local_folder: Path,
    remote_folder: Path,
    lab: str = None,
    glob_pattern: str = '*/????-??-??/*/transfer_me.flag',
    interactive: bool = False,
    **kwargs,
) -> list[SessionCopier]:
    """
    Instantiate one copier per session found in the local subjects folder.

    Parameters
    ----------
    copier : type of SessionCopier
        The copier class to instantiate for each session.
    local_folder : Path
        The local subjects folder (the copy root source). If not provided, the folder returned
        by `get_local_and_remote_paths` is used instead.
    remote_folder : Path
        The remote subjects folder (the copy root destination). If not provided, the folder
        returned by `get_local_and_remote_paths` is used instead.
    lab : str
        The name of the lab, passed on to `get_local_and_remote_paths`.
    glob_pattern : str
        The pattern, relative to the local subjects folder, of the flag files to search for.
        The parent folder of every match is treated as a session to copy.
    interactive : bool
        If true, the paths are logged at INFO level (DEBUG otherwise) and users are prompted to
        review the sessions to copy before proceeding.
    kwargs
        Extra arguments such as `tag` to pass to the copier constructor.

    Returns
    -------
    list of SessionCopier
        The copier objects; empty if no session was found or the user declined to continue.

    Raises
    ------
    Exception
        If no remote subjects folder could be determined.
    """
    # resolve the local / remote subjects folders, explicit arguments take precedence
    resolved = get_local_and_remote_paths(local_path=local_folder, remote_path=remote_folder, lab=lab)
    source_root = local_folder or resolved.local_subjects_folder
    destination_root = remote_folder or resolved.remote_subjects_folder
    assert isinstance(source_root, Path)
    if remote_folder is None and destination_root is None:
        raise Exception('Remote Path is not defined.')
    if destination_root is None:
        raise Exception('Remote Path is not defined.')

    log_level = logging.DEBUG if not interactive else logging.INFO
    logger.log(log_level, 'Local Path: %s', source_root)
    logger.log(log_level, 'Remote Path: %s', destination_root)

    # one copier per flag file, rooted at the folder containing the flag
    session_copiers = []
    for flag_file in source_root.glob(glob_pattern):
        session_copiers.append(copier(flag_file.parent, destination_root, **kwargs))

    if not session_copiers:
        print('Could not find any sessions to copy to the local server.')
        return session_copiers

    if interactive:
        _print_status(session_copiers, 'Session states prior to transfer operation:')
        answer = input('\nDo you want to continue? [Y/n]  ').lower()
        if answer != 'y' and answer != '':
            return []
    return session_copiers
2✔
193

194

195
def _print_status(copiers: Iterable[SessionCopier], heading: str = '') -> None:
2✔
196
    print(heading)
×
UNCOV
197
    for copier in copiers:
×
198
        match copier.state:
×
199
            case 0:
×
UNCOV
200
                state = 'not registered on server'
×
UNCOV
201
            case 1:
×
202
                state = 'copy pending'
×
UNCOV
203
            case 2:
×
UNCOV
204
                state = 'copy complete'
×
UNCOV
205
            case 3:
×
UNCOV
206
                state = 'copy finalized'
×
UNCOV
207
            case _:
×
UNCOV
208
                state = 'undefined'
×
UNCOV
209
        print(f' * {copier.session_path}: {state}')
×
210

211

212
def _build_glob_pattern(subject='*', date='*-*-*', number='*', flag_file='transfer_me.flag', **kwargs):
2✔
213
    """
214
    Build the copier glob pattern from filter keyword arguments.
215

216
    Parameters
217
    ----------
218
    subject : str
219
        A subject folder filter pattern.
220
    date : str
221
        A date folder pattern.
222
    number : str
223
        A number (i.e. experiment sequence) folder pattern.
224
    flag_file : str
225
        A flag filename pattern.
226
    glob_pattern : str
227
        The full glob pattern string (if defined, overrides all other arguments).
228

229
    Returns
230
    -------
231
    str
232
        The full glob pattern.
233
    """
234
    return kwargs.get('glob_pattern', '/'.join((subject, date, number, flag_file)))
2✔
235

236

237
def transfer_data(
    tag=None,
    local_path: Path = None,
    remote_path: Path = None,
    dry: bool = False,
    interactive: bool = False,
    cleanup_weeks=2,
    **kwargs,
) -> list[SessionCopier]:
    """
    Copies data from the rig to the local server.

    Parameters
    ----------
    tag : str
        The acquisition PC tag to transfer, e.g. 'behavior', 'video', 'ephys', 'timeline', etc.
    local_path : Path
        Path to local subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    remote_path : Path
        Path to remote subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    dry : bool
        Do not copy or remove local data.
    interactive : bool
        If true, users are prompted to review the sessions to copy before proceeding.
    cleanup_weeks : int, bool
        Remove local data older than this number of weeks. If False, negative or not an
        integer, do not remove.
    kwargs
        Optional session filters (`subject`, `date`, `number`, `flag_file` or a full
        `glob_pattern`), `number_of_expected_devices` (passed to each copier's `run` method)
        and further arguments to pass to the SessionCopier constructor.

    Returns
    -------
    list of SessionCopier
        A list of the copier objects that were run.

    Raises
    ------
    ValueError
        If no tag is provided.
    """
    if not tag:
        raise ValueError('Tag required.')
    # Build glob pattern based on subject/date/number/flag_file filter
    kwargs['glob_pattern'] = _build_glob_pattern(**kwargs)
    kwargs = {k: v for k, v in kwargs.items() if k not in ('subject', 'date', 'number', 'flag_file')}
    local_subject_folder, remote_subject_folder = _get_subjects_folders(local_path, remote_path)
    copier_class = tag2copier.get(tag.lower(), SessionCopier)
    logger.info('Searching for %s sessions using %s class', tag.lower(), copier_class.__name__)
    expected_devices = kwargs.pop('number_of_expected_devices', None)
    copiers = _get_copiers(
        copier_class, local_subject_folder, remote_subject_folder, interactive=interactive, tag=tag, **kwargs
    )

    for session_copier in copiers:
        # NOTE(review): status is logged at CRITICAL level; looks like a debugging leftover - confirm
        logger.critical(f'{session_copier.state}, {session_copier.session_path}')
        if not dry:
            session_copier.run(number_of_expected_devices=expected_devices)

    if interactive:
        _print_status(copiers, 'States after transfer operation:')

    # once we copied the data, remove older session for which the data was successfully uploaded
    # `False` must be excluded explicitly: bool is a subclass of int and `False > -1`, so without
    # this check `cleanup_weeks=False` would trigger a cleanup with an age threshold of 0 weeks.
    if cleanup_weeks is not False and isinstance(cleanup_weeks, int) and cleanup_weeks > -1:
        remove_local_sessions(
            weeks=cleanup_weeks, dry=dry, local_path=local_subject_folder, remote_path=remote_subject_folder, tag=tag
        )
    return copiers
2✔
296

297

298
def remove_local_sessions(weeks=2, local_path=None, remote_path=None, dry=False, tag='behavior'):
    """
    Delete local sessions that are older than a number of weeks and whose copy is finalized.

    Sessions are located through their `_ibl_experiment.description_{tag}.yaml` file; the
    session date is parsed from the name of the session folder's parent folder.

    Parameters
    ----------
    weeks : int
        Remove local sessions older than this number of weeks.
    local_path : Path
        Path to local subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    remote_path : Path
        Path to remote subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    dry : bool
        Do not remove local data if True.
    tag : str
        The acquisition PC tag to transfer, e.g. 'behavior', 'video', 'ephys', 'timeline', etc.

    Returns
    -------
    list of Path
        The session paths selected for removal (also populated when `dry` is True).
    """
    local_root, remote_root = _get_subjects_folders(local_path, remote_path)
    total_size = 0
    copier_class = tag2copier.get(tag.lower(), SessionCopier)
    removed = []
    description_files = sorted(local_root.rglob(f'_ibl_experiment.description_{tag}.yaml'), reverse=True)
    for description_file in description_files:
        session_path = description_file.parent
        elapsed = datetime.datetime.now() - datetime.datetime.strptime(session_path.parts[-2], '%Y-%m-%d')
        if elapsed.days < (weeks * 7):
            continue  # session is too recent
        session_copier = copier_class(session_path, remote_subjects_folder=remote_root)
        if session_copier.state != 3:
            continue  # only state 3 sessions are eligible for removal
        # cumulative size of all files of the session, divided by 1024**3
        session_size = sum(f.stat().st_size for f in session_path.rglob('*') if f.is_file()) / 1024**3
        logger.info(f'{session_copier.session_path}, {session_size:0.02f} Go')
        total_size += session_size
        if not dry:
            shutil.rmtree(session_path)
        removed.append(session_path)
    logger.info(f'Cleanup size {total_size:0.02f} Go')
    return removed
2✔
339

340

341
def view_session():
    """
    Command-line entry point that runs `OnlinePlots` on a task data file.

    Usage::

        view_session /full/path/to/jsonable/_iblrig_taskData.raw.jsonable [/full/path/to/settings]

    The first positional argument is the jsonable file; the second, optional one is the
    settings file (None if omitted).

    :return: None
    """
    cli_parser = argparse.ArgumentParser()
    cli_parser.add_argument('file_jsonable', help='full file path to jsonable file')
    cli_parser.add_argument('file_settings', help='full file path to settings file', nargs='?', default=None)
    parsed = cli_parser.parse_args()

    plots = OnlinePlots(task_file=parsed.file_jsonable, settings_file=parsed.file_settings)
    plots.run(task_file=parsed.file_jsonable)
×
354

355

356
def flush():
    """
    Flushes the valve until the user hits enter.

    The Bpod is instantiated with the `COM_BPOD` entry of the `device_bpod` section of
    `settings/hardware_settings.yaml`, a file looked up in the directory that contains the
    `iblrig` package. The flushing itself is delegated to `Bpod.flush`.

    :return: None
    """
    file_settings = Path(iblrig.__file__).parents[1].joinpath('settings', 'hardware_settings.yaml')
    hardware_settings = yaml.safe_load(file_settings.read_text())
    bpod = Bpod(hardware_settings['device_bpod']['COM_BPOD'])
    try:
        bpod.flush()
    finally:
        # always release the device, even if flushing fails or is interrupted
        bpod.close()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc