• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 10568073180

26 Aug 2024 10:13PM UTC coverage: 47.538% (+0.7%) from 46.79%
10568073180

Pull #711

github

eeff82
web-flow
Merge 599c9edfb into ad41db25f
Pull Request #711: 8.23.2

121 of 135 new or added lines in 8 files covered. (89.63%)

1025 existing lines in 22 files now uncovered.

4084 of 8591 relevant lines covered (47.54%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.78
/iblrig/commands.py
1
import argparse
2✔
2
import datetime
2✔
3
import logging
2✔
4
import shutil
2✔
5
import warnings
2✔
6
from collections.abc import Iterable
2✔
7
from pathlib import Path
2✔
8

9
import yaml
2✔
10

11
import iblrig
2✔
12
from iblrig.hardware import Bpod
2✔
13
from iblrig.online_plots import OnlinePlots
2✔
14
from iblrig.path_helper import get_local_and_remote_paths
2✔
15
from iblrig.transfer_experiments import BehaviorCopier, EphysCopier, SessionCopier, VideoCopier
2✔
16
from iblutil.util import setup_logger
2✔
17

18
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
2✔
19

20

21
# Maps an acquisition tag to its dedicated copier class. Lookups in this module
# use `tag2copier.get(tag.lower(), SessionCopier)`, so any tag not listed here
# falls back to the generic SessionCopier.
tag2copier = {'behavior': BehaviorCopier, 'video': VideoCopier, 'ephys': EphysCopier}
2✔
22

23

24
def _transfer_parser(description: str) -> argparse.ArgumentParser:
    """
    Create the ArgumentParser shared by the data transfer scripts.

    The parser is created with ``argument_default=argparse.SUPPRESS``: options
    that do not declare an explicit default (``--local``, ``--remote``,
    ``--dry``) are absent from the parsed namespace unless given on the command
    line, so the defaults of the called function apply.

    Parameters
    ----------
    description : str
        A brief description of the script's purpose.

    Returns
    -------
    argparse.ArgumentParser
        An ArgumentParser object with pre-defined arguments for the data tag,
        the local and remote data paths, dry-run mode, the cleanup age and the
        subject/date session filters.
    """
    parser = argparse.ArgumentParser(
        description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter, argument_default=argparse.SUPPRESS
    )
    parser.add_argument('-t', '--tag', default='behavior', type=str, help='data type to transfer, e.g. "behavior", "video"')
    parser.add_argument('-l', '--local', action='store', type=dir_path, dest='local_path', help='define local data path')
    parser.add_argument('-r', '--remote', action='store', type=dir_path, dest='remote_path', help='define remote data path')
    # In dry mode `transfer_data` neither runs the copiers nor deletes anything,
    # so the help text must not suggest that a copy takes place.
    parser.add_argument('-d', '--dry', action='store_true', dest='dry', help='do not copy or remove local data')
    parser.add_argument(
        '-c', '--cleanup-weeks', type=int, help='cleanup data older than this many weeks (-1 for no cleanup)', default=2
    )
    parser.add_argument(
        '-s', '--subject', type=str, help='an optional subject name to filter sessions by. Wildcards accepted.', default='*'
    )
    parser.add_argument(
        '--date', type=str, help='an optional date pattern to filter sessions by. Wildcards accepted.', default='*-*-*'
    )
    return parser
×
59

60

61
def dir_path(directory: str) -> Path:
    """
    Convert a string to a Path object and check that it is an existing directory.

    This function is intended for use as a type conversion function with argparse.

    Parameters
    ----------
    directory : str
        A string representing a directory path.

    Returns
    -------
    pathlib.Path
        A Path object representing the directory.

    Raises
    ------
    argparse.ArgumentError
        If the path does not exist or is not a directory.
    """
    path = Path(directory)
    # `Path.exists()` is also true for regular files; `is_dir()` enforces the
    # documented contract that only existing directories are accepted.
    if not path.is_dir():
        raise argparse.ArgumentError(None, f'Directory `{path}` not found')
    return path
×
91

92

93
def transfer_data_cli():
    """
    Command-line entry point: parse the transfer options and run `transfer_data`.

    Logging for 'iblrig' is set to INFO level and the transfer is always run in
    interactive mode.
    """
    setup_logger('iblrig', level='INFO')
    cli = _transfer_parser('Copy data to the local server.')
    options = vars(cli.parse_args())
    transfer_data(interactive=True, **options)
×
98

99

100
def transfer_video_data_cli():
    """
    Command-line interface for transferring video data to the local server.

    Deprecated: emits a FutureWarning pointing to the generic transfer command
    with the tag option set to 'video'.
    """
    setup_logger('iblrig', level='INFO')
    # The shared parser defines the tag as the option -t/--tag (not a positional
    # argument), so the suggested replacement command must use the option form.
    warnings.warn(
        'transfer_video_data will be removed in the future. Use "transfer_data --tag video" instead.',
        FutureWarning,
        stacklevel=2,
    )
    args = _transfer_parser('Copy video data to the local server.').parse_args()
    # Force the tag to 'video', overriding any -t/--tag value or its 'behavior' default.
    transfer_data(**{**vars(args), 'tag': 'video'}, interactive=True)
×
108

109

110
def transfer_ephys_data_cli():
    """
    Command-line interface for transferring ephys data to the local server.

    Deprecated: emits a FutureWarning pointing to the generic transfer command
    with the tag option set to 'ephys'.
    """
    setup_logger('iblrig', level='INFO')
    # The shared parser defines the tag as the option -t/--tag (not a positional
    # argument), so the suggested replacement command must use the option form.
    warnings.warn(
        'transfer_ephys_data will be removed in the future. Use "transfer_data --tag ephys" instead.',
        FutureWarning,
        stacklevel=2,
    )
    args = _transfer_parser('Copy ephys data to the local server.').parse_args()
    # Force the tag to 'ephys', overriding any -t/--tag value or its 'behavior' default.
    transfer_data(**{**vars(args), 'tag': 'ephys'}, interactive=True)
×
118

119

120
def _get_subjects_folders(local_path: Path, remote_path: Path) -> tuple[Path, Path]:
    """
    Resolve the local and remote subjects folders.

    Parameters
    ----------
    local_path : Path
        Local data path, passed through to `get_local_and_remote_paths`.
    remote_path : Path
        Remote data path, passed through to `get_local_and_remote_paths`.

    Returns
    -------
    tuple of Path
        The local subjects folder and the remote subjects folder.

    Raises
    ------
    Exception
        If the resolved remote subjects folder is None.
    """
    resolved = get_local_and_remote_paths(local_path, remote_path)
    local_subjects, remote_subjects = resolved.local_subjects_folder, resolved.remote_subjects_folder
    assert isinstance(local_subjects, Path)
    if remote_subjects is not None:
        return local_subjects, remote_subjects
    raise Exception('Remote Path is not defined.')
2✔
128

129

130
def _get_copiers(
    copier: type[SessionCopier],
    local_folder: Path,
    remote_folder: Path,
    lab: str = None,
    glob_pattern: str = '*/????-??-??/*/transfer_me.flag',
    interactive: bool = False,
    **kwargs,
) -> list[SessionCopier]:
    """
    Instantiate one copier per local session matching the glob pattern.

    Parameters
    ----------
    copier : type of SessionCopier
        The SessionCopier class to instantiate for each session.
    local_folder : Path
        The local data directory (the copy root source). If falsy, the local subjects
        folder returned by `get_local_and_remote_paths` is used instead.
    remote_folder : Path
        The remote data directory (the copy root destination). If falsy, the remote subjects
        folder returned by `get_local_and_remote_paths` is used instead.
    lab : str
        The name of the lab, passed through to `get_local_and_remote_paths`.
    glob_pattern : str
        The pattern globbed within the local folder; the parent folder of every match is
        treated as a session to copy.
    interactive : bool
        If true, the paths are logged at INFO level (DEBUG otherwise) and users are prompted
        to review the sessions to copy before proceeding.
    kwargs
        Extra arguments such as `tag` to pass to the SessionCopier.

    Returns
    -------
    list of SessionCopier
        A list of SessionCopier objects; empty if no session matched or if the user
        declined to continue.

    Raises
    ------
    Exception
        If no remote folder is given and none could be resolved.
    """
    # resolve the source and destination roots, explicit arguments take precedence
    paths = get_local_and_remote_paths(local_path=local_folder, remote_path=remote_folder, lab=lab)
    source_root = local_folder or paths.local_subjects_folder
    target_root = remote_folder or paths.remote_subjects_folder
    assert isinstance(source_root, Path)
    if target_root is None:
        raise Exception('Remote Path is not defined.')
    verbosity = logging.INFO if interactive else logging.DEBUG
    logger.log(verbosity, 'Local Path: %s', source_root)
    logger.log(verbosity, 'Remote Path: %s', target_root)

    # one copier per flag file found, rooted at the folder containing the flag
    session_copiers = []
    for flag_file in source_root.glob(glob_pattern):
        session_copiers.append(copier(flag_file.parent, target_root, **kwargs))

    if not session_copiers:
        print('Could not find any sessions to copy to the local server.')
        return session_copiers
    if interactive:
        _print_status(session_copiers, 'Session states prior to transfer operation:')
        answer = input('\nDo you want to continue? [Y/n]  ').lower()
        if answer not in ('y', ''):
            return []
    return session_copiers
2✔
187

188

189
def _print_status(copiers: Iterable[SessionCopier], heading: str = '') -> None:
    """
    Print a heading followed by one line per copier with its transfer state.

    Parameters
    ----------
    copiers : iterable of SessionCopier
        The copiers to report on; `state` and `session_path` are read from each.
    heading : str
        A line printed before the per-session lines.
    """
    # state code -> human readable description; any other value is 'undefined'
    descriptions = (
        (0, 'not registered on server'),
        (1, 'copy pending'),
        (2, 'copy complete'),
        (3, 'copy finalized'),
    )
    print(heading)
    for copier in copiers:
        code = copier.state  # read once per copier
        state = next((text for value, text in descriptions if code == value), 'undefined')
        print(f' * {copier.session_path}: {state}')
×
204

205

206
def _build_glob_pattern(subject='*', date='*-*-*', number='*', flag_file='transfer_me.flag', **kwargs):
    """
    Assemble the glob pattern used to locate the sessions to copy.

    Parameters
    ----------
    subject : str
        A subject folder filter pattern.
    date : str
        A date folder pattern.
    number : str
        A number (i.e. experiment sequence) folder pattern.
    flag_file : str
        A flag filename pattern.
    glob_pattern : str
        The full glob pattern string (if defined, overrides all other arguments).

    Returns
    -------
    str
        The explicit `glob_pattern` if one was passed, otherwise the four filters
        joined as ``subject/date/number/flag_file``.
    """
    # the filters are always joined, even when an explicit pattern takes precedence
    assembled = '/'.join([subject, date, number, flag_file])
    if 'glob_pattern' in kwargs:
        return kwargs['glob_pattern']
    return assembled
2✔
229

230

231
def transfer_data(
    tag=None,
    local_path: Path = None,
    remote_path: Path = None,
    dry: bool = False,
    interactive: bool = False,
    cleanup_weeks=2,
    **kwargs,
) -> list[SessionCopier]:
    """
    Copies data from the rig to the local server.

    Parameters
    ----------
    tag : str
        The acquisition PC tag to transfer, e.g. 'behavior', 'video', 'ephys', 'timeline', etc.
    local_path : Path
        Path to local subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    remote_path : Path
        Path to remote subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    dry : bool
        Do not copy or remove local data.
    interactive : bool
        If true, users are prompted to review the sessions to copy before proceeding.
    cleanup_weeks : int, bool
        Remove local data older than this number of weeks. If False (or a negative
        number), do not remove.
    kwargs
        Optional session filters (`subject`, `date`, `number`, `flag_file` or a full
        `glob_pattern`), `number_of_expected_devices` (passed to each copier's `run`),
        and further arguments to pass to the SessionCopier constructor.

    Returns
    -------
    list of SessionCopier
        A list of the copier objects that were run.

    Raises
    ------
    ValueError
        If no tag is provided.
    """
    if not tag:
        raise ValueError('Tag required.')
    # Build glob pattern based on subject/date/number/flag_file filter
    kwargs['glob_pattern'] = _build_glob_pattern(**kwargs)
    kwargs = {k: v for k, v in kwargs.items() if k not in ('subject', 'date', 'number', 'flag_file')}
    local_subject_folder, remote_subject_folder = _get_subjects_folders(local_path, remote_path)
    copier_class = tag2copier.get(tag.lower(), SessionCopier)
    logger.info('Searching for %s sessions using %s class', tag.lower(), copier_class.__name__)
    # consumed by `run`, must not reach the copier constructor
    expected_devices = kwargs.pop('number_of_expected_devices', None)
    copiers = _get_copiers(
        copier_class, local_subject_folder, remote_subject_folder, interactive=interactive, tag=tag, **kwargs
    )

    for session_copier in copiers:
        # routine progress information, not a critical condition
        logger.info('%s, %s', session_copier.state, session_copier.session_path)
        if not dry:
            session_copier.run(number_of_expected_devices=expected_devices)

    if interactive:
        _print_status(copiers, 'States after transfer operation:')

    # once we copied the data, remove older session for which the data was successfully uploaded
    # NB: bool is a subclass of int, so False must be excluded explicitly; otherwise it
    # would be treated as 0 weeks and trigger a cleanup of every finalized session.
    if cleanup_weeks is not False and isinstance(cleanup_weeks, int) and cleanup_weeks > -1:
        remove_local_sessions(
            weeks=cleanup_weeks, dry=dry, local_path=local_subject_folder, remote_path=remote_subject_folder, tag=tag
        )
    return copiers
2✔
290

291

292
def remove_local_sessions(weeks=2, local_path=None, remote_path=None, dry=False, tag='behavior'):
    """
    Remove local sessions older than N weeks.

    Sessions are located through their `_ibl_experiment.description_{tag}.yaml` file and
    their age is derived from the date folder in the session path. Only sessions whose
    copier reports state 3 are removed.

    Parameters
    ----------
    weeks : int
        Remove local sessions older than this number of weeks.
    local_path : Path
        Path to local subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    remote_path : Path
        Path to remote subjects folder, otherwise fetches path from iblrig_settings.yaml file.
    dry : bool
        Do not remove local data if True.
    tag : str
        The acquisition PC tag to transfer, e.g. 'behavior', 'video', 'ephys', 'timeline', etc.

    Returns
    -------
    list of Path
        A list of removed session paths. In dry mode the paths that would have been
        removed are listed but nothing is deleted.
    """
    local_subject_folder, remote_subject_folder = _get_subjects_folders(local_path, remote_path)
    size = 0
    copier = tag2copier.get(tag.lower(), SessionCopier)
    removed = []
    now = datetime.datetime.now()
    # `sorted` materialises all matches before any folder is deleted
    for flag in sorted(local_subject_folder.rglob(f'_ibl_experiment.description_{tag}.yaml'), reverse=True):
        session_path = flag.parent
        try:
            session_date = datetime.datetime.strptime(session_path.parts[-2], '%Y-%m-%d')
        except (IndexError, ValueError):
            # a description file outside a `<date>/<number>` folder must not abort the whole cleanup
            logger.warning('Skipping %s: parent folder is not a YYYY-MM-DD date', session_path)
            continue
        if (now - session_date).days < (weeks * 7):
            continue
        # only the generic SessionCopier is given the tag explicitly
        if copier is SessionCopier:
            sc = copier(session_path, remote_subjects_folder=remote_subject_folder, tag=tag)
        else:
            sc = copier(session_path, remote_subjects_folder=remote_subject_folder)
        if sc.state == 3:
            # total size of all files in the session, in units of 1024**3 bytes
            session_size = sum(f.stat().st_size for f in session_path.rglob('*') if f.is_file()) / 1024**3
            logger.info(f'{sc.session_path}, {session_size:0.02f} Go')
            size += session_size
            if not dry:
                shutil.rmtree(session_path)
            removed.append(session_path)
    logger.info(f'Cleanup size {size:0.02f} Go')
    return removed
2✔
336

337

338
def view_session():
    """
    Command-line entry point for viewing the online plots of a session.

    Takes the full path to the jsonable file as first positional argument and,
    optionally, the full path to a settings file as second, e.g.::

        view_session /full/path/to/jsonable/_iblrig_taskData.raw.jsonable

    Returns
    -------
    None
    """
    from iblutil.util import setup_logger

    setup_logger('iblrig', level='INFO')

    cli = argparse.ArgumentParser()
    cli.add_argument('file_jsonable', help='full file path to jsonable file')
    # the settings file is optional and defaults to None when omitted
    cli.add_argument('file_settings', help='full file path to settings file', nargs='?', default=None)
    parsed = cli.parse_args()

    plots = OnlinePlots(settings_file=parsed.file_settings)
    plots.run(file_jsonable=parsed.file_jsonable)
×
354

355

356
def flush():
    """
    Flush the valve until the user hits enter.

    The Bpod serial port is read from the `device_bpod.COM_BPOD` entry of
    `settings/hardware_settings.yaml`, located next to the `iblrig` package folder.
    The connection is closed even if flushing is interrupted or fails.
    """
    file_settings = Path(iblrig.__file__).parents[1].joinpath('settings', 'hardware_settings.yaml')
    hardware_settings = yaml.safe_load(file_settings.read_text())
    bpod = Bpod(hardware_settings['device_bpod']['COM_BPOD'])
    try:
        # NOTE(review): the "until the user hits enter" behaviour lives in Bpod.flush, not visible here
        bpod.flush()
    finally:
        # always release the connection, e.g. on KeyboardInterrupt during the flush
        bpod.close()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc