• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 10992769815

23 Sep 2024 10:50AM UTC coverage: 47.799% (+1.0%) from 46.79%
10992769815

Pull #716

github

60cd00
web-flow
Merge 73b6a53cb into a946a6ff9
Pull Request #716: 8.24.1

22 of 49 new or added lines in 9 files covered. (44.9%)

1015 existing lines in 22 files now uncovered.

4191 of 8768 relevant lines covered (47.8%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.37
/iblrig/gui/valve.py
1
import logging
2✔
2
from collections import OrderedDict
2✔
3
from datetime import date
2✔
4

5
import numpy as np
2✔
6
import pyqtgraph as pg
2✔
7
from PyQt5 import QtCore, QtGui, QtWidgets
2✔
8
from PyQt5.QtCore import QThreadPool
2✔
9
from pyqtgraph import PlotWidget
2✔
10
from serial import SerialException
2✔
11
from typing_extensions import override
2✔
12

13
from iblrig.gui.tools import Worker
2✔
14
from iblrig.gui.ui_valve import Ui_valve
2✔
15
from iblrig.hardware import Bpod
2✔
16
from iblrig.path_helper import save_pydantic_yaml
2✔
17
from iblrig.pydantic_definitions import HardwareSettings
2✔
18
from iblrig.scale import Scale
2✔
19
from iblrig.valve import Valve, ValveValues
2✔
20
from pybpodapi.exceptions.bpod_error import BpodErrorException
2✔
21

22
log = logging.getLogger(__name__)
2✔
23

24

25
class CalibrationPlot:
2✔
26
    def __init__(self, parent: PlotWidget, name: str, color: str, values: ValveValues | None = None):
2✔
27
        self._values = values if values is not None else ValveValues([], [])
×
28
        self._curve = pg.PlotCurveItem(name=name)
×
29
        self._curve.setPen(color, width=3)
×
30
        self._points = pg.ScatterPlotItem()
×
31
        self._points.setPen(color)
×
32
        self._points.setBrush(color)
×
33
        self._parent = parent
×
34
        parent.addItem(self._curve)
×
35
        parent.addItem(self._points)
×
36
        self.update()
×
37

38
    @property
2✔
39
    def values(self) -> ValveValues:
2✔
40
        return self._values
×
41

42
    @values.setter
2✔
43
    def values(self, values: ValveValues):
2✔
44
        self._values = values
×
45
        self.update()
×
46

47
    def update(self):
2✔
48
        self._points.setData(x=self.values.open_times_ms, y=self.values.volumes_ul)
×
49
        if len(self.values.open_times_ms) < 2:
×
50
            self._curve.setData(x=[], y=[])
×
51
        else:
NEW
52
            time_range = list(np.linspace(0, self.values.open_times_ms[-1], 100))
×
53
            self._curve.setData(x=time_range, y=self.values.ms2ul(time_range))
×
54

55
    def clear(self):
2✔
56
        self.values.clear_data()
×
57
        self.update()
×
58

59

60
class ValveCalibrationDialog(QtWidgets.QDialog, Ui_valve):
2✔
61
    scale: Scale | None = None
2✔
62
    scale_initialized = QtCore.pyqtSignal(bool)
2✔
63
    scale_text_changed = QtCore.pyqtSignal(str)
2✔
64
    scale_stable_changed = QtCore.pyqtSignal(bool)
2✔
65
    drop_cleared = QtCore.pyqtSignal(int)
2✔
66
    tared = QtCore.pyqtSignal(bool)
2✔
67
    calibration_finished = QtCore.pyqtSignal()
2✔
68
    start_next_calibration = QtCore.pyqtSignal()
2✔
69
    _grams = float('nan')
2✔
70
    _stable = False
2✔
71
    _next_calibration_step = 1
2✔
72
    _next_calibration_time = float('nan')
2✔
73
    _scale_update_ms = 100
2✔
74
    _clear_drop_counter = 0
2✔
75

76
    def __init__(self, *args, **kwargs) -> None:
2✔
77
        super().__init__(*args, **kwargs)
×
78
        self.setupUi(self)
×
79

80
        # state machine for GUI logic
81
        self.machine = QtCore.QStateMachine()
×
82
        self.states: OrderedDict[str, QtCore.QStateMachine] = OrderedDict({})
×
83

84
        # timers
85
        self.scale_timer = QtCore.QTimer()
×
86
        self.clear_timer = QtCore.QTimer()
×
87
        self.clear_timer.setTimerType(QtCore.Qt.TimerType.PreciseTimer)
×
88

89
        # hardware
90
        self.hw_settings: HardwareSettings = self.parent().model.hardware_settings
×
91
        self.bpod = Bpod(self.hw_settings.device_bpod.COM_BPOD, skip_initialization=True, disable_behavior_ports=[0, 1, 2, 3])
×
92
        self.valve = Valve(self.hw_settings.device_valve)
×
93

94
        # UI related ...
95
        self.font_database = QtGui.QFontDatabase
×
96
        self.font_database.addApplicationFont(':/fonts/7-Segment')
×
97
        self.lineEditGrams.setFont(QtGui.QFont('7-Segment', 30))
×
98
        self.action_grams = self.lineEditGrams.addAction(
×
99
            QtGui.QIcon(':/images/grams'), QtWidgets.QLineEdit.ActionPosition.TrailingPosition
100
        )
101
        self.action_stable = self.lineEditGrams.addAction(
×
102
            QtGui.QIcon(':/images/stable'), QtWidgets.QLineEdit.ActionPosition.LeadingPosition
103
        )
104
        self.action_grams.setVisible(False)
×
105
        self.action_stable.setVisible(False)
×
106
        self.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
×
107
        self.setWindowFlags(self.windowFlags() & ~QtCore.Qt.WindowContextHelpButtonHint)
×
108
        self.setModal(QtCore.Qt.WindowModality.ApplicationModal)
×
109

110
        # set up plot widget
111
        self.uiPlot.addLegend()
×
112
        self.old_calibration = CalibrationPlot(
×
113
            self.uiPlot, f'previous calibration ({self.valve.calibration_date})', 'gray', self.valve.values
114
        )
115
        self.new_calibration = CalibrationPlot(self.uiPlot, 'new calibration', 'black')
×
116
        self.uiPlot.hideButtons()
×
117
        self.uiPlot.setMenuEnabled(False)
×
118
        self.uiPlot.setMouseEnabled(x=False, y=False)
×
119
        self.uiPlot.setBackground(None)
×
120
        self.uiPlot.setLabel('bottom', 'Opening Time [ms]')
×
121
        self.uiPlot.setLabel('left', 'Volume [μL]')
×
122
        self.uiPlot.getViewBox().setLimits(xMin=0, yMin=0)
×
123
        self.uiPlot.getViewBox().enableAutoRange(True)
×
124

125
        # signals & slots
126
        self.scale_text_changed.connect(self.display_scale_text)
×
127
        self.scale_stable_changed.connect(self.display_scale_stable)
×
128
        self.pushButtonPulseValve.clicked.connect(self.pulse_valve)
×
129
        self.pushButtonToggleValve.clicked.connect(self.toggle_valve)
×
130
        self.pushButtonTareScale.clicked.connect(self.tare)
×
131
        self.pushButtonSave.setEnabled(False)
×
132
        self.pushButtonCancel.clicked.connect(self.close)
×
133
        self.pushButtonRestart.setVisible(False)
×
134
        self.scale_initialized.connect(self.define_and_start_state_machine)
×
135

136
        # initialize scale
137
        worker = Worker(self.initialize_scale, port=self.hw_settings.device_scale.COM_SCALE)
×
138
        worker.signals.result.connect(self._on_initialize_scale_result)
×
139
        QThreadPool.globalInstance().tryStart(worker)
×
140

141
        self.show()
×
142

143
    @QtCore.pyqtSlot(bool)
2✔
144
    def define_and_start_state_machine(self, use_scale: bool = False) -> None:
2✔
145
        for state_name in ['start', 'beaker', 'beaker2', 'flow', 'clear', 'tare', 'calibrate', 'finished', 'save']:
×
146
            self.states[state_name] = QtCore.QState(self.machine)
×
147
        self.machine.setInitialState(self.states['start'])
×
148

149
        # state 'start': welcome the user and explain what's going on --------------------------------------------------
150
        self.states['start'].assignProperty(self.labelGuideHead, 'text', 'Welcome')
×
151
        self.states['start'].assignProperty(
×
152
            self.labelGuideText,
153
            'text',
154
            'This is a step-by-step guide for calibrating the valve of your rig. You can abort the process at any time by '
155
            'pressing Cancel or closing this window.',
156
        )
157
        self.states['start'].assignProperty(self.commandLinkNext, 'enabled', True)
×
158
        self.states['start'].addTransition(self.commandLinkNext.clicked, self.states['beaker'])
×
159

160
        # state 'beaker': ask user to position beaker on scale ---------------------------------------------------------
161
        self.states['beaker'].assignProperty(self.labelGuideHead, 'text', 'Preparation')
×
162
        self.states['beaker'].assignProperty(
×
163
            self.labelGuideText,
164
            'text',
165
            'Fill the water reservoir to the level used during experiments.\n\n'
166
            'Place a small beaker on the scale and position the lick spout directly above.\n\n'
167
            'The opening of the spout should be placed at a vertical position identical to the one used during '
168
            'experiments.',
169
        )
170
        self.states['beaker'].entered.connect(self.clear_calibration)
×
171
        self.states['beaker'].assignProperty(self.pushButtonRestart, 'visible', False)
×
172
        self.states['beaker'].assignProperty(self.commandLinkNext, 'visible', True)
×
173
        self.states['beaker'].assignProperty(self.commandLinkNext, 'enabled', True)
×
174
        self.states['beaker'].assignProperty(self.pushButtonSave, 'enabled', False)
×
175
        self.states['beaker'].assignProperty(self.pushButtonTareScale, 'enabled', use_scale)
×
176
        self.states['beaker'].assignProperty(self.pushButtonToggleValve, 'enabled', True)
×
177
        self.states['beaker'].assignProperty(self.pushButtonPulseValve, 'enabled', True)
×
178
        self.states['beaker'].addTransition(self.commandLinkNext.clicked, self.states['beaker2'])
×
179

180
        # state 'beaker': ask user to position beaker on scale ---------------------------------------------------------
181
        self.states['beaker2'].assignProperty(self.labelGuideHead, 'text', 'Preparation')
×
182
        self.states['beaker2'].assignProperty(
×
183
            self.labelGuideText,
184
            'text',
185
            'Make sure that neither lick spout nor tubing touch the beaker or the scale and that water drops can '
186
            'freely fall into the beaker.',
187
        )
188
        self.states['beaker2'].addTransition(self.commandLinkNext.clicked, self.states['flow'])
×
189

190
        # state 'flow': prepare flow of water --------------------------------------------------------------------------
191
        self.states['flow'].assignProperty(self.labelGuideHead, 'text', 'Preparation')
×
192
        self.states['flow'].assignProperty(
×
193
            self.labelGuideText,
194
            'text',
195
            'Use the valve controls above to advance the flow of the water until there are no visible pockets of air within the '
196
            'tubing and first drops start falling into the beaker.',
197
        )
198
        self.states['flow'].addTransition(self.commandLinkNext.clicked, self.states['clear'])
×
199

200
        # state 'clear': try to clear one drop of water to set a defined start point for calibration -------------------
201
        self.states['clear'].entered.connect(self.clear_drop)
×
202
        self.states['clear'].assignProperty(self.pushButtonTareScale, 'enabled', False)
×
203
        self.states['clear'].assignProperty(self.pushButtonToggleValve, 'enabled', False)
×
204
        if use_scale:
×
205
            self.states['clear'].assignProperty(self.pushButtonPulseValve, 'enabled', False)
×
206
            self.states['clear'].assignProperty(self.commandLinkNext, 'enabled', False)
×
207
            self.states['clear'].addTransition(self.drop_cleared, self.states['tare'])
×
208
        else:
209
            self.states['clear'].assignProperty(self.pushButtonPulseValve, 'enabled', True)
×
210
            self.states['clear'].assignProperty(self.commandLinkNext, 'enabled', True)
×
211
            self.states['clear'].addTransition(self.commandLinkNext.clicked, self.states['tare'])
×
212

213
        # state 'tare': tare the scale ---------------------------------------------------------------------------------
214
        self.states['tare'].assignProperty(self.pushButtonPulseValve, 'enabled', False)
×
215
        if use_scale:
×
216
            self.states['tare'].entered.connect(self.tare)
×
217
            self.states['tare'].addTransition(self.tared, self.states['calibrate'])
×
218
        else:
219
            self.states['tare'].assignProperty(self.labelGuideText, 'text', 'Tare the scale.')
×
220
            self.states['tare'].addTransition(self.commandLinkNext.clicked, self.states['calibrate'])
×
221

222
        # state 'calibrate': perform the actual measurement ------------------------------------------------------------
223
        self.states['calibrate'].entered.connect(self.calibrate)
×
224
        self.states['calibrate'].assignProperty(self.commandLinkNext, 'enabled', False)
×
225
        self.states['calibrate'].addTransition(self.start_next_calibration, self.states['clear'])
×
226
        self.states['calibrate'].addTransition(self.calibration_finished, self.states['finished'])
×
227

228
        # state 'finished': ask user to save or discard the calibration ------------------------------------------------
229
        self.states['finished'].assignProperty(self.labelGuideHead, 'text', 'Calibration is finished')
×
230
        self.states['finished'].assignProperty(
×
231
            self.labelGuideText,
232
            'text',
233
            'Click Save to store the calibration. Close this window or click Cancel to discard the calibration.',
234
        )
235
        self.states['finished'].assignProperty(self.commandLinkNext, 'visible', False)
×
236
        self.states['finished'].assignProperty(self.pushButtonSave, 'enabled', True)
×
237
        self.states['finished'].assignProperty(self.pushButtonRestart, 'visible', True)
×
238
        self.states['finished'].addTransition(self.pushButtonRestart.clicked, self.states['beaker'])
×
239
        self.states['finished'].addTransition(self.pushButtonSave.clicked, self.states['save'])
×
240

241
        # state 'save': save calibration and quit ----------------------------------------------------------------------
242
        self.states['save'].entered.connect(self.save)
×
243
        self.states['save'].assignProperty(self, 'enabled', False)
×
244

245
        self.machine.start()
×
246

247
    def clear_calibration(self):
2✔
248
        self.new_calibration.clear()
×
249
        self._next_calibration_time = self.get_next_calibration_time()
×
250

251
    def get_next_calibration_time(self) -> float | None:
2✔
252
        remaining_calibration_times = [
×
253
            t for t in self.valve.new_calibration_open_times if t not in self.new_calibration.values.open_times_ms
254
        ]
255
        if len(remaining_calibration_times) > 0:
×
256
            return max(remaining_calibration_times)
×
257
        else:
258
            return None
×
259

260
    def initialize_scale(self, port: str) -> bool:
2✔
261
        if port is None:
×
262
            self.groupBoxScale.setVisible(False)
×
263
            return False
×
264
        try:
×
265
            self.lineEditGrams.setAlignment(QtCore.Qt.AlignCenter)
×
266
            self.lineEditGrams.setText('Starting')
×
267
            self.scale = Scale(port)
×
268
            return True
×
269
        except (AssertionError, SerialException):
×
270
            log.error(f'Error initializing OHAUS scale on {port}.')
×
271
            return False
×
272

273
    def _on_initialize_scale_result(self, success: bool):
2✔
274
        if success:
×
275
            self.lineEditGrams.setEnabled(True)
×
276
            self.pushButtonTareScale.setEnabled(True)
×
277
            self.lineEditGrams.setAlignment(QtCore.Qt.AlignRight)
×
278
            self.lineEditGrams.setText('')
×
279
            self.scale_timer.timeout.connect(self.get_scale_reading)
×
280
            self.action_grams.setVisible(True)
×
281
            self.get_scale_reading()
×
282
            self.scale_timer.start(self._scale_update_ms)
×
283
        else:
284
            self.lineEditGrams.setAlignment(QtCore.Qt.AlignCenter)
×
285
            self.lineEditGrams.setText('Error')
×
286
        self.scale_initialized.emit(success)
×
287

288
    def get_scale_reading(self):
2✔
289
        grams, stable = self.scale.get_grams()
×
290
        if grams != self._grams:
×
291
            self.scale_text_changed.emit(f'{grams:0.2f}')
×
292
        if stable != self._stable:
×
293
            self.scale_stable_changed.emit(stable)
×
294
        self._grams = grams
×
295
        self._stable = stable
×
296

297
    @QtCore.pyqtSlot(str)
2✔
298
    def display_scale_text(self, value: str):
2✔
299
        self.lineEditGrams.setText(value)
×
300

301
    @QtCore.pyqtSlot(bool)
2✔
302
    def display_scale_stable(self, value: bool):
2✔
303
        self.action_stable.setVisible(value)
×
304

305
    def toggle_valve(self):
2✔
306
        state = self.pushButtonToggleValve.isChecked()
×
307
        self.pushButtonToggleValve.setStyleSheet('QPushButton {background-color: rgb(128, 128, 255);}' if state else '')
×
308
        try:
×
309
            self.bpod.open_valve(open=state)
×
310
        except (OSError, BpodErrorException):
×
311
            self.pushButtonToggleValve.setChecked(False)
×
312
            self.pushButtonToggleValve.setStyleSheet('')
×
313

314
    def pulse_valve(self):
2✔
315
        self.bpod.pulse_valve(0.05)
×
316

317
    def clear_drop(self):
2✔
318
        self.labelGuideHead.setText('Calibration')
×
319
        if self.scale is None:
×
320
            self.labelGuideText.setText(
×
321
                "Use the 'Pulse Valve' button above to clear one drop of water in order to obtain a defined starting point for "
322
                'calibration.'
323
            )
324
        else:
325
            self.labelGuideText.setText(
×
326
                'Trying to automatically clear one drop of water to obtain a defined starting point for calibration.'
327
            )
328
            open_time_s = 0.05
×
329
            initial_grams = self.scale.get_stable_grams()
×
330
            self._clear_drop_counter = 0
×
331
            self.clear_timer.timeout.connect(lambda: self.clear_crop_callback(initial_grams, open_time_s))
×
332
            self.clear_timer.start(500 + int(open_time_s * 1000))
×
333

334
    def clear_crop_callback(self, initial_grams: float, duration_s: float = 0.05):
2✔
335
        if self.scale.get_grams()[0] > initial_grams + 0.02:
×
336
            self.clear_timer.stop()
×
337
            self.clear_timer.disconnect()
×
338
            self.drop_cleared.emit(self._clear_drop_counter)
×
339
            return
×
340
        self._clear_drop_counter += 1
×
341
        self.bpod.pulse_valve(duration_s)
×
342

343
    def tare(self):
2✔
344
        self.scale_timer.stop()
×
345
        self.scale_text_changed.emit('------')
×
346
        self._grams = float('nan')
×
347
        worker = Worker(self.scale.tare)
×
348
        worker.signals.result.connect(self._on_tare_finished)
×
349
        QThreadPool.globalInstance().tryStart(worker)
×
350

351
    @QtCore.pyqtSlot(object)
2✔
352
    def _on_tare_finished(self, success: bool):
2✔
353
        self.scale_timer.start(self._scale_update_ms)
×
354
        self.tared.emit(success)
×
355

356
    @QtCore.pyqtSlot()
2✔
357
    def calibrate(self):
2✔
358
        n_samples = int(np.ceil(50 * max(self.valve.new_calibration_open_times) / self._next_calibration_time))
×
359
        self.labelGuideText.setText(
×
360
            f'Getting {n_samples} samples for a valve opening time of {self._next_calibration_time} ms ...'
361
        )
362
        worker = Worker(self.bpod.pulse_valve_repeatedly, n_samples, self._next_calibration_time / 1e3, 0.2)
×
363
        worker.signals.result.connect(self._on_repeated_pulse_finished)
×
364
        QThreadPool.globalInstance().tryStart(worker)
×
365

366
    @QtCore.pyqtSlot(object)
2✔
367
    def _on_repeated_pulse_finished(self, n_pulses: int):
2✔
368
        if self.scale is None:
×
369
            ok = False
×
370
            scale_reading = 0
×
371
            while not ok or scale_reading <= 0:
×
372
                scale_reading, ok = QtWidgets.QInputDialog().getDouble(
×
373
                    self,
374
                    'Enter Scale Reading',
375
                    'Enter measured weight in grams:',
376
                    min=0,
377
                    max=float('inf'),
378
                    decimals=2,
379
                    flags=(QtWidgets.QInputDialog().windowFlags() & ~QtCore.Qt.WindowType.WindowContextHelpButtonHint),
380
                )
381
            grams_per_pulse = scale_reading / n_pulses
×
382
        else:
383
            scale_reading = self.scale.get_stable_grams()
×
384
            grams_per_pulse = scale_reading / n_pulses
×
385
        self.new_calibration.values.add_samples([self._next_calibration_time], [grams_per_pulse])
×
386
        self.new_calibration.update()
×
387
        self._next_calibration_time = self.get_next_calibration_time()
×
388
        if self._next_calibration_time is None:
×
389
            self.calibration_finished.emit()
×
390
        else:
391
            self.start_next_calibration.emit()
×
392

393
    def save(self) -> None:
2✔
394
        valve_settings = self.hw_settings.device_valve
×
395
        valve_settings.WATER_CALIBRATION_OPEN_TIMES = [float(x) for x in self.new_calibration.values.open_times_ms]
×
396
        valve_settings.WATER_CALIBRATION_WEIGHT_PERDROP = [float(x) for x in self.new_calibration.values.volumes_ul]
×
397
        valve_settings.WATER_CALIBRATION_DATE = date.today()
×
398
        self.parent().model.hardware_settings.device_valve = valve_settings
×
399
        save_pydantic_yaml(self.parent().model.hardware_settings)
×
400
        self.labelGuideHead.setText('Settings saved.')
×
401
        self.labelGuideText.setText('')
×
402
        QtCore.QTimer.singleShot(1000, self.close)
×
403

404
    @override
2✔
405
    def closeEvent(self, event):
2✔
406
        self.clear_timer.stop()
×
407
        self.scale_timer.stop()
×
408
        if self.machine.started:
×
409
            self.machine.stop()
×
410
        if self.bpod.is_connected:
×
411
            self.bpod.stop_trial()
×
412
        self.deleteLater()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc