• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 9031936551

10 May 2024 12:05PM UTC coverage: 48.538% (+1.7%) from 46.79%
9031936551

Pull #643

github

53c3e3
web-flow
Merge 3c8214f78 into ec2d8e4fe
Pull Request #643: 8.19.0

377 of 1073 new or added lines in 38 files covered. (35.14%)

977 existing lines in 19 files now uncovered.

3253 of 6702 relevant lines covered (48.54%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.96
/iblrig/gui/valve.py
1
import functools
2✔
2
import logging
2✔
3
from collections import OrderedDict
2✔
4
from datetime import date
2✔
5

6
import numpy as np
2✔
7
import pyqtgraph as pg
2✔
8
from PyQt5 import QtCore, QtGui, QtWidgets
2✔
9
from PyQt5.QtCore import QThreadPool
2✔
10
from pyqtgraph import PlotWidget
2✔
11
from serial import SerialException
2✔
12
from typing_extensions import override
2✔
13

14
from iblrig.gui.tools import Worker
2✔
15
from iblrig.gui.ui_valve import Ui_valve
2✔
16
from iblrig.hardware import Bpod
2✔
17
from iblrig.path_helper import save_pydantic_yaml
2✔
18
from iblrig.pydantic_definitions import HardwareSettings
2✔
19
from iblrig.scale import Scale
2✔
20
from iblrig.valve import Valve, ValveValues
2✔
21
from pybpodapi.exceptions.bpod_error import BpodErrorException
2✔
22

23
log = logging.getLogger(__name__)
2✔
24

25

26
class CalibrationPlot:
2✔
27
    def __init__(self, parent: PlotWidget, name: str, color: str, values: ValveValues | None = None):
2✔
28
        self._values = values if values is not None else ValveValues([], [])
×
29
        self._curve = pg.PlotCurveItem(name=name)
×
30
        self._curve.setPen(color, width=3)
×
31
        self._points = pg.ScatterPlotItem()
×
32
        self._points.setPen(color)
×
33
        self._points.setBrush(color)
×
34
        self._parent = parent
×
35
        parent.addItem(self._curve)
×
36
        parent.addItem(self._points)
×
37
        self.update()
×
38

39
    @property
2✔
40
    def values(self) -> ValveValues:
2✔
41
        return self._values
×
42

43
    @values.setter
2✔
44
    def values(self, values: ValveValues):
2✔
45
        self._values = values
×
46
        self.update()
×
47

48
    def update(self):
2✔
49
        self._points.setData(x=self.values.open_times_ms, y=self.values.volumes_ul)
×
50
        if len(self.values.open_times_ms) < 2:
×
51
            self._curve.setData(x=[], y=[])
×
52
        else:
53
            time_range = list(np.linspace(self.values.open_times_ms[0], self.values.open_times_ms[-1], 100))
×
54
            self._curve.setData(x=time_range, y=self.values.ms2ul(time_range))
×
55

56
    def clear(self):
2✔
57
        self.values.clear_data()
×
58
        self.update()
×
59

60

61
class ValveCalibrationDialog(QtWidgets.QDialog, Ui_valve):
2✔
62
    scale: Scale | None = None
2✔
63
    scale_initialized = QtCore.pyqtSignal(bool)
2✔
64
    scale_text_changed = QtCore.pyqtSignal(str)
2✔
65
    scale_stable_changed = QtCore.pyqtSignal(bool)
2✔
66
    drop_cleared = QtCore.pyqtSignal(int)
2✔
67
    tared = QtCore.pyqtSignal(bool)
2✔
68
    calibration_finished = QtCore.pyqtSignal()
2✔
69
    start_next_calibration = QtCore.pyqtSignal()
2✔
70
    _grams = float('nan')
2✔
71
    _stable = False
2✔
72
    _next_calibration_step = 1
2✔
73
    _next_calibration_time = float('nan')
2✔
74
    _scale_update_ms = 100
2✔
75
    _clear_drop_counter = 0
2✔
76

77
    def __init__(self, *args, **kwargs) -> None:
2✔
78
        super().__init__(*args, **kwargs)
×
79
        self.setupUi(self)
×
80

81
        # state machine for GUI logic
82
        self.machine = QtCore.QStateMachine()
×
83
        self.states: OrderedDict[str, QtCore.QStateMachine] = OrderedDict({})
×
84

85
        # timers
86
        self.scale_timer = QtCore.QTimer()
×
87
        self.clear_timer = QtCore.QTimer()
×
88
        self.clear_timer.setTimerType(QtCore.Qt.TimerType.PreciseTimer)
×
89

90
        # hardware
91
        self.hw_settings: HardwareSettings = self.parent().model.hardware_settings
×
92
        self.bpod = Bpod(self.hw_settings.device_bpod.COM_BPOD, skip_initialization=True, disable_behavior_ports=[0, 1, 2, 3])
×
93
        self.valve = Valve(self.hw_settings.device_valve)
×
94

95
        # scale
96
        worker = Worker(self.initialize_scale, port=self.hw_settings.device_scale.COM_SCALE)
×
97
        worker.signals.result.connect(self._on_initialize_scale_result)
×
98
        QThreadPool.globalInstance().tryStart(worker)
×
99

100
        # UI related ...
101
        self.font_database = QtGui.QFontDatabase
×
102
        self.font_database.addApplicationFont(':/fonts/7-Segment')
×
103
        self.lineEditGrams.setFont(QtGui.QFont('7-Segment', 30))
×
104
        self.action_grams = self.lineEditGrams.addAction(
×
105
            QtGui.QIcon(':/images/grams'), QtWidgets.QLineEdit.ActionPosition.TrailingPosition
106
        )
107
        self.action_stable = self.lineEditGrams.addAction(
×
108
            QtGui.QIcon(':/images/stable'), QtWidgets.QLineEdit.ActionPosition.LeadingPosition
109
        )
110
        self.action_grams.setVisible(False)
×
111
        self.action_stable.setVisible(False)
×
112
        self.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
×
113
        self.setWindowFlags(self.windowFlags() & ~QtCore.Qt.WindowContextHelpButtonHint)
×
114
        self.setModal(QtCore.Qt.WindowModality.ApplicationModal)
×
115

116
        # set up plot widget
117
        self.uiPlot.addLegend()
×
118
        self.old_calibration = CalibrationPlot(
×
119
            self.uiPlot, f'previous calibration ({self.valve.calibration_date})', 'gray', self.valve.values
120
        )
121
        self.new_calibration = CalibrationPlot(self.uiPlot, 'new calibration', 'black')
×
122
        self.uiPlot.hideButtons()
×
123
        self.uiPlot.setMenuEnabled(False)
×
124
        self.uiPlot.setMouseEnabled(x=False, y=False)
×
125
        self.uiPlot.setBackground(None)
×
126
        self.uiPlot.setLabel('bottom', 'Opening Time [ms]')
×
127
        self.uiPlot.setLabel('left', 'Volume [μL]')
×
128
        self.uiPlot.getViewBox().setLimits(xMin=0, yMin=0)
×
129
        self.uiPlot.getViewBox().enableAutoRange(True)
×
130

131
        # signals & slots
132
        self.scale_text_changed.connect(self.display_scale_text)
×
133
        self.scale_stable_changed.connect(self.display_scale_stable)
×
134
        self.pushButtonPulseValve.clicked.connect(self.pulse_valve)
×
135
        self.pushButtonToggleValve.clicked.connect(self.toggle_valve)
×
136
        self.pushButtonTareScale.clicked.connect(self.tare)
×
137
        self.pushButtonSave.setEnabled(False)
×
138
        self.pushButtonCancel.clicked.connect(self.close)
×
139
        self.pushButtonRestart.setVisible(False)
×
NEW
140
        self.scale_initialized.connect(self.define_and_start_state_machine)
×
141

142
        self.show()
×
143

144
    @QtCore.pyqtSlot(bool)
2✔
145
    def define_and_start_state_machine(self, use_scale: bool = False) -> None:
2✔
146
        for state_name in ['start', 'beaker', 'flow', 'clear', 'tare', 'calibrate', 'finished', 'save']:
×
147
            self.states[state_name] = QtCore.QState(self.machine)
×
148
        self.machine.setInitialState(self.states['start'])
×
149

150
        # state 'start': welcome the user and explain what's going on --------------------------------------------------
151
        self.states['start'].assignProperty(self.labelGuideHead, 'text', 'Welcome')
×
152
        self.states['start'].assignProperty(
×
153
            self.labelGuideText,
154
            'text',
155
            'This is a step-by-step guide for calibrating the valve of your rig. You can abort the process at any time by '
156
            'pressing Cancel or closing this window.',
157
        )
NEW
158
        self.states['start'].assignProperty(self.commandLinkNext, 'enabled', True)
×
159
        self.states['start'].addTransition(self.commandLinkNext.clicked, self.states['beaker'])
×
160

161
        # state 'beaker': ask user to position beaker on scale ---------------------------------------------------------
162
        self.states['beaker'].assignProperty(self.labelGuideHead, 'text', 'Preparation')
×
163
        self.states['beaker'].assignProperty(
×
164
            self.labelGuideText,
165
            'text',
166
            'Fill the water reservoir to the level used during experiments.\n\n'
167
            'Place a small beaker on the scale and position the lick spout directly above.\n\n'
168
            'Make sure that neither lick spout nor tubing touch the beaker or the scale and that water drops can freely fall '
169
            'into the beaker.',
170
        )
171
        self.states['beaker'].entered.connect(self.clear_calibration)
×
172
        self.states['beaker'].assignProperty(self.pushButtonRestart, 'visible', False)
×
173
        self.states['beaker'].assignProperty(self.commandLinkNext, 'visible', True)
×
174
        self.states['beaker'].assignProperty(self.commandLinkNext, 'enabled', True)
×
175
        self.states['beaker'].assignProperty(self.pushButtonSave, 'enabled', False)
×
NEW
176
        self.states['beaker'].assignProperty(self.pushButtonTareScale, 'enabled', use_scale)
×
177
        self.states['beaker'].assignProperty(self.pushButtonToggleValve, 'enabled', True)
×
178
        self.states['beaker'].assignProperty(self.pushButtonPulseValve, 'enabled', True)
×
179
        self.states['beaker'].addTransition(self.commandLinkNext.clicked, self.states['flow'])
×
180

181
        # state 'flow': prepare flow of water --------------------------------------------------------------------------
182
        self.states['flow'].assignProperty(self.labelGuideHead, 'text', 'Preparation')
×
183
        self.states['flow'].assignProperty(
×
184
            self.labelGuideText,
185
            'text',
186
            'Use the valve controls above to advance the flow of the water until there are no visible pockets of air within the '
187
            'tubing and first drops start falling into the beaker.',
188
        )
189
        self.states['flow'].addTransition(self.commandLinkNext.clicked, self.states['clear'])
×
190

191
        # state 'clear': try to clear one drop of water to set a defined start point for calibration -------------------
192
        self.states['clear'].entered.connect(self.clear_drop)
×
193
        self.states['clear'].assignProperty(self.pushButtonTareScale, 'enabled', False)
×
194
        self.states['clear'].assignProperty(self.pushButtonToggleValve, 'enabled', False)
×
NEW
195
        if use_scale:
×
196
            self.states['clear'].assignProperty(self.pushButtonPulseValve, 'enabled', False)
×
197
            self.states['clear'].assignProperty(self.commandLinkNext, 'enabled', False)
×
198
            self.states['clear'].addTransition(self.drop_cleared, self.states['tare'])
×
199
        else:
NEW
200
            self.states['clear'].assignProperty(self.pushButtonPulseValve, 'enabled', True)
×
NEW
201
            self.states['clear'].assignProperty(self.commandLinkNext, 'enabled', True)
×
NEW
202
            self.states['clear'].addTransition(self.commandLinkNext.clicked, self.states['tare'])
×
203

204
        # state 'tare': tare the scale ---------------------------------------------------------------------------------
205
        self.states['tare'].assignProperty(self.pushButtonPulseValve, 'enabled', False)
×
NEW
206
        if use_scale:
×
NEW
207
            self.states['tare'].entered.connect(self.tare)
×
NEW
208
            self.states['tare'].addTransition(self.tared, self.states['calibrate'])
×
209
        else:
NEW
210
            self.states['tare'].assignProperty(self.labelGuideText, 'text', 'Tare the scale.')
×
NEW
211
            self.states['tare'].addTransition(self.commandLinkNext.clicked, self.states['calibrate'])
×
212

213
        # state 'calibrate': perform the actual measurement ------------------------------------------------------------
214
        self.states['calibrate'].entered.connect(self.calibrate)
×
NEW
215
        self.states['calibrate'].assignProperty(self.commandLinkNext, 'enabled', False)
×
216
        self.states['calibrate'].addTransition(self.start_next_calibration, self.states['clear'])
×
217
        self.states['calibrate'].addTransition(self.calibration_finished, self.states['finished'])
×
218

219
        # state 'finished': ask user to save or discard the calibration ------------------------------------------------
220
        self.states['finished'].assignProperty(self.labelGuideHead, 'text', 'Calibration is finished')
×
221
        self.states['finished'].assignProperty(
×
222
            self.labelGuideText,
223
            'text',
224
            'Click Save to store the calibration. Close this window or click Cancel to discard the calibration.',
225
        )
226
        self.states['finished'].assignProperty(self.commandLinkNext, 'visible', False)
×
227
        self.states['finished'].assignProperty(self.pushButtonSave, 'enabled', True)
×
228
        self.states['finished'].assignProperty(self.pushButtonRestart, 'visible', True)
×
229
        self.states['finished'].addTransition(self.pushButtonRestart.clicked, self.states['beaker'])
×
230
        self.states['finished'].addTransition(self.pushButtonSave.clicked, self.states['save'])
×
231

232
        # state 'save': save calibration and quit ----------------------------------------------------------------------
233
        self.states['save'].entered.connect(self.save)
×
234
        self.states['save'].assignProperty(self, 'enabled', False)
×
235

236
        self.machine.start()
×
237

238
    def clear_calibration(self):
2✔
239
        self.new_calibration.clear()
×
240
        self._next_calibration_time = self.get_next_calibration_time()
×
241

242
    def get_next_calibration_time(self) -> float | None:
2✔
243
        remaining_calibration_times = [
×
244
            t for t in self.valve.new_calibration_open_times if t not in self.new_calibration.values.open_times_ms
245
        ]
246
        if len(remaining_calibration_times) > 0:
×
247
            return max(remaining_calibration_times)
×
248
        else:
249
            return None
×
250

251
    def initialize_scale(self, port: str) -> bool:
2✔
252
        if port is None:
×
253
            self.groupBoxScale.setVisible(False)
×
NEW
254
            self.define_and_start_state_machine(use_scale=False)
×
255
            return False
×
256
        try:
×
257
            self.lineEditGrams.setAlignment(QtCore.Qt.AlignCenter)
×
258
            self.lineEditGrams.setText('Starting')
×
259
            self.scale = Scale(port)
×
260
            return True
×
261
        except (AssertionError, SerialException):
×
262
            log.error(f'Error initializing OHAUS scale on {port}.')
×
263
            return False
×
264

265
    def _on_initialize_scale_result(self, success: bool):
2✔
266
        if success:
×
267
            self.lineEditGrams.setEnabled(True)
×
268
            self.pushButtonTareScale.setEnabled(True)
×
269
            self.lineEditGrams.setAlignment(QtCore.Qt.AlignRight)
×
270
            self.lineEditGrams.setText('')
×
271
            self.scale_timer.timeout.connect(self.get_scale_reading)
×
272
            self.action_grams.setVisible(True)
×
273
            self.get_scale_reading()
×
274
            self.scale_timer.start(self._scale_update_ms)
×
275
        else:
276
            self.lineEditGrams.setAlignment(QtCore.Qt.AlignCenter)
×
277
            self.lineEditGrams.setText('Error')
×
NEW
278
        self.scale_initialized.emit(success)
×
279

280
    def get_scale_reading(self):
2✔
281
        grams, stable = self.scale.get_grams()
×
282
        if grams != self._grams:
×
283
            self.scale_text_changed.emit(f'{grams:0.2f}')
×
284
        if stable != self._stable:
×
285
            self.scale_stable_changed.emit(stable)
×
286
        self._grams = grams
×
287
        self._stable = stable
×
288

289
    @QtCore.pyqtSlot(str)
2✔
290
    def display_scale_text(self, value: str):
2✔
291
        self.lineEditGrams.setText(value)
×
292

293
    @QtCore.pyqtSlot(bool)
2✔
294
    def display_scale_stable(self, value: bool):
2✔
295
        self.action_stable.setVisible(value)
×
296

297
    def toggle_valve(self):
2✔
298
        state = self.pushButtonToggleValve.isChecked()
×
299
        self.pushButtonToggleValve.setStyleSheet('QPushButton {background-color: rgb(128, 128, 255);}' if state else '')
×
300
        try:
×
301
            self.bpod.open_valve(open=state)
×
302
        except (OSError, BpodErrorException):
×
303
            self.pushButtonToggleValve.setChecked(False)
×
304
            self.pushButtonToggleValve.setStyleSheet('')
×
305

306
    def pulse_valve(self):
2✔
307
        self.bpod.pulse_valve(0.05)
×
308

309
    def clear_drop(self):
2✔
310
        self.labelGuideHead.setText('Calibration')
×
311
        if self.scale is None:
×
312
            self.labelGuideText.setText(
×
313
                "Use the 'Pulse Valve' button above to clear one drop of water in order to obtain a defined starting point for "
314
                'calibration.'
315
            )
316
        else:
317
            self.labelGuideText.setText(
×
318
                'Trying to automatically clear one drop of water to obtain a defined starting point for calibration.'
319
            )
320
            initial_grams = self.scale.get_stable_grams()
×
321
            self._clear_drop_counter = 0
×
322
            timer_callback = functools.partial(self.clear_crop_callback, initial_grams)
×
323
            self.clear_timer.timeout.connect(timer_callback)
×
324
            self.clear_timer.start(500)
×
325

326
    def clear_crop_callback(self, initial_grams: float, duration_s: float = 0.05):
2✔
327
        if self.scale.get_grams()[0] > initial_grams + 0.02:
×
328
            self.clear_timer.stop()
×
329
            self.drop_cleared.emit(self._clear_drop_counter)
×
330
            return
×
331
        self._clear_drop_counter += 1
×
332
        self.bpod.pulse_valve(duration_s)
×
333

334
    def tare(self):
2✔
335
        self.scale_timer.stop()
×
336
        self.scale_text_changed.emit('------')
×
337
        self._grams = float('nan')
×
338
        worker = Worker(self.scale.tare)
×
339
        worker.signals.result.connect(self._on_tare_finished)
×
340
        QThreadPool.globalInstance().tryStart(worker)
×
341

342
    @QtCore.pyqtSlot(object)
2✔
343
    def _on_tare_finished(self, success: bool):
2✔
344
        self.scale_timer.start(self._scale_update_ms)
×
345
        self.tared.emit(success)
×
346

347
    @QtCore.pyqtSlot()
2✔
348
    def calibrate(self):
2✔
349
        n_samples = int(np.ceil(50 * max(self.valve.new_calibration_open_times) / self._next_calibration_time))
×
350
        self.labelGuideText.setText(
×
351
            f'Getting {n_samples} samples for a valve opening time of {self._next_calibration_time} ms ...'
352
        )
353
        worker = Worker(self.bpod.pulse_valve_repeatedly, n_samples, self._next_calibration_time / 1e3, 0.2)
×
354
        worker.signals.result.connect(self._on_repeated_pulse_finished)
×
355
        QThreadPool.globalInstance().tryStart(worker)
×
356

357
    @QtCore.pyqtSlot(object)
2✔
358
    def _on_repeated_pulse_finished(self, n_pulses: int):
2✔
359
        if self.scale is None:
×
360
            ok = False
×
361
            scale_reading = 0
×
362
            while not ok or scale_reading <= 0:
×
363
                scale_reading, ok = QtWidgets.QInputDialog().getDouble(
×
364
                    self,
365
                    'Enter Scale Reading',
366
                    'Enter measured weight in grams:',
367
                    min=0,
368
                    max=float('inf'),
369
                    decimals=2,
370
                    flags=(QtWidgets.QInputDialog().windowFlags() & ~QtCore.Qt.WindowType.WindowContextHelpButtonHint),
371
                )
372
            grams_per_pulse = scale_reading / n_pulses
×
373
        else:
374
            scale_reading = self.scale.get_stable_grams()
×
375
            grams_per_pulse = scale_reading / n_pulses
×
376
        self.new_calibration.values.add_samples([self._next_calibration_time], [grams_per_pulse])
×
377
        self.new_calibration.update()
×
378
        self._next_calibration_time = self.get_next_calibration_time()
×
379
        if self._next_calibration_time is None:
×
380
            self.calibration_finished.emit()
×
381
        else:
382
            self.start_next_calibration.emit()
×
383

384
    def save(self) -> None:
2✔
385
        valve_settings = self.hw_settings.device_valve
×
386
        valve_settings.WATER_CALIBRATION_OPEN_TIMES = [float(x) for x in self.new_calibration.values.open_times_ms]
×
387
        valve_settings.WATER_CALIBRATION_WEIGHT_PERDROP = [float(x) for x in self.new_calibration.values.volumes_ul]
×
388
        valve_settings.WATER_CALIBRATION_DATE = date.today()
×
389
        self.parent().model.hardware_settings.device_valve = valve_settings
×
390
        save_pydantic_yaml(self.parent().model.hardware_settings)
×
391
        self.labelGuideHead.setText('Settings saved.')
×
392
        self.labelGuideText.setText('')
×
NEW
393
        QtCore.QTimer.singleShot(1000, self.close)
×
394

395
    @override
2✔
396
    def closeEvent(self, event):
2✔
397
        self.clear_timer.stop()
×
398
        self.scale_timer.stop()
×
399
        if self.machine.started:
×
400
            self.machine.stop()
×
401
        self.bpod.stop_trial()
×
402
        self.deleteLater()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc