• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / ibllib / 1761696499260742

05 Oct 2023 09:46AM UTC coverage: 55.27% (-1.4%) from 56.628%
1761696499260742

Pull #655

continuous-integration/UCL

bimac
add @sleepless decorator
Pull Request #655: add @sleepless decorator

21 of 21 new or added lines in 1 file covered. (100.0%)

10330 of 18690 relevant lines covered (55.27%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.73
/ibllib/qc/base.py
1
import logging
1✔
2
from abc import abstractmethod
1✔
3
from pathlib import Path
1✔
4
from itertools import chain
1✔
5

6
import numpy as np
1✔
7

8
from one.api import ONE
1✔
9
from one.alf.spec import is_session_path, is_uuid_string
1✔
10

11
"""dict: custom sign off categories"""
1✔
12
# Used by sign_off_dict as the default device -> sub-key expansion when none is passed in
SIGN_OFF_CATEGORIES = {
    'neuropixel': ['raw', 'spike_sorting', 'alignment'],
}
1✔
13

14
"""dict: Map for comparing QC outcomes"""
1✔
15
# Outcome name -> integer code; a larger code is a worse outcome.
# Insertion order (worst first) is kept because error messages join the keys in this order.
CRITERIA = {
    'CRITICAL': 4,
    'FAIL': 3,
    'WARNING': 2,
    'PASS': 1,
    'NOT_SET': 0,
}
21

22

23
class QC:
1✔
24
    """A base class for data quality control"""
1✔
25

26
    def __init__(self, endpoint_id, one=None, log=None, endpoint='sessions'):
1✔
27
        """
28
        :param endpoint_id: Eid for endpoint. If using sessions can also be a session path
29
        :param log: A logging.Logger instance, if None the 'ibllib' logger is used
30
        :param one: An ONE instance for fetching and setting the QC on Alyx
31
        :param endpoint: The endpoint name to apply qc to. Default is 'sessions'
32
        """
33
        self.one = one or ONE()
1✔
34
        self.log = log or logging.getLogger(__name__)
1✔
35
        if endpoint == 'sessions':
1✔
36
            self.endpoint = endpoint
1✔
37
            self._set_eid_or_path(endpoint_id)
1✔
38
            self.json = False
1✔
39
        else:
40
            self.endpoint = endpoint
×
41
            self._confirm_endpoint_id(endpoint_id)
×
42
            self.json = True
×
43

44
        # Ensure outcome attribute matches Alyx record
45
        updatable = self.eid and self.one and not self.one.offline
1✔
46
        self._outcome = self.update('NOT_SET', namespace='') if updatable else 'NOT_SET'
1✔
47
        self.log.debug(f'Current QC status is {self.outcome}')
1✔
48

49
    @abstractmethod
1✔
50
    def run(self):
1✔
51
        """Run the QC tests and return the outcome
52
        :return: One of "CRITICAL", "FAIL", "WARNING" or "PASS"
53
        """
54
        pass
×
55

56
    @abstractmethod
1✔
57
    def load_data(self):
1✔
58
        """Load the data required to compute the QC
59
        Subclasses may implement this for loading raw data
60
        """
61
        pass
×
62

63
    @property
1✔
64
    def outcome(self):
1✔
65
        return self._outcome
1✔
66

67
    @outcome.setter
1✔
68
    def outcome(self, value):
        """Set the outcome, keeping whichever of the current and the new value is worse.

        :param value: An outcome string in any case; must match a key of CRITERIA once uppercased
        :raises ValueError: If the uppercased value is not a key of CRITERIA
        """
        candidate = value.upper()  # Ensure outcome is uppercase
        if candidate not in CRITERIA:
            raise ValueError('Invalid outcome; must be one of ' + ', '.join(CRITERIA.keys()))
        # Only escalate: a value with a lower or equal code leaves the stored outcome untouched
        current_code = CRITERIA[self._outcome]
        if current_code < CRITERIA[candidate]:
            self._outcome = candidate
×
74

75
    @staticmethod
1✔
76
    def overall_outcome(outcomes: iter, agg=max) -> str:
        """
        Given an iterable of QC outcomes, returns the overall (i.e. worst) outcome.

        Outcomes may be strings, which are looked up in CRITERIA (unknown strings count as 0),
        or numeric codes, which are used as they are.  None, NaN and any other falsy non-float
        value are ignored.  If nothing is left after filtering, 'NOT_SET' is returned.

        Example:
          QC.overall_outcome(['PASS', 'NOT_SET', None, 'FAIL'])  # Returns 'FAIL'
          QC.overall_outcome([None, float('nan')])  # Returns 'NOT_SET'

        :param outcomes: An iterable of QC outcomes
        :param agg: outcome code aggregate function, default is max (i.e. worst)
        :return: The overall outcome string
        :raises StopIteration: If the aggregated code equals none of the CRITERIA values
        """
        def is_set(x):
            # NaN is truthy, so a plain truth test would let it through; check floats explicitly
            if isinstance(x, (float, np.floating)):
                return not np.isnan(x)
            return bool(x)

        codes = [CRITERIA.get(x, 0) if isinstance(x, str) else x for x in outcomes if is_set(x)]
        if not codes:
            # Nothing to aggregate; max/min would raise ValueError on an empty sequence
            return 'NOT_SET'
        code = agg(codes)
        return next(k for k, v in CRITERIA.items() if v == code)
1✔
90

91
    @staticmethod
1✔
92
    def code_to_outcome(code: int) -> str:
        """
        Given an outcome id, returns the corresponding string.

        Example:
          QC.code_to_outcome(3)  # Returns 'FAIL'

        :param code: The outcome id
        :return: The outcome string whose CRITERIA value equals the given id
        :raises StopIteration: If no CRITERIA entry has the given id
        """
        matching_names = (name for name, value in CRITERIA.items() if value == code)
        return next(matching_names)
×
103

104
    def _set_eid_or_path(self, session_path_or_eid):
        """Store the eid and session path derived from the given identifier

        A UUID is kept as the eid and the session path is looked up through ONE; a session
        path is kept as a Path and the eid is looked up through ONE (a warning is logged if
        none is returned).
        :param session_path_or_eid: A session eid or path
        :raises ValueError: If the input is neither a UUID string nor a session path
        """
        self.eid = None
        if is_uuid_string(str(session_path_or_eid)):
            self.eid = session_path_or_eid
            # Try to set session_path if data is found locally
            self.session_path = self.one.eid2path(self.eid)
            return

        if not is_session_path(session_path_or_eid):
            self.log.error('Cannot run QC: an experiment uuid or session path is required')
            raise ValueError("'session' must be a valid session path or uuid")

        self.session_path = Path(session_path_or_eid)
        if self.one is None:
            return
        self.eid = self.one.path2eid(self.session_path)
        if not self.eid:
            self.log.warning('Failed to determine eID from session path')
×
124

125
    def _confirm_endpoint_id(self, endpoint_id):
1✔
126
        # Have as read for now since 'list' isn't working
127
        target_obj = self.one.alyx.get(f'/{self.endpoint}/{endpoint_id}', clobber=True) or None
×
128
        if target_obj:
×
129
            self.eid = endpoint_id
×
130
            json_field = target_obj.get('json')
×
131
            if not json_field:
×
132
                self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
×
133
                                                field_name='json', data={'qc': 'NOT_SET',
134
                                                                         'extended_qc': {}})
135
            elif not json_field.get('qc', None):
×
136
                self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
×
137
                                                field_name='json', data={'qc': 'NOT_SET',
138
                                                                         'extended_qc': {}})
139
        else:
140
            self.log.error('Cannot run QC: endpoint id is not recognised')
×
141
            raise ValueError("'endpoint_id' must be a valid uuid")
×
142

143
    def update(self, outcome=None, namespace='experimenter', override=False):
1✔
144
        """Update the qc field in Alyx
145
        Updates the 'qc' field in Alyx if the new QC outcome is worse than the current value.
146
        :param outcome: A string; one of "CRITICAL", "FAIL", "WARNING", "PASS" or "NOT_SET"
147
        :param namespace: The extended QC key specifying the type of QC associated with the outcome
148
        :param override: If True the QC field is updated even if new value is better than previous
149
        :return: The current QC outcome str on Alyx
150

151
        Example:
152
            qc = QC('path/to/session')
153
            qc.update('PASS')  # Update current QC field to 'PASS' if not set
154
        """
155
        assert self.one, "instance of one should be provided"
×
156
        if self.one.offline:
×
157
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
×
158
            return
×
159
        outcome = outcome or self.outcome
×
160
        outcome = outcome.upper()  # Ensure outcome is uppercase
×
161
        if outcome not in CRITERIA:
×
162
            raise ValueError('Invalid outcome; must be one of ' + ', '.join(CRITERIA.keys()))
×
163
        assert self.eid, 'Unable to update Alyx; eID not set'
×
164
        if namespace:  # Record in extended qc
×
165
            self.update_extended_qc({namespace: outcome})
×
166
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
×
167
        current_status = (details['json'] if self.json else details)['qc']
×
168

169
        if CRITERIA[current_status] < CRITERIA[outcome] or override:
×
170
            r = self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
×
171
                                                field_name='json', data={'qc': outcome}) \
172
                if self.json else self.one.alyx.rest(self.endpoint, 'partial_update', id=self.eid,
173
                                                     data={'qc': outcome})
174

175
            current_status = r['qc'].upper()
×
176
            assert current_status == outcome, 'Failed to update session QC'
×
177
            self.log.info(f'QC field successfully updated to {outcome} for {self.endpoint[:-1]} '
×
178
                          f'{self.eid}')
179
        self._outcome = current_status
×
180
        return self.outcome
×
181

182
    def update_extended_qc(self, data):
1✔
183
        """Update the extended_qc field in Alyx
184
        Subclasses should chain a call to this.
185
        :param data: a dict of qc tests and their outcomes, typically a value between 0. and 1.
186
        :return: the updated extended_qc field
187
        """
188
        assert self.eid, 'Unable to update Alyx; eID not set'
×
189
        assert self.one, "instance of one should be provided"
×
190
        if self.one.offline:
×
191
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
×
192
            return
×
193

194
        # Ensure None instead of NaNs
195
        for k, v in data.items():
×
196
            if v is not None and not isinstance(v, str):
×
197
                if isinstance(v, tuple):
×
198
                    data[k] = tuple(None if not isinstance(i, str) and np.isnan(i) else i for i in v)
×
199
                else:
200
                    data[k] = None if np.isnan(v).all() else v
×
201

202
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
×
203
        if self.json:
×
204
            extended_qc = details['json']['extended_qc'] or {}
×
205
            extended_qc.update(data)
×
206
            extended_qc_dict = {'extended_qc': extended_qc}
×
207
            out = self.one.alyx.json_field_update(
×
208
                endpoint=self.endpoint, uuid=self.eid, field_name='json', data=extended_qc_dict)
209
        else:
210
            extended_qc = details['extended_qc'] or {}
×
211
            extended_qc.update(data)
×
212
            out = self.one.alyx.json_field_update(
×
213
                endpoint=self.endpoint, uuid=self.eid, field_name='extended_qc', data=extended_qc)
214

215
        self.log.info(f'Extended QC field successfully updated for {self.endpoint[:-1]} '
×
216
                      f'{self.eid}')
217
        return out
×
218

219
    def compute_outcome_from_extended_qc(self) -> str:
1✔
220
        """
221
        Returns the session outcome computed from aggregating the extended QC
222
        """
223
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
×
224
        extended_qc = details['json']['extended_qc'] if self.json else details['extended_qc']
×
225
        return self.overall_outcome(v for k, v in extended_qc or {} if k[0] != '_')
×
226

227

228
def sign_off_dict(exp_dec, sign_off_categories=None):
    """
    Build the 'sign off' checklist for the devices and task protocols of an experiment
    description.

    Parameters
    ----------
    exp_dec : dict
        A loaded experiment description file.
    sign_off_categories : dict of list
        A dictionary of custom JSON keys for a given device in the acquisition description file.
        When not given (or empty), SIGN_OFF_CATEGORIES is used.

    Returns
    -------
    dict of dict
        The sign off dictionary with the main key 'sign_off_checklist' containing keys for each
        device and task protocol.  Every key is prefixed with an underscore and maps to None.
    """
    # Note this assumes devices each contain a dict of dicts
    # e.g. {'devices': {'DAQ_1': {'device_1': {}, 'device_2': {}}}}
    categories = sign_off_categories or SIGN_OFF_CATEGORIES
    checklist_keys = set()
    for device, collections in exp_dec.get('devices', {}).items():
        assert isinstance(collections, dict) and collections
        # A device with custom categories gets one key stem per category, otherwise just its name
        if device in categories:
            stems = [f'{device}_{category}' for category in categories[device]]
        else:
            stems = [device]
        if len(collections) == 1 and next(iter(collections)) == device:
            # Single collection named after the device itself: no collection suffix needed
            checklist_keys.update(stems)
        else:
            checklist_keys.update(
                f'{stem}_{collection}' for collection in collections for stem in stems)

    # Add keys for each protocol
    protocols = chain.from_iterable(map(dict.keys, exp_dec.get('tasks', [])))
    checklist_keys.update(f'{name}_{index:02}' for index, name in enumerate(protocols))

    return {'sign_off_checklist': {f'_{key}': None for key in checklist_keys}}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc