• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / ibllib / 7961675356254463

pending completion
7961675356254463

Pull #557

continuous-integration/UCL

olivier
add test
Pull Request #557: Chained protocols

718 of 718 new or added lines in 27 files covered. (100.0%)

12554 of 18072 relevant lines covered (69.47%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.1
/ibllib/pipes/base_tasks.py
1
from one.webclient import no_cache
1✔
2

3
from ibllib.pipes.tasks import Task
1✔
4
import ibllib.io.session_params as sess_params
1✔
5
from ibllib.qc.base import sign_off_dict, SIGN_OFF_CATEGORIES
1✔
6
import logging
1✔
7

8
# Module-level logger, named after this module
_logger = logging.getLogger(__name__)
1✔
9

10

11
class DynamicTask(Task):
    """
    Task whose settings are resolved from the session parameters read by `sess_params.read_params`.

    Every `get_*` method returns the value passed by the caller when it is truthy and otherwise
    looks the value up in `self.session_params`.
    """

    def __init__(self, session_path, **kwargs):
        """
        :param session_path: session path, forwarded to the parent constructor
        :param kwargs: forwarded to the parent constructor; the optional keys 'sync_collection',
         'sync', 'sync_ext' and 'sync_namespace' take precedence over the session parameters
        """
        super().__init__(session_path, **kwargs)
        # Read first: all the getters below fall back on the session parameters
        self.session_params = self.read_params_file()

        # TODO Which should be default?
        self.sync_collection = self.get_sync_collection(kwargs.get('sync_collection'))  # Sync collection
        self.sync = self.get_sync(kwargs.get('sync'))  # Sync type
        self.sync_ext = self.get_sync_extension(kwargs.get('sync_ext'))  # Sync extension
        self.sync_namespace = self.get_sync_namespace(kwargs.get('sync_namespace'))  # Sync namespace

    def get_sync_collection(self, sync_collection=None):
        """Return `sync_collection` if truthy, else the sync collection from the session parameters."""
        return sync_collection or sess_params.get_sync_collection(self.session_params)

    def get_sync(self, sync=None):
        """Return `sync` if truthy, else the sync type from the session parameters."""
        return sync or sess_params.get_sync(self.session_params)

    def get_sync_extension(self, sync_ext=None):
        """Return `sync_ext` if truthy, else the sync extension from the session parameters."""
        return sync_ext or sess_params.get_sync_extension(self.session_params)

    def get_sync_namespace(self, sync_namespace=None):
        """Return `sync_namespace` if truthy, else the sync namespace from the session parameters."""
        return sync_namespace or sess_params.get_sync_namespace(self.session_params)

    def get_protocol(self, protocol=None, task_collection=None):
        """Return `protocol` if truthy, else the task protocol looked up for `task_collection`."""
        return protocol or sess_params.get_task_protocol(self.session_params, task_collection)

    def get_task_collection(self, collection=None):
        """
        Return `collection` if truthy, else the task collection from the session parameters.

        The result must be None, a string, or a container holding exactly one element;
        anything else trips an assertion.
        """
        resolved = collection or sess_params.get_task_collection(self.session_params)
        # If inferring the collection from the experiment description, assert only one returned
        if resolved is not None and not isinstance(resolved, str):
            assert len(resolved) == 1
        return resolved

    def get_device_collection(self, device, device_collection=None):
        """Return `device_collection` if truthy, else the collection of `device` from the session parameters."""
        return device_collection or sess_params.get_device_collection(self.session_params, device)

    def read_params_file(self):
        """Read the session parameters for `self.session_path`; return an empty dict when none are found."""
        loaded = sess_params.read_params(self.session_path)
        # TODO figure out the best way: when nothing is found locally and a ONE instance is
        #  available, try to read the params from Alyx or download the params file
        return {} if loaded is None else loaded
1✔
65

66

67
class BehaviourTask(DynamicTask):
    """
    Dynamic task bound to one behaviour protocol of the session.

    The task collection, protocol and protocol number are taken from the keyword arguments when
    given and otherwise resolved from the session parameters. The output collection is 'alf',
    suffixed with '/task_NN' when a protocol number is known.
    """

    def __init__(self, session_path, **kwargs):
        """
        :param session_path: session path, forwarded to the parent constructor
        :param kwargs: forwarded to the parent constructor; the optional keys 'collection',
         'protocol' and 'protocol_number' take precedence over the session parameters
        """
        super().__init__(session_path, **kwargs)

        self.collection = self.get_task_collection(kwargs.get('collection', None))
        # Task type (protocol)
        self.protocol = self.get_protocol(kwargs.get('protocol', None), task_collection=self.collection)

        self.protocol_number = self.get_protocol_number(kwargs.get('protocol_number'), task_protocol=self.protocol)

        self.output_collection = 'alf'
        # Compare against None instead of testing truthiness: 0 is a valid protocol number
        if self.protocol_number is not None:
            self.output_collection += f'/task_{self.protocol_number:02}'

    # NB: get_protocol and get_task_collection are inherited from DynamicTask; the copies that
    # used to live here were identical to the parent implementations.

    def get_protocol_number(self, number=None, task_protocol=None):
        """
        Return the protocol number, inferring it from the session parameters when not given.

        :param number: explicit protocol number; 0 is a valid value and is returned as is
        :param task_protocol: protocol used for the lookup in the session parameters when `number` is None
        :return: the protocol number as an int, or None
        """
        if number is None:  # Do not use "if not number" as that will return True if number is 0
            number = sess_params.get_task_protocol_number(self.session_params, task_protocol)
        # If inferring the number from the experiment description, assert only one returned (or something went wrong)
        assert number is None or isinstance(number, int)
        return number
1✔
99

100

101
class VideoTask(DynamicTask):
    """Dynamic task operating on the session cameras."""

    def __init__(self, session_path, cameras, **kwargs):
        """
        :param session_path: session path, forwarded to the parent constructor
        :param cameras: stored as `self.cameras` and also forwarded to the parent constructor
        :param kwargs: forwarded to the parent constructor; 'device_collection' defaults to 'raw_video_data'
        """
        super().__init__(session_path, cameras=cameras, **kwargs)
        self.cameras = cameras
        requested_collection = kwargs.get('device_collection', 'raw_video_data')
        self.device_collection = self.get_device_collection('cameras', requested_collection)
        # self.collection = self.get_task_collection(kwargs.get('collection', None))
108

109

110
class AudioTask(DynamicTask):
    """Dynamic task operating on the session microphone data."""

    def __init__(self, session_path, **kwargs):
        """
        :param session_path: session path, forwarded to the parent constructor
        :param kwargs: forwarded to the parent constructor; 'device_collection' defaults to 'raw_behavior_data'
        """
        super().__init__(session_path, **kwargs)
        requested_collection = kwargs.get('device_collection', 'raw_behavior_data')
        self.device_collection = self.get_device_collection('microphone', requested_collection)
1✔
115

116

117
class EphysTask(DynamicTask):
    """Dynamic task operating on neuropixel probe data."""

    def __init__(self, session_path, **kwargs):
        """
        :param session_path: session path, forwarded to the parent constructor
        :param kwargs: forwarded to the parent constructor; the optional keys 'pname' and 'nshanks'
         are read here and 'device_collection' defaults to 'raw_ephys_data'
        """
        super().__init__(session_path, **kwargs)

        self.pname = self.get_pname(kwargs.get('pname'))
        self.nshanks, self.pextra = self.get_nshanks(kwargs.get('nshanks'))
        requested_collection = kwargs.get('device_collection', 'raw_ephys_data')
        self.device_collection = self.get_device_collection('neuropixel', requested_collection)

    def get_pname(self, pname):
        """
        Return the 'pname' entry of `self.kwargs`, or `pname` when that key is absent.

        pname can be a list or a string.
        """
        return self.kwargs.get('pname', pname)

    def get_nshanks(self, nshanks=None):
        """
        Return the number of shanks together with one lowercase letter per shank.

        :param nshanks: fallback used when `self.kwargs` has no 'nshanks' key
        :return: (nshanks, pextra) where pextra is ['a', 'b', ...] with one letter per shank,
         or an empty list when nshanks is None
        """
        nshanks = self.kwargs.get('nshanks', nshanks)
        if nshanks is None:
            return nshanks, []
        first_letter = ord('a')
        return nshanks, [chr(first_letter + shank) for shank in range(nshanks)]
1✔
140

141

142
class WidefieldTask(DynamicTask):
    """Dynamic task operating on widefield imaging data."""

    def __init__(self, session_path, **kwargs):
        """
        :param session_path: session path, forwarded to the parent constructor
        :param kwargs: forwarded to the parent constructor; 'device_collection' defaults to 'raw_widefield_data'
        """
        super().__init__(session_path, **kwargs)

        requested_collection = kwargs.get('device_collection', 'raw_widefield_data')
        self.device_collection = self.get_device_collection('widefield', requested_collection)
1✔
147

148

149
class RegisterRawDataTask(DynamicTask):  # TODO write test
    """
    Base register raw task.
    To rename files
     1. input and output must have the same length
     2. output files must have full filename

    Input and output files are given as (file name or glob pattern, collection, required) tuples.
    """

    priority = 100
    job_size = 'small'

    def rename_files(self, symlink_old=False, **kwargs):
        """
        Move each input file to the location given by the output file at the same position.

        :param symlink_old: if True, leave a symlink to the new file at the old location
        :param kwargs: ignored; accepted so that `_run` can forward its own keyword arguments
        :raises AssertionError: if inputs are given and their number differs from the number of outputs
        :raises FileNotFoundError: if a required input file cannot be found
        """
        # If no inputs are given, we don't do any renaming
        if not self.input_files:
            return

        # Otherwise we need to make sure there is one to one correspondence for renaming files
        assert len(self.input_files) == len(self.output_files)

        for before, after in zip(self.input_files, self.output_files):
            old_file, old_collection, required = before
            old_dir = self.session_path.joinpath(old_collection)
            # Only the first match of the pattern is renamed
            old_path = next(old_dir.glob(old_file), None)
            if old_path is None:
                # if the file doesn't exist and it is not required we are okay to continue
                if not required:
                    continue
                # Fail with an explicit error rather than an AttributeError on None further down
                raise FileNotFoundError(f'required file {old_file} not found in {old_dir}')

            new_file, new_collection, _ = after
            new_path = self.session_path.joinpath(new_collection, new_file)
            new_path.parent.mkdir(parents=True, exist_ok=True)
            old_path.replace(new_path)
            if symlink_old:
                old_path.symlink_to(new_path)

    def _run(self, **kwargs):
        """
        Rename the input files, then collect the output files that exist on disk.

        The task status is set to -1 when any required output file is missing.

        :param kwargs: forwarded to `rename_files`
        :return: list of paths, the first match of each output file pattern that was found
        """
        self.rename_files(**kwargs)
        out_files = []
        missing_required = False
        for file_sig in self.output_files:
            file_name, collection, required = file_sig
            file_path = next(self.session_path.joinpath(collection).glob(file_name), None)
            if file_path is not None:
                out_files.append(file_path)
            elif required:
                _logger.error(f'expected {file_sig} missing')
                missing_required = True

        # Track missing required files directly: comparing the number of files found with the
        # number of required files lets optional files that were found mask a missing required one
        if missing_required:
            self.status = -1

        return out_files
1✔
204

205

206
class ExperimentDescriptionRegisterRaw(RegisterRawDataTask):
    """
    Register the experiment description file and update the session sign-off keys on Alyx.
    """

    # dict of list: custom sign off keys corresponding to specific devices
    sign_off_categories = SIGN_OFF_CATEGORIES

    @property
    def signature(self):
        """Task signature: no input files, and the experiment description file as a required output."""
        return {
            'input_files': [],
            'output_files': [('*experiment.description.yaml', '', True)],
        }

    def _run(self, **kwargs):
        """
        Register the experiment description file, then update the session JSON field on Alyx.

        The Alyx update is skipped when ONE is offline or when the parent `_run` left a non-zero status.

        :param kwargs: forwarded to the parent `_run`
        :return: the list of output files returned by the parent `_run`
        """
        # Register experiment description file
        out_files = super()._run(**kwargs)
        if self.one.offline or self.status != 0:
            return out_files

        with no_cache(self.one.alyx):  # Ensure we don't load the cached JSON response
            eid = self.one.path2eid(self.session_path, query_type='remote')
        description = sess_params.read_params(out_files[0])
        sign_offs = sign_off_dict(description, sign_off_categories=self.sign_off_categories)
        self.one.alyx.json_field_update('sessions', eid, data=sign_offs)
        return out_files
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc