• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 14217451843

02 Apr 2025 10:30AM UTC coverage: 47.508% (+0.7%) from 46.79%
14217451843

Pull #779

github

78b296
web-flow
Merge bac5de33d into 645e4a2a0
Pull Request #779: Video

9 of 94 new or added lines in 2 files covered. (9.57%)

1096 existing lines in 22 files now uncovered.

4308 of 9068 relevant lines covered (47.51%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.89
/iblrig/video_pyspin.py
1
import functools
2✔
2
import logging
2✔
3
import time
2✔
4
from collections.abc import Callable
2✔
5
from typing import Any
2✔
6

7
import PySpin
2✔
8

9
logger = logging.getLogger(__name__)
2✔
10

11

12
def camera_log(level: int, camera: PySpin.CameraPtr, message: str, stacklevel: int = 2) -> bool:
    """
    Emit a log record that is prefixed with the ID of a camera.

    Parameters
    ----------
    level : int
        Logging level of the record (e.g., logging.DEBUG, logging.INFO, logging.ERROR).
    camera : PySpin.CameraPtr
        The camera the message refers to; the result of its ``DeviceID()`` call is used in the prefix.
    message : str
        The text to log. Leading and trailing spaces and periods are stripped and a single period is appended.
    stacklevel : int, optional
        Forwarded to the logger's ``stacklevel`` argument (default is 2).

    Returns
    -------
    bool
        True if `level` is below ``logging.ERROR``, False otherwise.
    """
    device_id = camera.DeviceID()
    body = message.strip(' .')
    record_text = f'Camera #{device_id}: {body}.'
    logger.log(level=level, msg=record_text, stacklevel=stacklevel)
    return level < logging.ERROR
×
34

35

36
class Cameras:
    """Context manager around the PySpin system instance and its camera list.

    The system instance and the camera list are acquired when the object is created (optionally initializing every
    camera). Leaving the context deinitializes the cameras (if they were initialized here), clears the camera list and
    releases the system instance.
    """

    _instance = None

    def __init__(self, init_cameras: bool = True):
        """Acquire the system instance and the camera list.

        Parameters
        ----------
        init_cameras : bool, optional
            If True, ``Init()`` is called on every camera in the list (default is True).
        """
        self._instance = PySpin.System.GetInstance()
        self._cameras = self._instance.GetCameras()
        self._init_cameras = init_cameras
        if init_cameras:
            # NOTE(review): cameras are accessed by index rather than by iterating the list - presumably to avoid
            # keeping camera references alive that would get in the way of releasing the system; confirm.
            for i in range(len(self._cameras)):
                self._cameras[i].Init()

    def __enter__(self) -> PySpin.CameraList:
        """Enter the runtime context.

        Returns
        -------
        PySpin.CameraList
            The camera list obtained in ``__init__``.
        """
        return self._cameras

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context.

        Deinitializes the cameras if they were initialized by this object, then clears the camera list and releases
        the system instance. The list is cleared and the instance released even if deinitializing a camera (or
        clearing the list) raises; the exception still propagates afterwards.
        """
        try:
            if self._init_cameras:
                for i in range(len(self._cameras)):
                    self._cameras[i].DeInit()
        finally:
            # always give the list and the system instance back, otherwise a failing DeInit() would leak them
            try:
                self._cameras.Clear()
            finally:
                self._instance.ReleaseInstance()
×
80

81

82
def process_camera(func: Callable[..., Any]) -> Callable[..., tuple[Any, ...]]:
    """Decorator that fans a call out over one camera, a list of cameras, or all cameras.

    The camera argument is taken from the ``camera`` keyword if present, otherwise from the last positional argument
    if that is a ``PySpin.CameraPtr`` or ``PySpin.CameraList``. If neither applies (or the camera is None), the
    decorated function is called once for every camera provided by the `Cameras` context manager.

    Parameters
    ----------
    func : Callable
        The function to decorate. It is always called with the camera passed as the ``camera`` keyword argument.

    Returns
    -------
    Callable
        The wrapped function. It returns a tuple holding one result per processed camera; a camera argument of any
        other type yields an empty tuple.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> tuple[Any, ...]:
        # work out which camera(s) the call refers to
        if 'camera' in kwargs:
            target = kwargs.pop('camera')
        elif args and isinstance(args[-1], PySpin.CameraPtr | PySpin.CameraList):
            target, args = args[-1], args[:-1]
        else:
            target = None

        # no camera specified: run on every available camera
        if target is None:
            with Cameras() as available:
                return tuple(func(*args, camera=available[idx], **kwargs) for idx in range(len(available)))

        # a single camera
        if isinstance(target, PySpin.CameraPtr):
            return (func(*args, camera=target, **kwargs),)

        # a list of cameras
        if isinstance(target, PySpin.CameraList):
            return tuple(func(*args, camera=target[idx], **kwargs) for idx in range(len(target)))

        # any other type: nothing is called
        return ()

    return wrapper  # type: ignore
2✔
127

128

129
def acquisition_ok() -> bool:
    """Test image acquisition for all available cameras.

    For every camera provided by the `Cameras` context manager, acquisition is started, one image is requested and
    its validity and status are checked. The outcome is logged per camera.

    Returns
    -------
    bool
        True if every camera delivered a valid image without error status, False otherwise.
    """
    success = True
    with Cameras() as cameras:
        for i in range(len(cameras)):
            camera_log(logging.DEBUG, cameras[i], 'Testing image acquisition')
            acquiring = False
            try:
                cameras[i].BeginAcquisition()
                acquiring = True
                image = cameras[i].GetNextImage(1000)  # NOTE(review): 1000 is presumably a timeout in ms - confirm
                if image.IsValid() and image.GetImageStatus() == PySpin.SPINNAKER_IMAGE_STATUS_NO_ERROR:
                    camera_log(logging.INFO, cameras[i], 'Acquisition test was successful')
                else:
                    camera_log(logging.ERROR, cameras[i], 'Acquisition test failed')
                    success = False
            except Exception as e:
                # format the exception itself: indexing e.args would fail for exceptions without arguments
                camera_log(logging.ERROR, cameras[i], f'Acquisition test failed: {e}')
                success = False
            else:
                if image.IsValid():
                    image.Release()
            finally:
                # only end what was actually started - otherwise a failed BeginAcquisition() would be followed by
                # a second, uncaught error from EndAcquisition()
                if acquiring:
                    cameras[i].EndAcquisition()
    return success
×
161

162

163
def reset_all_cameras(timeout: float | None = None):
    """Reset all available cameras and wait for them to come back online.

    Each camera is initialized, reset and deinitialized. Afterwards all cameras are polled (every 0.2 s) until each
    of them can be initialized again within the same polling pass. The status of each camera is logged; the
    "back online" message is logged once per camera.

    Parameters
    ----------
    timeout : float or None, optional
        Maximum time in seconds to wait for the cameras to come back online. None (the default) waits indefinitely.

    Raises
    ------
    TimeoutError
        If `timeout` is given and not all cameras are back online when it expires.
    """
    with Cameras(init_cameras=False) as cameras:
        n_cameras = len(cameras)
        if n_cameras == 0:
            return

        # Iterate through each camera and reset
        for i in range(n_cameras):
            cameras[i].Init()
            try:
                cameras[i].DeviceReset()
            except PySpin.SpinnakerException as e:
                camera_log(logging.ERROR, cameras[i], f'Error resetting camera: {e}')
            else:
                camera_log(logging.INFO, cameras[i], 'Resetting camera')
            finally:
                cameras[i].DeInit()

        # Wait for all cameras to come back online
        for i in range(n_cameras):
            camera_log(logging.INFO, cameras[i], 'Waiting for camera to come back online (~10 s)')
        deadline = None if timeout is None else time.monotonic() + timeout
        announced: set[int] = set()  # cameras whose return has already been logged
        all_cameras_online = False
        while not all_cameras_online:
            all_cameras_online = True
            for i in range(n_cameras):
                try:
                    cameras[i].Init()
                except PySpin.SpinnakerException:
                    all_cameras_online = False
                else:
                    # every camera is re-checked on each pass, but its return is only reported once
                    if i not in announced:
                        camera_log(logging.INFO, cameras[i], 'Camera is back online')
                        announced.add(i)
                    cameras[i].DeInit()
            if not all_cameras_online:
                if deadline is not None and time.monotonic() >= deadline:
                    raise TimeoutError(f'Not all cameras came back online within {timeout:g} s')
                time.sleep(0.2)
×
201

202

203
@process_camera
2✔
204
def set_value(node_name: str, value: Any, camera: PySpin.CameraPtr) -> bool:
2✔
205
    """
206
    Set the value of a camera node to a specified value.
207

208
    Parameters
209
    ----------
210
    node_name : str
211
        The name of the node to set the value for.
212
    value : Any
213
        The value to set for the specified node. The type of value must match the node's expected type.
214
    camera : PySpin.CameraPtr, PySpin.CameraList or None, optional
215
        A pointer to a specific camera instance, a list of instances, or None. If None is specified, all available
216
        cameras will be considered.
217

218
    Returns
219
    -------
220
    bool
221
        True if the property was set successfully, False otherwise.
222
    """
NEW
223
    try:
×
224
        # get node
NEW
225
        assert hasattr(camera, node_name), f"No such node: '{node_name}'"
×
NEW
226
        node = getattr(camera, node_name)
×
NEW
227
        assert hasattr(node, 'SetValue'), f"node '{node_name}' has no SetValue() attribute"
×
NEW
228
        disp_name = node.GetDisplayName()
×
NEW
229
        assert PySpin.IsWritable(node), f'{disp_name} is not writable'
×
230

231
        # assert types
NEW
232
        node_type = type(node)
×
NEW
233
        val_type = type(value)
×
NEW
234
        match node_type:
×
NEW
235
            case PySpin.IInteger:
×
NEW
236
                expected_value_types = [int]
×
NEW
237
            case PySpin.IFloat:
×
NEW
238
                expected_value_types = [int, float]
×
NEW
239
            case PySpin.IBoolean:
×
NEW
240
                expected_value_types = [bool]
×
NEW
241
            case _ if isinstance(node, PySpin.IEnumeration):
×
NEW
242
                expected_value_types = [int, str]
×
NEW
243
                if val_type is str:
×
NEW
244
                    if hasattr(PySpin, enumeration_name := f'{node_name}_{value}'):
×
NEW
245
                        value = getattr(PySpin, enumeration_name)
×
NEW
246
                        val_type = type(value)
×
247
                    else:
NEW
248
                        expected_val_str = ', '.join([f"'{n.GetName().rsplit('_', 1)[-1]}'" for n in node.GetEntries()])
×
NEW
249
                        expected_val_str = ' or'.join(expected_val_str.rsplit(',', 1))
×
NEW
250
                        raise ValueError(f'String value for {disp_name} must be {expected_val_str}')
×
NEW
251
            case _:
×
NEW
252
                raise TypeError(f'Unsupported node type: {node_type.__name__}')
×
NEW
253
        if val_type not in expected_value_types:
×
NEW
254
            expected_val_type_str = ', '.join([f'{x.__name__}' for x in expected_value_types])
×
NEW
255
            expected_val_type_str = ' or'.join(expected_val_type_str.rsplit(',', 1))
×
NEW
256
            raise TypeError(f'Value for {disp_name} must be of type {expected_val_type_str} - not {val_type.__name__}')
×
257

258
        # limit value to valid range
NEW
259
        if node_type in (PySpin.IInteger, PySpin.IFloat) and not node.GetMin() <= value <= node.GetMax():
×
NEW
260
            value = min(max(value, node.GetMin()), node.GetMax())
×
261

262
        # set value (if necessary)
NEW
263
        if isinstance(node, PySpin.IEnumeration) and value != node.GetIntValue():
×
NEW
264
            value_str = node.GetEntry(value).GetDisplayName()
×
NEW
265
        elif value != node.GetValue():
×
NEW
266
            value_str = f'{value:g}{" " + node.GetUnit() if hasattr(node, "GetUnit") else ""}'
×
267
        else:
NEW
268
            return True
×
NEW
269
        node.SetValue(value)
×
NEW
270
        return camera_log(logging.INFO, camera, f'Setting {disp_name} to {value_str}')
×
NEW
271
    except Exception as e:
×
NEW
272
        return camera_log(logging.ERROR, camera, f'Error setting value: {e.args[0]}')
×
273

274

275
@process_camera
def get_value(node_name: str, camera: PySpin.CameraPtr) -> Any:
    """
    Get the value of a camera node.

    Any error is caught and logged rather than raised.

    Parameters
    ----------
    node_name : str
        The name of the node to get the value of.
    camera : PySpin.CameraPtr, PySpin.CameraList or None, optional
        A pointer to a specific camera instance, a list of instances, or None. If None is specified, all available
        cameras will be considered.

    Returns
    -------
    Any
        The value of the node. If an error occurred, the result of the error-level `camera_log` call is returned
        instead.
    """
    try:
        # explicit raises instead of asserts so that validation is not stripped under `python -O`
        if not hasattr(camera, node_name):
            raise AttributeError(f"No such node: '{node_name}'")
        node = getattr(camera, node_name)
        if not hasattr(node, 'GetValue'):
            raise AttributeError(f"node '{node_name}' has no GetValue() attribute")
        disp_name = node.GetDisplayName()
        if not PySpin.IsReadable(node):
            raise RuntimeError(f'{disp_name} is not readable')
        return node.GetValue()
    except Exception as e:
        # format the exception itself: indexing e.args would fail for exceptions without arguments
        return camera_log(logging.ERROR, camera, f'Error getting value: {e}')
×
302

303

304
@process_camera
def get_string_value(node_name: str, camera: PySpin.CameraPtr) -> str:
    """
    Get the value of a camera node, formatted as a string.

    Enumeration nodes yield the display name of the current entry. Values that already are strings are returned
    unchanged. All other values are formatted with the ``g`` format specifier, followed by the node's unit if the
    node provides one. Any error is caught and logged rather than raised.

    Parameters
    ----------
    node_name : str
        The name of the node to get the value of.
    camera : PySpin.CameraPtr, PySpin.CameraList or None, optional
        A pointer to a specific camera instance, a list of instances, or None. If None is specified, all available
        cameras will be considered.

    Returns
    -------
    str
        The value of the node, formatted as a string. An empty string if an error occurred.
    """
    try:
        # explicit raises instead of asserts so that validation is not stripped under `python -O`
        if not hasattr(camera, node_name):
            raise AttributeError(f"No such node: '{node_name}'")
        node = getattr(camera, node_name)
        if not hasattr(node, 'GetValue'):
            raise AttributeError(f"node '{node_name}' has no GetValue() attribute")
        disp_name = node.GetDisplayName()
        if not PySpin.IsReadable(node):
            raise RuntimeError(f'{disp_name} is not readable')
        if isinstance(node, PySpin.IEnumeration):
            return node.GetEntry(node.GetIntValue()).GetDisplayName()
        current = node.GetValue()
        if isinstance(current, str):
            # the 'g' format specifier below is not defined for strings
            return current
        unit = ' ' + node.GetUnit() if hasattr(node, 'GetUnit') else ''
        return f'{current:g}{unit}'
    except Exception as e:
        # format the exception itself: indexing e.args would fail for exceptions without arguments
        camera_log(logging.ERROR, camera, f'Error getting value: {e}')
        return ''
×
335

336

337
@process_camera
def enable_camera_trigger(enable: bool, camera: PySpin.CameraPtr) -> bool:
    """Switch the trigger mode of one camera, several cameras, or all cameras on or off.

    The flag is converted to an integer and written to the camera's ``TriggerMode`` node by means of `set_value`.

    Parameters
    ----------
    enable : bool
        True to enable the camera trigger, False to disable it.
    camera : PySpin.CameraPtr, PySpin.CameraList or None, optional
        A pointer to a specific camera instance, a list of instances, or None. If None is specified, all available
        cameras will be considered.

    Returns
    -------
    bool
        The result of the `set_value` call for the camera.
        NOTE(review): `set_value` is itself wrapped by `process_camera`, so this appears to be a tuple rather than
        a plain bool - confirm against callers.
    """
    trigger_mode = int(enable)
    return set_value('TriggerMode', trigger_mode, camera=camera)
×
358

359

360
@process_camera
def select_line(line: int | str, camera: PySpin.CameraPtr) -> bool:
    """Write `line` to the ``LineSelector`` node of the camera(s) by means of `set_value`.

    Parameters
    ----------
    line : int or str
        The value to set for the ``LineSelector`` node.
    camera : PySpin.CameraPtr, PySpin.CameraList or None, optional
        A pointer to a specific camera instance, a list of instances, or None. If None is specified, all available
        cameras will be considered.

    Returns
    -------
    bool
        The result of the `set_value` call for the camera.
    """
    return set_value('LineSelector', line, camera=camera)
×
363

364

365
@process_camera
def set_line_mode(value: int | str, camera: PySpin.CameraPtr) -> bool:
    """Write `value` to the ``LineMode`` node of the camera(s) by means of `set_value`.

    Parameters
    ----------
    value : int or str
        The value to set for the ``LineMode`` node.
    camera : PySpin.CameraPtr, PySpin.CameraList or None, optional
        A pointer to a specific camera instance, a list of instances, or None. If None is specified, all available
        cameras will be considered.

    Returns
    -------
    bool
        The result of the `set_value` call for the camera.
    """
    return set_value('LineMode', value, camera=camera)
×
368

369

370
@process_camera
def set_line_source(value: int | str, camera: PySpin.CameraPtr) -> bool:
    """Write `value` to the ``LineSource`` node of the camera(s) by means of `set_value`.

    Parameters
    ----------
    value : int or str
        The value to set for the ``LineSource`` node.
    camera : PySpin.CameraPtr, PySpin.CameraList or None, optional
        A pointer to a specific camera instance, a list of instances, or None. If None is specified, all available
        cameras will be considered.

    Returns
    -------
    bool
        The result of the `set_value` call for the camera.
    """
    return set_value('LineSource', value, camera=camera)
×
373

374

375
@process_camera
def set_framerate(value: float, camera: PySpin.CameraPtr) -> bool:
    """Write `value` to the ``AcquisitionFrameRate`` node of the camera(s) by means of `set_value`.

    Parameters
    ----------
    value : float
        The value to set for the ``AcquisitionFrameRate`` node.
    camera : PySpin.CameraPtr, PySpin.CameraList or None, optional
        A pointer to a specific camera instance, a list of instances, or None. If None is specified, all available
        cameras will be considered.

    Returns
    -------
    bool
        The result of the `set_value` call for the camera.
    """
    return set_value('AcquisitionFrameRate', value, camera=camera)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc