• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 9031936551

10 May 2024 12:05PM UTC coverage: 48.538% (+1.7%) from 46.79%
9031936551

Pull #643

github

53c3e3
web-flow
Merge 3c8214f78 into ec2d8e4fe
Pull Request #643: 8.19.0

377 of 1073 new or added lines in 38 files covered. (35.14%)

977 existing lines in 19 files now uncovered.

3253 of 6702 relevant lines covered (48.54%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.22
/iblrig/hardware_validation.py
1
import logging
2✔
2
import re
2✔
3
from abc import ABC, abstractmethod
2✔
4
from collections.abc import Generator
2✔
5
from dataclasses import dataclass
2✔
6
from datetime import date
2✔
7
from enum import IntEnum
2✔
8
from inspect import isabstract
2✔
9
from math import isclose
2✔
10
from struct import unpack
2✔
11
from typing import Any, cast
2✔
12

13
import numpy as np
2✔
14
import sounddevice
2✔
15
import usb
2✔
16
from dateutil.relativedelta import relativedelta
2✔
17
from serial import Serial, SerialException
2✔
18
from serial.tools import list_ports
2✔
19
from serial.tools.list_ports_common import ListPortInfo
2✔
20

21
from iblrig.base_tasks import BpodMixin, SoundMixin
2✔
22
from iblrig.constants import BASE_PATH, HAS_PYSPIN, HAS_SPINNAKER, IS_GIT
2✔
23
from iblrig.hardware import Bpod
2✔
24
from iblrig.path_helper import load_pydantic_yaml
2✔
25
from iblrig.pydantic_definitions import HardwareSettings, RigSettings
2✔
26
from iblrig.serial_singleton import SerialSingleton, filter_ports
2✔
27
from iblrig.tools import ANSI, get_inheritors, internet_available
2✔
28
from iblrig.version_management import get_branch, is_dirty
2✔
29
from pybpodapi.bpod_modules.bpod_module import BpodModule
2✔
30
from pybpodapi.state_machine import StateMachine
2✔
31

32
log = logging.getLogger(__name__)
2✔
33

34

35
class Status(IntEnum):
2✔
36
    """Possible status codes of hardware validations"""
37

38
    PEND = 0  # Test pending
2✔
39
    SKIP = 1  # Test not applicable (e.g., device not present)
2✔
40
    PASS = 2  # Test passed
2✔
41
    WARN = 3  # Test passed with warning
2✔
42
    INFO = 4  # Secondary information yielded from tests (e.g., firmware version)
2✔
43
    FAIL = 5  # Test failed
2✔
44

45

46
@dataclass
2✔
47
class Result:
2✔
48
    """Dataclass holding the results of a single validation"""
49

50
    status: Status
2✔
51
    message: str
2✔
52
    ext_message: str | None = None
2✔
53
    solution: str | None = None
2✔
54
    url: str | None = None
2✔
55
    exception: Exception | None = None
2✔
56

57

58
class ValidateHardwareError(Exception):
2✔
59
    def __init__(self, results: Result):
2✔
60
        super().__init__(results.message)
×
61
        self.results = results
×
62

63

64
class Validator(ABC):
2✔
65
    log_results: bool = True
2✔
66
    raise_fail_as_exception: bool = False
2✔
67
    interactive: bool
2✔
68
    iblrig_settings: RigSettings
2✔
69
    hardware_settings: HardwareSettings
2✔
70
    _name: str | None = None
2✔
71

72
    def __init__(
2✔
73
        self,
74
        iblrig_settings: RigSettings | None = None,
75
        hardware_settings: HardwareSettings | None = None,
76
        interactive: bool = False,
77
    ):
78
        self.iblrig_settings = iblrig_settings or load_pydantic_yaml(RigSettings)
2✔
79
        self.hardware_settings = hardware_settings or load_pydantic_yaml(HardwareSettings)
2✔
80
        self.interactive = interactive
2✔
81

82
    @property
2✔
83
    def name(self) -> str:
2✔
NEW
84
        return getattr(self, '_name', self.__class__.__name__)
×
85

86
    @abstractmethod
2✔
87
    def _run(self, *args, **kwargs) -> Generator[Result, None, bool]: ...
2✔
88

89
    def run(self, *args, **kwargs) -> Generator[Result, None, bool]:
2✔
NEW
90
        success = yield from self._run(*args, **kwargs)
×
NEW
91
        return success
×
92

93
    def _get_bpod(self) -> Generator[Result, None, Bpod | None]:
2✔
NEW
94
        if self.hardware_settings.device_bpod.COM_BPOD is None:
×
NEW
95
            yield Result(Status.INFO, f'Cannot complete validation of {self.name} without Bpod')
×
NEW
96
            return None
×
NEW
97
        try:
×
NEW
98
            return Bpod(self.hardware_settings.device_bpod.COM_BPOD, skip_initialization=True)
×
NEW
99
        except Exception as e:
×
NEW
100
            yield Result(Status.FAIL, f'Cannot complete validation of {self.name}: connection to Bpod failed', exception=e)
×
NEW
101
            return None
×
102

103
    def _get_module(self, module_name: str, bpod: Bpod | None = None) -> Generator[Result, None, BpodModule | None]:
2✔
NEW
104
        if bpod is None:
×
NEW
105
            bpod = yield from self._get_bpod()
×
NEW
106
        if bpod is None:
×
NEW
107
            return None
×
108

NEW
109
        module = None if bpod.modules is None else next((m for m in bpod.modules if m.name.startswith(module_name)), None)
×
110

NEW
111
        if module is not None:
×
NEW
112
            yield Result(Status.PASS, f'{self.name} is connected to Bpod on module port #{module.serial_port}')
×
NEW
113
            yield Result(Status.INFO, f'Firmware Version: {module.firmware_version}')
×
NEW
114
            return module
×
115
        else:
NEW
116
            yield Result(
×
117
                Status.FAIL,
118
                f"{self.name} is not connected to Bpod's module port",
119
                solution=f"Connect {self.name} to one of Bpod's module ports",
120
            )
NEW
121
            return None
×
122

123
    def process(self, results: Result) -> Result:
2✔
124
        if self.log_results:
×
125
            match results.status:
×
NEW
126
                case Status.PASS:
×
127
                    log_level = logging.INFO
×
NEW
128
                case Status.INFO:
×
129
                    log_level = logging.INFO
×
NEW
130
                case Status.FAIL:
×
131
                    log_level = logging.CRITICAL
×
132
                case _:
×
NEW
133
                    log_level = logging.CRITICAL
×
NEW
134
            log.log(log_level, results.message)
×
135

NEW
136
        if self.raise_fail_as_exception and results.status == Status.FAIL:
×
137
            if results.exception is not None:
×
NEW
138
                raise ValidateHardwareError(results) from results.exception
×
139
            else:
NEW
140
                raise ValidateHardwareError(results)
×
141

NEW
142
        return results
×
143

144

145
class ValidatorSerial(Validator):
2✔
146
    port_properties: dict[str, Any] = {}
2✔
147
    serial_queries: None | dict[tuple[object, int], bytes] = None
2✔
148

149
    def __init__(self, *args, **kwargs):
2✔
150
        super().__init__(*args, **kwargs)
2✔
151

152
    @property
2✔
153
    @abstractmethod
2✔
154
    def port(self) -> str | None: ...
2✔
155

156
    @property
2✔
157
    def port_info(self) -> ListPortInfo | None:
2✔
NEW
158
        return next(list_ports.grep(self.port), None) if self.port is not None else None
×
159

160
    def _run(self):
2✔
161
        if self.port is None:
×
NEW
162
            yield Result(Status.SKIP, f'No serial port defined for {self.name}')
×
NEW
163
            return False
×
164
        elif next((p for p in list_ports.comports() if p.device == self.port), None) is None:
×
NEW
165
            yield Result(
×
166
                Status.FAIL,
167
                f'{self.port} is not a valid serial port',
168
                solution='Check serial port setting in hardware_settings.yaml',
169
            )
NEW
170
            return False
×
171
        else:
172
            try:
×
173
                Serial(self.port, timeout=1).close()
×
NEW
174
                yield Result(Status.PASS, f'Serial device on {self.port} can be connected to')
×
NEW
175
                yield Result(
×
176
                    Status.INFO,
177
                    f'USB ID: {self.port_info.vid:04X}:{self.port_info.pid:04X}, '
178
                    f'Serial Number: {self.port_info.serial_number}',
179
                )
180
            except SerialException as e:
×
NEW
181
                yield Result(
×
182
                    Status.FAIL,
183
                    f'Serial device on {self.port} cannot be connected to',
184
                    solution='Try power-cycling the device',
185
                    exception=e,
186
                )
NEW
187
                return False
×
188

189
        # first, test for properties of the serial port without opening the latter (VID, PID, etc)
NEW
190
        passed = (
×
191
            self.port in filter_ports(**self.port_properties) if getattr(self, 'port_properties', None) is not None else False
192
        )
193

194
        # query the devices for characteristic responses
NEW
195
        if passed and getattr(self, 'serial_queries', None) is not None:
×
196
            with SerialSingleton(self.port, timeout=1) as ser:
×
197
                for query, regex_pattern in self.serial_queries.items():
×
198
                    return_string = ser.query(*query)
×
199
                    ser.flush()
×
200
                    if not (passed := bool(re.search(regex_pattern, return_string))):
×
201
                        break
×
202
        if passed:
×
NEW
203
            yield Result(Status.PASS, f'Serial device positively identified as {self.name}')
×
NEW
204
            return True
×
205
        else:
NEW
206
            yield Result(
×
207
                Status.FAIL,
208
                f'Serial device on {self.port} does NOT seem to be a {self.name}',
209
                solution='Check serial port setting in hardware_settings.yaml',
210
            )
NEW
211
            return False
×
212

213

214
class ValidatorRotaryEncoderModule(ValidatorSerial):
2✔
215
    _name = 'Bpod Rotary Encoder Module'
2✔
216
    port_properties = {'vid': 0x16C0}
2✔
217
    serial_queries = {(b'Q', 2): b'^..$', (b'P00', 1): b'\x01'}
2✔
218

219
    @property
2✔
220
    def port(self):
2✔
NEW
221
        return self.hardware_settings.device_rotary_encoder.COM_ROTARY_ENCODER
×
222

223
    def _run(self):
2✔
224
        # invoke ValidateSerialDevice._run()
NEW
225
        success = yield from super()._run()
×
NEW
226
        if not success:
×
NEW
227
            return False
×
228

229
        # obtain hardware version
NEW
230
        with SerialSingleton(self.port, timeout=0.1) as ser:
×
NEW
231
            v = '1.x' if ser.query(b'Ix', 1) == b'\x01' else '2+'
×
NEW
232
        yield Result(Status.INFO, f'Hardware Version: {v}')
×
233

234
        # try to get Bpod
NEW
235
        bpod = yield from self._get_bpod()
×
NEW
236
        if not bpod:
×
NEW
237
            return False
×
238

239
        # try to get Bpod module
NEW
240
        module = yield from self._get_module('RotaryEncoder', bpod)
×
NEW
241
        if not module:
×
NEW
242
            return False
×
243

244
        # log_fun('info', f'firmware version: {bpod.modules[0].firmware_version}')
245
        #
246
        # s.write(b'Z')
247
        # p = np.frombuffer(query(s, b'Q', 2), dtype=np.int16)[0]
248
        # log_fun('warn', "please move the wheel to the left (animal's POV) by a quarter turn")
249
        # while np.abs(p) < 200:
250
        #     p = np.frombuffer(query(s, b'Q', 2), dtype=np.int16)[0]
251
        # if p > 0:
252
        #     log_fun('fail', 'Rotary encoder seems to be wired incorrectly - try swapping A and B', last=True)
253
        # else:
254
        #     log_fun('pass', 'rotary encoder is wired correctly', last=True)
255
        # s.close()
256

257

258
# class ValidatorScreen(Validator):
259
#     device_name = 'Screen'
260
#
261
#     def _run(self):
262
#         pass
263
#         # if os.name == 'nt':
264
#         #     import ctypes
265
#         #
266
#         #     from win32api import EnumDisplayMonitors, EnumDisplaySettingsEx, GetMonitorInfo
267
#         #
268
#         #     display_idx = self.hardware_settings.device_screen.DISPLAY_IDX
269
#         #     monitors = EnumDisplayMonitors()
270
#         #     monitor = monitors[display_idx]
271
#         #     display_handle = monitor[0]
272
#         #     scale_factor = ctypes.windll.shcore.GetScaleFactorForDevice(display_idx)
273
#         #     display_info = GetMonitorInfo(display_handle)
274
#         #     display_settings = EnumDisplaySettingsEx(display_info['Device'])
275
#         #     # TODO: Implementation ...
276

277

278
class ValidatorAmbientModule(Validator):
2✔
279
    _name = 'Bpod Ambient Module'
2✔
280

281
    def _run(self):
2✔
282
        # yield Bpod's connection status
NEW
283
        bpod = yield from self._get_bpod()
×
NEW
284
        if bpod is None:
×
NEW
285
            return False
×
286

287
        # yield module's connection status
NEW
288
        module = yield from self._get_module('AmbientModule', bpod)
×
NEW
289
        if module is None:
×
NEW
290
            return False
×
291

292
        # yield sensor values
NEW
293
        module.start_module_relay()
×
NEW
294
        bpod.bpod_modules.module_write(module, 'R')
×
NEW
295
        (t, p, h) = unpack('3f', bytes(bpod.bpod_modules.module_read(module, 12)))
×
NEW
296
        module.stop_module_relay()
×
NEW
297
        yield Result(Status.INFO, f'Temperature: {t:.1f} °C')
×
NEW
298
        yield Result(Status.INFO, f'Air pressure: {p / 100:.1f} mbar')
×
NEW
299
        yield Result(Status.INFO, f'Rel. humidity: {h:.1f}%')
×
NEW
300
        return True
×
301

302

303
class ValidatorBpod(ValidatorSerial):
2✔
304
    _name = 'Bpod'
2✔
305
    port_properties = {'vid': 0x16C0}
2✔
306
    serial_queries = {(b'6', 1): b'5'}
2✔
307

308
    @property
2✔
309
    def port(self):
2✔
NEW
310
        return self.hardware_settings.device_bpod.COM_BPOD
×
311

312
    def _run(self):
2✔
313
        # close existing Bpod singleton
NEW
314
        if (bpod := Bpod._instances.get(self.hardware_settings.device_bpod.COM_BPOD, None)) is not None:  # noqa
×
NEW
315
            bpod.close()
×
316

317
        # invoke ValidateSerialDevice._run()
NEW
318
        success = yield from super()._run()
×
NEW
319
        if not success:
×
NEW
320
            return False
×
321

322
        # check hardware and firmware version
NEW
323
        with SerialSingleton(self.hardware_settings.device_bpod.COM_BPOD) as ser:
×
NEW
324
            v_major, machine_type = ser.query(b'F', '<2H')
×
NEW
325
            firmware_version = (v_major, ser.query(b'f', '<H')[0] if v_major > 22 else 0)
×
NEW
326
            machine_str = {1: 'v0.5', 2: 'r07+', 3: 'r2.0-2.5', 4: '2+ r1.0'}[machine_type]
×
NEW
327
            machine_str.join(f", PCB revision{ser.query(b'v', '<B')[0]}" if v_major > 22 else '')
×
NEW
328
        yield Result(Status.INFO, f'Hardware version: {machine_str}')
×
NEW
329
        yield Result(Status.INFO, f'Firmware version: {firmware_version[0]}.{firmware_version[1]}')
×
NEW
330
        if firmware_version[0] > 22:
×
NEW
331
            yield Result(
×
332
                Status.FAIL,
333
                'Firmware version greater than 22 are not supported by IBLRIG',
334
                solution='Downgrade the Bpod' 's firmware to version 22',
335
            )
NEW
336
            return False
×
337

338
        # try to connect to Bpod
NEW
339
        try:
×
NEW
340
            bpod = Bpod(self.hardware_settings.device_bpod.COM_BPOD, skip_initialization=False)
×
NEW
341
            yield Result(Status.PASS, 'Successfully connected to Bpod using pybpod')
×
NEW
342
        except Exception as e:
×
NEW
343
            yield Result(
×
344
                Status.FAIL, 'Could not connect to Bpod using pybpod', solution='Try power-cycling the Bpod', exception=e
345
            )
NEW
346
            return False
×
347

348
        # return connected modules
NEW
349
        for module in bpod.modules:
×
NEW
350
            if module.connected:
×
NEW
351
                yield Result(Status.INFO, f'Module on port #{module.serial_port}: "{module.name}"')
×
NEW
352
        return True
×
353

354

355
class ValidatorCamera(Validator):
2✔
356
    _name = 'Camera'
2✔
357

358
    def _run(self):
2✔
NEW
359
        if self.hardware_settings.device_cameras is None or (
×
360
            isinstance(self.hardware_settings.device_cameras, dict) and len(self.hardware_settings.device_cameras) == 0
361
        ):
NEW
362
            yield Result(Status.SKIP, 'No cameras defined in hardware_settings.yaml - skipping validation')
×
NEW
363
            return False
×
364

NEW
365
        if HAS_SPINNAKER:
×
NEW
366
            yield Result(Status.PASS, 'Spinnaker SDK is installed')
×
367
        else:
NEW
368
            yield Result(
×
369
                Status.WARN, 'Spinnaker SDK is not installed', solution='Use install_spinnaker command to install Spinnaker SDK'
370
            )
371

NEW
372
        if HAS_PYSPIN:
×
NEW
373
            yield Result(Status.PASS, 'PySpin is installed')
×
374
        else:
NEW
375
            yield Result(Status.WARN, 'PySpin is not installed', solution='Use install_pyspin command to install PySpin')
×
376

NEW
377
        if HAS_SPINNAKER and HAS_PYSPIN:
×
NEW
378
            from iblrig.video_pyspin import Cameras, enable_camera_trigger
×
379

NEW
380
            with Cameras() as cameras:
×
NEW
381
                if len(cameras) == 0:
×
NEW
382
                    yield Result(
×
383
                        Status.FAIL,
384
                        'Could not find a camera connected to the computer',
385
                        solution='Connect a camera on one of the computers USB ports',
386
                    )
NEW
387
                    return False
×
388
                else:
NEW
389
                    yield Result(
×
390
                        Status.PASS, f'Found {len(cameras)} camera{"s" if len(cameras) > 1 else ""} connected to the computer'
391
                    )
NEW
392
                    for idx in range(len(cameras)):
×
NEW
393
                        yield Result(
×
394
                            Status.INFO,
395
                            f'Camera {idx}: {cameras[idx].DeviceModelName.ToString()}, '
396
                            f'Serial #{cameras[idx].DeviceID.ToString()}',
397
                        )
NEW
398
                        enable_camera_trigger(enable=False, camera=cameras[idx])
×
399

400
        # yield Bpod's connection status
NEW
401
        bpod = yield from self._get_bpod()
×
NEW
402
        if bpod is None:
×
NEW
403
            return False
×
404

NEW
405
        sma = StateMachine(bpod)
×
NEW
406
        sma.add_state(state_name='collect', state_timer=0.2, state_change_conditions={'Tup': 'exit'})
×
NEW
407
        bpod.send_state_machine(sma)
×
NEW
408
        bpod.run_state_machine(sma)
×
NEW
409
        triggers = [i.host_timestamp for i in bpod.session.current_trial.events_occurrences if i.content == 'Port1In']
×
NEW
410
        if len(triggers) == 0:
×
NEW
411
            yield Result(
×
412
                Status.FAIL,
413
                "No TTL detected on Bpod's behavior port #1",
414
                solution='Check the wiring between camera and valve driver board and make sure the latter is connected '
415
                "to Bpod's behavior port #1",
416
            )
NEW
417
            return False
×
418
        else:
NEW
419
            yield Result(Status.PASS, "Detected camera TTL on Bpod's behavior port #1")
×
NEW
420
            trigger_rate = np.mean(1 / np.diff(triggers))
×
NEW
421
            target_rate = 30
×
NEW
422
            if isclose(trigger_rate, target_rate, rel_tol=0.1):
×
NEW
423
                yield Result(Status.PASS, f'Measured TTL rate: {trigger_rate:.1f} Hz')
×
424
            else:
NEW
425
                yield Result(Status.WARN, f'Measured TTL rate: {trigger_rate:.1f} Hz (expecting {target_rate} Hz)')
×
NEW
426
        return True
×
427

428

429
class ValidatorAlyx(Validator):
2✔
430
    _name = 'Alyx'
2✔
431

432
    def _run(self):
2✔
433
        # Validate ALYX_URL
NEW
434
        if self.iblrig_settings.ALYX_URL is None:
×
NEW
435
            yield Result(Status.SKIP, 'ALYX_URL has not been set in hardware_settings.yaml - skipping validation')
×
NEW
436
            raise StopIteration(False)
×
NEW
437
        elif not internet_available(timeout=2, force_update=True):
×
NEW
438
            yield Result(
×
439
                Status.FAIL, f'Cannot connect to {self.iblrig_settings.ALYX_URL.host}', solution='Check your Internet connection'
440
            )
NEW
441
            return False
×
NEW
442
        elif not internet_available(host=self.iblrig_settings.ALYX_URL.host, port=443, timeout=2, force_update=True):
×
NEW
443
            yield Result(
×
444
                Status.FAIL,
445
                f'Cannot connect to {self.iblrig_settings.ALYX_URL.host}',
446
                solution='Check ALYX_URL in hardware_settings.yaml and make sure that your computer is allowed to connect',
447
            )
NEW
448
            return False
×
449
        else:
NEW
450
            yield Result(Status.PASS, f'{self.iblrig_settings.ALYX_URL.host} can be connected to')
×
451

452
        # Validate ALYX_LAB
NEW
453
        if self.iblrig_settings.ALYX_LAB is None:
×
NEW
454
            yield Result(Status.FAIL, 'ALYX_LAB has not been set', solution='Set ALYX_LAB in hardware_settings.yaml')
×
NEW
455
        return True
×
456

457

458
class ValidatorValve(Validator):
2✔
459
    _name = 'Valve'
2✔
460

461
    def _run(self):
2✔
NEW
462
        calibration_date = self.hardware_settings.device_valve.WATER_CALIBRATION_DATE
×
NEW
463
        today = date.today()
×
NEW
464
        delta_warn = relativedelta(months=1)
×
NEW
465
        delta_fail = relativedelta(months=3)
×
NEW
466
        days_passed = (today - calibration_date).days
×
NEW
467
        if calibration_date > date.today():
×
NEW
468
            yield Result(Status.FAIL, 'Date of last valve calibration is in the future', solution='Calibrate valve')
×
NEW
469
        elif calibration_date + delta_warn < today:
×
NEW
470
            yield Result(Status.WARN, f'Valve has not been calibrated in {days_passed} days', solution='Calibrate valve')
×
NEW
471
        elif calibration_date + delta_fail < date.today():
×
NEW
472
            yield Result(Status.FAIL, f'Valve has not been calibrated in {days_passed} days', solution='Calibrate valve')
×
NEW
473
        elif days_passed > 1:
×
NEW
474
            yield Result(Status.PASS, f'Valve has been calibrated {days_passed} days ago')
×
475
        else:
NEW
476
            yield Result(Status.PASS, f'Valve has been calibrated {"yesterday" if days_passed==1 else "today"}')
×
477

478

479
class ValidatorMic(Validator):
2✔
480
    _name = 'Microphone'
2✔
481

482
    def _run(self):
2✔
NEW
483
        if self.hardware_settings.device_microphone is None:
×
NEW
484
            yield Result(Status.SKIP, 'No workflow defined for microphone')
×
NEW
485
            return False
×
486

NEW
487
        sounddevice._terminate()
×
NEW
488
        sounddevice._initialize()
×
489

NEW
490
        devices = [d for d in sounddevice.query_devices() if 'UltraMic 200K' in d.get('name', '')]
×
NEW
491
        if len(devices) > 0:
×
NEW
492
            yield Result(Status.PASS, 'Found UltraMic 200K microphone')
×
NEW
493
            return True
×
494
        else:
NEW
495
            yield Result(
×
496
                Status.FAIL,
497
                'Could not find UltraMic 200K microphone',
498
                solution='Make sure that the microphone is connected to the PC via USB',
499
            )
NEW
500
            return False
×
501

502

503
class ValidatorGit(Validator):
2✔
504
    _name = 'Git'
2✔
505

506
    def _run(self):
2✔
NEW
507
        if not IS_GIT:
×
NEW
508
            yield Result(Status.SKIP, 'Your copy of IBLRIG is not managed through Git')
×
NEW
509
            return False
×
510

NEW
511
        return_status = True
×
NEW
512
        main_branch = 'iblrigv8'
×
NEW
513
        this_branch = get_branch()
×
NEW
514
        if this_branch != main_branch:
×
NEW
515
            yield Result(
×
516
                Status.WARN,
517
                f"Working tree of IBLRIG is on Git branch '{this_branch}'",
518
                solution=f"Issue 'git checkout {main_branch}' to switch to '{main_branch}' branch",
519
            )
NEW
520
            return_status = False
×
521
        else:
NEW
522
            yield Result(Status.PASS, f"Working tree of IBLRIG is on Git branch '{main_branch}'")
×
523

NEW
524
        if is_dirty():
×
NEW
525
            yield Result(
×
526
                Status.WARN,
527
                "Working tree of IBLRIG contains local changes - don't expect things to work as intended!",
528
                solution="To list files that have been changed locally, issue 'git diff --name-only'. "
529
                "Issue 'git reset --hard' to reset the repository to its default state",
530
            )
NEW
531
            return_status = False
×
532
        else:
NEW
533
            yield Result(Status.PASS, 'Working tree of IBLRIG does not contain local changes')
×
534

NEW
535
        return return_status
×
536

537

538
class _SoundCheckTask(BpodMixin, SoundMixin):
2✔
539
    protocol_name = 'hardware_check_harp'
2✔
540

541
    def __init__(self, *args, **kwargs):
2✔
NEW
542
        param_file = BASE_PATH.joinpath('iblrig', 'base_choice_world_params.yaml')
×
NEW
543
        super().__init__(*args, task_parameter_file=param_file, **kwargs)
×
544

545
    def start_hardware(self):
2✔
NEW
546
        self.start_mixin_bpod()
×
NEW
547
        self.start_mixin_sound()
×
548

549
    def get_state_machine(self):
2✔
NEW
550
        sma = StateMachine(self.bpod)
×
NEW
551
        sma.add_state('tone', 0.5, {'Tup': 'exit'}, [self.bpod.actions.play_tone])
×
NEW
552
        return sma
×
553

554
    def _run(self):
2✔
NEW
555
        pass
×
556

557
    def create_session(self):
2✔
NEW
558
        pass
×
559

560

561
class ValidatorSound(ValidatorSerial):
2✔
562
    _name = 'Sound'
2✔
563
    _module_name: str | None = None
2✔
564

565
    def __init__(self, *args, **kwargs):
2✔
566
        output_type = kwargs['hardware_settings'].device_sound.OUTPUT
2✔
567
        match output_type:
2✔
568
            case 'harp':
2✔
NEW
569
                self._name = 'HARP Sound Card'
×
NEW
570
                self._module_name = 'SoundCard'
×
NEW
571
                self.port_properties = {'vid': 0x0403, 'pid': 0x6001}
×
572
            case 'hifi':
2✔
NEW
573
                self._name = 'Bpod HiFi Module'
×
NEW
574
                self._module_name = 'HiFi'
×
NEW
575
                self.serial_queries = {(b'\xf3', 1): b'\xf4'}
×
NEW
576
                self.port_properties = {'vid': 0x16C0, 'pid': 0x0483}
×
577
            case 'xonar':
2✔
NEW
578
                self._name = 'Xonar Sound Card'
×
579
        if output_type in ['harp', 'hifi']:
2✔
NEW
580
            super().__init__(*args, **kwargs)  # call ValidatorSerial.__init__()
×
581
        else:
582
            super(ValidatorSerial, self).__init__(*args, **kwargs)  # call Validator.__init__()
2✔
583

584
    @property
2✔
585
    def port(self) -> str | None:
2✔
NEW
586
        match self.hardware_settings.device_sound.OUTPUT:
×
NEW
587
            case 'harp':
×
NEW
588
                return (
×
589
                    com_port
590
                    if (com_port := self.hardware_settings.device_sound.COM_SOUND) is not None
591
                    else next(filter_ports(**self.port_properties), None)
592
                )
NEW
593
            case 'hifi':
×
NEW
594
                return self.hardware_settings.device_sound.COM_SOUND
×
NEW
595
            case _:
×
NEW
596
                return None
×
597

598
    def _run(self):
2✔
NEW
599
        if (success := self.hardware_settings.device_sound.OUTPUT) == 'sysdefault':
×
NEW
600
            yield Result(
×
601
                Status.FAIL,
602
                "Sound output device 'sysdefault' is intended for testing purposes only",
603
                solution="Set device_sound.OUTPUT to 'hifi', 'harp' or 'xonar'",
604
            )
NEW
605
            return False
×
606

607
        # check serial device
NEW
608
        if self.hardware_settings.device_sound.OUTPUT in ['harp', 'hifi']:
×
NEW
609
            success = yield from super()._run()
×
NEW
610
            if not success:
×
NEW
611
                return False
×
612

613
        # device-specific validations
NEW
614
        match self.hardware_settings.device_sound.OUTPUT:
×
NEW
615
            case 'harp':
×
NEW
616
                if (dev := usb.core.find(manufacturer='Champalimaud Foundation', product='Harp Sound Card')) is None:
×
NEW
617
                    yield Result(
×
618
                        Status.FAIL,
619
                        'Cannot find USB sound device',
620
                        solution="Connect both of the sound card's USB ports and make sure that the HARP drivers are "
621
                        'installed',
622
                    )
NEW
623
                    return False
×
624
                else:
NEW
625
                    yield Result(Status.PASS, 'Found USB sound device')
×
NEW
626
                    yield Result(Status.INFO, f'USB ID: {dev.idVendor:04X}:{dev.idProduct:04X}')
×
627

628
        # yield module's connection status
NEW
629
        if self._module_name is not None:
×
NEW
630
            module = yield from self._get_module(self._module_name)
×
NEW
631
            if module is None:
×
NEW
632
                return False
×
633

634
        # run state machine
NEW
635
        if self.interactive:
×
NEW
636
            task = _SoundCheckTask(subject='toto')
×
NEW
637
            task.start_hardware()
×
NEW
638
            sma = task.get_state_machine()
×
NEW
639
            task.bpod.send_state_machine(sma)
×
NEW
640
            yield Result(Status.INFO, 'Playing audible sound - can you hear it?')
×
NEW
641
            task.bpod.run_state_machine(sma)
×
NEW
642
            bpod_data = task.bpod.session.current_trial.export()
×
NEW
643
            if (n_events := len(bpod_data['Events timestamps'].get('BNC2High', []))) == 0:
×
NEW
644
                yield Result(
×
645
                    Status.FAIL,
646
                    "No event detected on Bpod's BNC In 2",
647
                    solution="Make sure to connect the sound-card to Bpod's TTL Input 2",
648
                )
NEW
649
            elif n_events == 1:
×
NEW
650
                yield Result(Status.PASS, "Detected Event on Bpod's TTL Input 2")
×
651
            else:
NEW
652
                yield Result(
×
653
                    Status.FAIL,
654
                    "Multiple events detected on Bpod's BNC Input 2",
655
                    solution="Make sure to connect the sound-card to Bpod's TTL Input 2",
656
                )
657

658

659
def get_all_validators() -> list[type[Validator]]:
2✔
660
    return [cast(type[Validator], x) for x in get_inheritors(Validator) if not isabstract(x)]
2✔
661

662

663
def run_all_validators(
2✔
664
    iblrig_settings: RigSettings | None = None, hardware_settings: HardwareSettings | None = None, interactive: bool = False
665
) -> Generator[Result, None, None]:
NEW
666
    validators = get_all_validators()
×
NEW
667
    for validator in validators:
×
NEW
668
        yield from validator(iblrig_settings=iblrig_settings, hardware_settings=hardware_settings, interactive=interactive).run()
×
669

670

671
def run_all_validators_cli():
2✔
NEW
672
    validators = get_all_validators()
×
NEW
673
    fail = 0
×
NEW
674
    warn = 0
×
NEW
675
    for validator in validators:
×
NEW
676
        v = validator()
×
NEW
677
        print(f'{ANSI.BOLD + ANSI.UNDERLINE + v.name + ANSI.END}')
×
NEW
678
        for result in v.run():
×
NEW
679
            if result.status == Status.FAIL:
×
NEW
680
                color = ANSI.RED + ANSI.BOLD
×
NEW
681
                fail += 1
×
NEW
682
            elif result.status == Status.WARN:
×
NEW
683
                color = ANSI.YELLOW + ANSI.BOLD
×
NEW
684
                warn += 1
×
685
            else:
NEW
686
                color = ANSI.END
×
NEW
687
            print(f'{color}- {result.message}{ANSI.END}')
×
NEW
688
            if result.solution is not None and len(result.solution) > 0:
×
NEW
689
                print(f'{color}  Suggestion: {result.solution}{ANSI.END}')
×
NEW
690
        print('')
×
NEW
691
    if fail > 0:
×
NEW
692
        print(ANSI.RED + ANSI.BOLD + f'{fail} validation{"s" if fail > 1 else ""} failed.')
×
NEW
693
    if warn > 0:
×
NEW
694
        print(ANSI.YELLOW + ANSI.BOLD + f'Validations passed with {warn} warning{"s" if warn > 1 else ""}.')
×
NEW
695
    if warn == 0 and fail == 0:
×
NEW
696
        print(ANSI.GREEN + ANSI.BOLD + 'All validations were passed - no issues found.')
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc