
int-brain-lab / ibllib, build 7961675356254463 (pending completion)

Pull Request #557: Chained protocols
continuous-integration/UCL
olivier: add test

718 of 718 new or added lines in 27 files covered (100.0%)
12554 of 18072 relevant lines covered (69.47%)
0.69 hits per line

Source File: /ibllib/pipes/local_server.py (59.26% covered)

import logging
import time
from datetime import datetime
from pathlib import Path
import pkg_resources
import re
import subprocess
import sys
import traceback
import importlib

from one.api import ONE

from ibllib.io.extractors.base import get_pipeline, get_task_protocol, get_session_extractor_type
from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing
from ibllib.time import date2isostr
from ibllib.oneibl.registration import IBLRegistrationClient, register_session_raw_data, get_lab
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.io.session_params import read_params
from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session

_logger = logging.getLogger(__name__)
LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC']


def _get_pipeline_class(session_path, one):
    pipeline = get_pipeline(session_path)
    if pipeline == 'training':
        PipelineClass = training_preprocessing.TrainingExtractionPipeline
    elif pipeline == 'ephys':
        PipelineClass = ephys_preprocessing.EphysExtractionPipeline
    else:
        # otherwise, look for a custom extractor in the personal projects extraction classes
        import projects.base
        task_type = get_session_extractor_type(session_path)
        PipelineClass = projects.base.get_pipeline(task_type)
    _logger.info(f"Using {PipelineClass} pipeline for {session_path}")
    return PipelineClass(session_path=session_path, one=one)
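
As a rough usage sketch of the dispatch above (the session path is hypothetical and a configured ONE instance is assumed; note the function is module-private):

from pathlib import Path
from one.api import ONE
from ibllib.pipes.local_server import _get_pipeline_class

one = ONE()
# hypothetical session folder following the subject/date/number convention
session_path = Path('/mnt/s0/Data/Subjects/some_subject/2023-01-01/001')
pipe = _get_pipeline_class(session_path, one)
print(type(pipe).__name__)  # e.g. TrainingExtractionPipeline or EphysExtractionPipeline
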
def _run_command(cmd):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    info, error = process.communicate()
    if process.returncode != 0:
        return None
    else:
        return info.decode('utf-8').strip()
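
A quick sanity check of the command wrapper, assuming a Unix-like system where df is available:

from ibllib.pipes.local_server import _run_command

out = _run_command('df /')
if out is None:
    print('command failed (non-zero exit code)')
else:
    print(out.splitlines()[-1])  # the last line carries the sizes for the volume
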
def _get_volume_usage(vol, label=''):
    cmd = f'df {vol}'
    res = _run_command(cmd)
    # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
    size_list = re.split(' +', res.split('\n')[-1])
    # df reports sizes in 1K blocks; dividing by 1024 ** 2 converts them to GiB
    fac = 1024 ** 2
    d = {'total': int(size_list[1]) / fac,
         'used': int(size_list[2]) / fac,
         'available': int(size_list[3]) / fac,
         'volume': size_list[5]}
    return {f"{label}_{k}": d[k] for k in d}
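
Given the example df output in the comment above, the returned dictionary is label-prefixed; a hypothetical call would yield roughly (values in GiB, rounded):

from ibllib.pipes.local_server import _get_volume_usage

usage = _get_volume_usage('/datadisk', label='raid')
# e.g. {'raid_total': 1832.8, 'raid_used': 1361.0, 'raid_available': 471.7, 'raid_volume': '/datadisk'}
print(usage['raid_available'])
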
def report_health(one):
    """
    Get a few health indicators (package versions, local time, disk usage) and write them
    to the JSON field of the corresponding lab(s) on Alyx.
    """
    status = {'python_version': sys.version,
              'ibllib_version': pkg_resources.get_distribution("ibllib").version,
              'phylib_version': pkg_resources.get_distribution("phylib").version,
              'local_time': date2isostr(datetime.now())}
    status.update(_get_volume_usage('/mnt/s0/Data', 'raid'))
    status.update(_get_volume_usage('/', 'system'))

    lab_names = get_lab(one.alyx)
    for ln in lab_names:
        one.alyx.json_field_update(endpoint='labs', uuid=ln, field_name='json', data=status)
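
A minimal invocation sketch, assuming ONE credentials for the Alyx instance are configured; this could run periodically, for example from a cron job:

from one.api import ONE
from ibllib.pipes.local_server import report_health

one = ONE(cache_rest=None)
report_health(one)  # writes the status dictionary to the lab's JSON field on Alyx
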
def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
    """
    Server function that will look for creation flags and, for each:
    1) create the session on Alyx
    2) register the corresponding raw data files on Alyx
    3) create the tasks to be run on Alyx
    :param root_path: main path containing sessions or session path
    :param one: ONE instance (created on the fly if None)
    :param dry: when True, only log the sessions found without registering anything
    :param rerun: when True, recreate tasks in all statuses rather than only the Waiting ones
    :param max_md5_size: maximum file size in bytes for which the md5 checksum is computed at registration
    :return: list of registered dataset dictionaries
    """
    if not one:
        one = ONE(cache_rest=None)
    rc = IBLRegistrationClient(one=one)
    flag_files = list(Path(root_path).glob('**/raw_session.flag'))
    all_datasets = []
    for flag_file in flag_files:
        session_path = flag_file.parent
        _logger.info(f'creating session for {session_path}')
        if dry:
            continue
        try:
            # if the subject doesn't exist in the database, skip
            rc.register_session(session_path, file_list=False)

            # see if we need to create a dynamic pipeline
            experiment_description_file = read_params(session_path)
            if experiment_description_file is not None:
                pipe = make_pipeline(session_path, one=one)
            else:
                # create a legacy experiment description file
                acquisition_description_legacy_session(session_path, save=True)
                labs = ','.join(get_lab(one.alyx))
                files, dsets = register_session_raw_data(session_path, one=one, max_md5_size=max_md5_size, labs=labs)
                if dsets is not None:
                    all_datasets.extend(dsets)
                pipe = _get_pipeline_class(session_path, one)
                if pipe is None:
                    task_protocol = get_task_protocol(session_path)
                    _logger.info(f'Session task protocol {task_protocol} has no matching pipeline pattern {session_path}')
            if rerun:
                rerun__status__in = '__all__'
            else:
                rerun__status__in = ['Waiting']
            pipe.create_alyx_tasks(rerun__status__in=rerun__status__in)
            flag_file.unlink()
        except Exception:
            _logger.error(traceback.format_exc())
            _logger.warning(f'Creating session / registering raw datasets for {session_path} errored')
            continue

    return all_datasets
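
A dry-run sketch of the session creation loop; the Subjects path below is hypothetical:

from ibllib.pipes.local_server import job_creator

# dry=True only logs the sessions carrying a raw_session.flag, without registering
datasets = job_creator('/mnt/s0/Data/Subjects', dry=True)
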
def task_queue(mode='all', lab=None, one=None):
    """
    Query waiting jobs for the specified lab.
    :param mode: whether to return all waiting tasks, or only the small or the large jobs
     (as determined by each task class's job_size attribute)
    :param lab: lab name as per Alyx, otherwise try to infer from the local Globus install
    :param one: ONE instance
    :return: list of waiting task dictionaries, ordered by descending priority
    """
    if one is None:
        one = ONE(cache_rest=None)
    if lab is None:
        _logger.debug("Trying to infer lab from globus installation")
        lab = get_lab(one.alyx)
    if lab is None:
        _logger.error("No lab provided or found")
        return  # if the lab is None, this will return empty tasks each time
    data_repo = get_local_data_repository(one)
    # filter for waiting tasks belonging to this lab and data repository
    tasks_all = one.alyx.rest('tasks', 'list', status='Waiting',
                              django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True)
    if mode == 'all':
        waiting_tasks = tasks_all
    else:
        small_jobs = []
        large_jobs = []
        for t in tasks_all:
            # the executable field holds the fully qualified class name of the task
            strmodule, strclass = t['executable'].rsplit('.', 1)
            task_class = getattr(importlib.import_module(strmodule), strclass)
            if task_class.job_size == 'small':
                small_jobs.append(t)
            else:
                large_jobs.append(t)
        if mode == 'small':
            waiting_tasks = small_jobs
        elif mode == 'large':
            waiting_tasks = large_jobs
        else:
            raise ValueError(f"mode must be one of 'all', 'small' or 'large', got '{mode}'")

    # order the tasks by descending priority
    sorted_tasks = sorted(waiting_tasks, key=lambda d: d['priority'], reverse=True)

    return sorted_tasks
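
A sketch of querying the queue for small jobs; the lab name is illustrative:

from ibllib.pipes.local_server import task_queue

waiting = task_queue(mode='small', lab='cortexlab')
for t in waiting[:3]:
    print(t['name'], t['priority'])  # highest-priority tasks come first
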
def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
    """
    Function to run a list of tasks (task dictionaries from an Alyx query) on a local server.
    :param subjects_path: the path containing the subject folders
    :param tasks_dict: list of task dictionaries, e.g. as returned by task_queue
    :param one: ONE instance
    :param dry: when True, only print the session path and task name without running
    :param count: maximum number of tasks to run
    :param time_out: time in seconds after which no new task is launched
    :param kwargs: passed on to tasks.run_alyx_task
    :return: list of dataset dictionaries
    """
    if one is None:
        one = ONE(cache_rest=None)
    tstart = time.time()
    c = 0
    last_session = None
    all_datasets = []
    for tdict in tasks_dict:
        # if the count is reached or the time out has elapsed, break the loop and return
        if c >= count or (time_out and time.time() - tstart > time_out):
            break
        # reconstruct the local session path; as many jobs belong to the same session,
        # cache the result
        if last_session != tdict['session']:
            ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
            session_path = Path(subjects_path).joinpath(
                Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
            last_session = tdict['session']
        if dry:
            print(session_path, tdict['name'])
        else:
            task, dsets = tasks.run_alyx_task(tdict=tdict, session_path=session_path,
                                              one=one, **kwargs)
            if dsets:
                all_datasets.extend(dsets)
                c += 1
    return all_datasets
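
Putting the last two functions together, one pass of a local-server worker could look like this sketch (the Subjects path and lab name are illustrative):

from ibllib.pipes.local_server import task_queue, tasks_runner

queue = task_queue(mode='small', lab='cortexlab')
if queue:
    # run at most five tasks, and stop launching new ones after an hour
    datasets = tasks_runner('/mnt/s0/Data/Subjects', queue, count=5, time_out=3600)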