• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SciKit-Surgery / scikit-surgerynditracker / 1

06 Jun 2024 09:16AM UTC coverage: 14.089% (-83.8%) from 97.938%
1

push

github

web-flow
Merge pull request #86 from SciKit-Surgery/85-update-python

85 update python

0 of 3 new or added lines in 1 file covered. (0.0%)

245 existing lines in 2 files now uncovered.

41 of 291 relevant lines covered (14.09%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.33
/sksurgerynditracker/nditracker.py
1
#  -*- coding: utf-8 -*-
2

3
"""Class implementing communication with NDI (Northern Digital) trackers"""
4

5
import sys
5✔
6
import os
5✔
7
import contextlib
5✔
8

9
from platform import system
5✔
10
from subprocess import call
5✔
11
from time import time
5✔
12
from serial.tools import list_ports #pylint: disable=import-error
5✔
13

14
from six import int2byte
5✔
15
from numpy import full, nan, reshape
5✔
16
from sksurgerycore.baseclasses.tracker import SKSBaseTracker
5✔
17
import ndicapy
5✔
18
from sksurgerynditracker.serial_utils.com_ports import \
5✔
19
        fix_com_port_greater_than_9
20

21
@contextlib.contextmanager
def _open_logging(verbose):
    """
    Context manager yielding the stream diagnostic output is printed to.

    :param verbose: if truthy, yield sys.stdout; otherwise yield a write
        handle on os.devnull so that anything printed is discarded.

    :return fileout: a writable text stream.

    Only a handle opened here is closed on exit. sys.stdout is borrowed,
    so it is never closed, even if sys.stdout is rebound to a different
    object while the context is active.
    """
    if verbose:
        # Borrowed stream: hand it out and leave its lifetime to the owner.
        yield sys.stdout
        return

    # Owned stream: the with statement closes it on normal exit and when
    # an exception is thrown into the generator at the yield.
    with open(os.devnull, 'w', encoding='utf-8') as devnull:
        yield devnull
×
38

39
def _get_serial_port_name(configuration):
    """
    Probes the system's serial ports to
    find the name of the serial port and check we can connect

    :param configuration: dictionary of tracker settings. Keys read here:

        verbose: if truthy, probe results are printed to stdout,
            defaults to False

        serial port: either an int index into the list returned by
            list_ports.comports(), or a str port name. If absent or
            None, the available ports are probed in turn.

        ports to probe: the maximum number of ports to probe when no
            serial port is given, defaults to 20 and is capped at the
            number of ports found.

    :return port_name: the name of the port

    :raises: IOError if port not found or port probe fails
    """

    serial_connection_errmsg = """
    Please check the following:\n
    \t1) Is an NDI device connected to your computer?\n
    \t2) Is the NDI device switched on?\n
    \t3) Do you have sufficient privilege to connect to
    the device? (e.g. on Linux are you part of the "dialout"
    group?)
    """

    with _open_logging(configuration.get('verbose', False)) as fileout:
        serial_port = configuration.get("serial port", None)
        ports_to_probe = configuration.get("ports to probe", 20)
        serial_ports = list_ports.comports()
        result = None
        name = None
        # Never index past the ports that actually exist.
        ports_to_probe = min(ports_to_probe, len(serial_ports))

        if serial_port is None:
            for port_no in range(ports_to_probe):
                name = serial_ports[port_no].device
                name = fix_com_port_greater_than_9(name)

                result = ndicapy.ndiProbe(name)
                print("Probing port: ", port_no, " got name: ", name,
                      " Result: ", result, file=fileout)
                if result == ndicapy.NDI_OKAY:
                    break
            else:
                # If we did not break from the for loop:
                raise IOError('Could not find any NDI device in '
                    f'{ports_to_probe} serial port candidates checked. '
                    + serial_connection_errmsg)

        else:
            if isinstance(serial_port, int):
                if serial_port < len(serial_ports):
                    # NOTE(review): unlike the probing loop above, this
                    # name is not passed through
                    # fix_com_port_greater_than_9 - confirm whether that
                    # is intentional.
                    name = serial_ports[serial_port].device
                    result = ndicapy.ndiProbe(name)
                    print("Probing port: ", serial_port, " got name: ", name,
                          " Result: ", result, file=fileout)
                else:
                    raise IOError('Could not connect to serial port '
                        f'{serial_port} as there are '
                        f'only {len(serial_ports)} ports available.'
                        + serial_connection_errmsg)

            elif isinstance(serial_port, str):
                name = serial_port
                result = ndicapy.ndiProbe(name)
                print("Probing port: ", name,
                      " Result: ", result, file=fileout)

            # Also reached with result still None when serial_port is
            # neither an int nor a str, so unsupported types are rejected.
            if result != ndicapy.NDI_OKAY:
                raise IOError('Could not connect to an NDI device on '
                    f'the chosen port, {serial_port}.'
                    + serial_connection_errmsg)
        return name
×
106

107

108
class NDITracker(SKSBaseTracker):
    """
    Class for communication with NDI trackers.
    Should support Polaris, Aurora,
    and Vega. Currently only tested with wireless tools on Vega
    """
    def __init__(self, configuration):
        """
        Creates an NDI tracker device and connects to an NDI Tracker.

        :param configuration: A dictionary containing details of the tracker.

            tracker type: vega polaris aurora dummy

            ip address:

            port:

            romfiles:

            serial port:

            ports to probe:

            use quaternions: default is false

            smoothing buffer: specify a buffer over which to average the
                tracking, defaults to 1

        :raises Exception: IOError, KeyError, OSError, ValueError
        """
        self._device = None
        self._tool_descriptors = []
        self._tracker_type = None
        self._state = None

        self._get_frame = None
        self._get_transform = None
        self._capture_string = None

        # Validate the configuration before touching any hardware.
        self._configure(configuration)

        super().__init__(configuration, tracked_objects = None)

        if self._tracker_type == "vega":
            self._connect_vega(configuration)

        if self._tracker_type == "aurora":
            self._connect_aurora(configuration)

        if self._tracker_type == "polaris":
            self._connect_polaris(configuration)

        if self._tracker_type == "dummy":
            # No hardware: a truthy placeholder satisfies the
            # "if not self._device" guards in the methods below.
            self._device = True

        self._initialise_ports()
        self._enable_tools()
        self._get_firmware_version()
        self._set_use_bx_transforms()
        self._state = 'ready'

    def _set_use_bx_transforms(self):
        """
        We'd like to use BX transforms as this sends binary
        tracking data, so should be faster, however for
        certain devices we can't do this. Here we check the
        firmware version and select the frame getter, transform
        getter and capture command (BX or TX) to suit.
        """
        self._get_frame = getattr(ndicapy, 'ndiGetBXFrame')
        self._get_transform = getattr(ndicapy, 'ndiGetBXTransform')
        self._capture_string = 'BX:0801'

        firmware = self._get_firmware_version()
        # Firmware versions for which the TX functions are used instead.
        if firmware in (' AURORA Rev 007', ' AURORA Rev 008',
                        ' Polaris Vega 008',
                        ' Polaris Spectra Rev 006', ' Polaris Spectra Rev 007'):
            self._get_frame = getattr(ndicapy, 'ndiGetTXFrame')
            self._get_transform = getattr(ndicapy, 'ndiGetTXTransform')
            self._capture_string = 'TX:0801'

    def _get_firmware_version(self):
        """
        Gets the device's firmware version.

        :return: the text following the first ':' on the 'Freeze Tag:'
            line of the ndiVER reply, or 'unknown 00.0' for a dummy
            tracker or when no such line is present.
        """

        device_firmware_version = 'unknown 00.0'

        if self._tracker_type != 'dummy':
            device_info = ndicapy.ndiVER(self._device, 0).split('\n')
            for line in device_info:
                if line.startswith('Freeze Tag:'):
                    device_firmware_version = line.split(':')[1]
        return device_firmware_version

    def _connect_vega(self, configuration):
        """Connects over the network, then loads the configured rom files"""
        self._connect_network(configuration)
        self._read_sroms_from_file()

    def _connect_polaris(self, configuration):
        """Connects over serial, then loads the configured rom files"""
        name = _get_serial_port_name(configuration)
        self._connect_serial(name)
        self._read_sroms_from_file()

    def _connect_aurora(self, configuration):
        """Connects over serial, then discovers the wired tools"""
        name = _get_serial_port_name(configuration)
        self._connect_serial(name)
        self._find_wired_ports()

    def _connect_network(self, configuration):
        """
        Attempts to open a network connection using the "ip address"
        and "port" entries of configuration.

        :raises: IOError if the address does not answer a ping, or if
            the connection or the INIT command fails
        """
        #try and ping first to save time with timeouts
        param = '-n' if system().lower() == 'windows' else '-c'
        ip_address = configuration.get("ip address")
        port = configuration.get("port")
        if call(['ping', param, '1', ip_address]) == 0:
            self._device = ndicapy.ndiOpenNetwork(ip_address, port)
        else:
            raise IOError(f'Could not find a device at {ip_address}')
        if not self._device:
            raise IOError('Could not connect to network NDI device '
                          f'at {ip_address}')

        ndicapy.ndiCommand(self._device, 'INIT:')
        self._check_for_errors('Sending INIT command')


    def _connect_serial(self, name):
        """
        Attempts to open the serial port with name name

        :raises: IOError if connection fails
        """
        self._device = ndicapy.ndiOpen(name)
        if not self._device:
            raise IOError(f'Could not connect to serial NDI device at {name}')

        ndicapy.ndiCommand(self._device, 'INIT:')
        self._check_for_errors('Sending INIT command')
        ndicapy.ndiCommand(self._device,
                           f'COMM:{ndicapy.NDI_115200:d}{ndicapy.NDI_8N1:03d}'
                           f'{ndicapy.NDI_NOHANDSHAKE:d}')

    def _configure(self, configuration):
        """ Reads a configuration dictionary
        describing the tracker configuration,
        and sets class variables.

        raises: ValueError, KeyError
        """
        if "tracker type" not in configuration:
            raise KeyError("Configuration must contain 'tracker type'")

        tracker_type = configuration.get("tracker type")
        if tracker_type in ("vega", "polaris", "aurora", "dummy"):
            self._tracker_type = tracker_type
        else:
            raise ValueError(
                "Supported trackers are 'vega', 'aurora', 'polaris', "
                "and 'dummy'")

        if self._tracker_type == "vega":
            self._check_config_vega(configuration)

        if self._tracker_type == "polaris":
            self._check_config_polaris(configuration)

        if self._tracker_type == "aurora":
            # Nothing to validate here for aurora.
            pass

        if self._tracker_type == "dummy":
            self._check_config_dummy(configuration)

    def _check_config_vega(self, configuration):
        """
        Internal function to check configuration of a polaris vega.
        Adds a default "port" of 8765 to configuration if none is given,
        and appends one tool descriptor per rom file.

        :raises: KeyError, FileNotFoundError
        """
        if "ip address" not in configuration:
            raise KeyError("Configuration for vega must contain "
                           "'ip address'")
        if "port" not in configuration:
            configuration.update({"port":8765})

        if "romfiles" not in configuration:
            raise KeyError("Configuration for vega and polaris must "
                           "contain a list of 'romfiles'")
        for romfile in configuration.get("romfiles"):
            if not os.path.exists(romfile):
                raise FileNotFoundError(f"ROM file '{romfile}' not found.")
            self._tool_descriptors.append({"description" : romfile})

    def _check_config_polaris(self, configuration):
        """
        Internal function to check configuration for polaris vicra or spectra.
        Appends one tool descriptor per rom file.

        :raises: KeyError, FileNotFoundError
        """
        if "romfiles" not in configuration:
            raise KeyError("Configuration for vega and polaris must "
                           "contain a list of 'romfiles'")
        for romfile in configuration.get("romfiles"):
            if not os.path.exists(romfile):
                raise FileNotFoundError(f"ROM file '{romfile}' not found.")
            self._tool_descriptors.append({"description" : romfile})

    def _check_config_dummy(self, configuration):
        """
        Internal function to check configuration of a testing dummy.
        Rom files are optional; any that are listed must exist.

        :raises: FileNotFoundError
        """
        if "romfiles" in configuration:
            for romfile in configuration.get("romfiles"):
                if not os.path.exists(romfile):
                    raise FileNotFoundError(f"ROM file '{romfile}' not found.")
                self._tool_descriptors.append({"description" : romfile})

    def _close_device(self):
        """
        Closes the device handle with the function matching the way it
        was opened, then forgets the handle so it cannot be closed twice.
        """
        if self._tracker_type == "vega":
            ndicapy.ndiCloseNetwork(self._device)
        elif self._tracker_type in ("aurora", "polaris"):
            ndicapy.ndiClose(self._device)

        self._device = None
        self._state = None

    def close(self):
        """
        Closes the connection to the NDI Tracker and
        deletes the tracker device.

        :raises Exception: ValueError
        """
        if not self._device:
            raise ValueError('close called with no NDI device')

        if self._state == "tracking":
            self.stop_tracking()

        self._close_device()

    def _read_sroms_from_file(self):
        """
        Frees stale port handles, then requests a port handle for each
        tool descriptor and loads its rom file onto the device.

        :raises: ValueError if there is no device, IOError on device error
        """
        if not self._device:
            raise ValueError('read srom called with no NDI device')

        if self._state == "tracking":
            self.stop_tracking()

        #free ports that are waiting to be freed
        ndicapy.ndiCommand(self._device, 'PHSR:01')
        number_of_tools = ndicapy.ndiGetPHSRNumberOfHandles(self._device)
        for tool_index in range(number_of_tools):
            port_handle = ndicapy.ndiGetPHSRHandle(self._device, tool_index)
            ndicapy.ndiCommand(self._device, f"PHF:{port_handle:02x}")
            # Report the handle that was freed, not its list position.
            self._check_for_errors(f'freeing port handle {port_handle:02x}.')

        for tool in self._tool_descriptors:
            ndicapy.ndiCommand(self._device, 'PHRQ:*********1****')
            port_handle = ndicapy.ndiGetPHRQHandle(self._device)
            tool.update({"port handle" : port_handle})
            # The handle is stored a second time in the byte form that is
            # later passed to the frame and transform getters.
            if self._tracker_type == "aurora":
                tool.update({"c_str port handle" : str(port_handle).encode()})
            else:
                tool.update({"c_str port handle" : int2byte(port_handle)})

            self._check_for_errors(
                    f'getting srom file port handle {port_handle}.')

            ndicapy.ndiPVWRFromFile(self._device, port_handle,
                                    tool.get("description"))
            self._check_for_errors(
                    f'setting srom file port handle {port_handle}.')

        ndicapy.ndiCommand(self._device, 'PHSR:01')

    def _initialise_ports(self):
        """Initialises each port in the list of tool descriptors"""
        if not self._device:
            raise ValueError('init ports called with no NDI device')

        if self._tracker_type != "dummy":
            ndicapy.ndiCommand(self._device, 'PHSR:02')
            for tool in self._tool_descriptors:
                ndicapy.ndiCommand(self._device,
                        f'PINIT:{tool.get("port handle"):02x}')
                self._check_for_errors('Initialising port handle '
                                       f'{tool.get("port handle"):02x}.')

    def _find_wired_ports(self):
        """For systems with wired tools, gets the number of tools plugged in
        and sticks them in the tool descriptors list"""
        if not self._device:
            raise ValueError('find wired ports called with no NDI device')

        ndicapy.ndiCommand(self._device, 'PHSR:02')
        number_of_tools = ndicapy.ndiGetPHSRNumberOfHandles(self._device)
        # Query again after each pass until no handles are reported.
        while number_of_tools > 0:
            for ndi_tool_index in range(number_of_tools):
                port_handle = ndicapy.ndiGetPHSRHandle(self._device,
                                                       ndi_tool_index)

                self._tool_descriptors.append({"description" : ndi_tool_index,
                                               "port handle" : port_handle,
                                               "c_str port handle" :
                                               int2byte(port_handle)})
                ndicapy.ndiCommand(self._device,
                                   f"PINIT:{port_handle:02x}")
            ndicapy.ndiCommand(self._device, 'PHSR:02')
            number_of_tools = ndicapy.ndiGetPHSRNumberOfHandles(self._device)

    def _enable_tools(self):
        """
        Enables every port handle reported by the device, adding a tool
        descriptor for any handle not already in the list.

        :raises: ValueError if there is no device, IOError on device error
        """
        if not self._device:
            raise ValueError('enable tools called with no NDI device')

        if self._tracker_type != "dummy":
            ndicapy.ndiCommand(self._device, "PHSR:03")
            number_of_tools = ndicapy.ndiGetPHSRNumberOfHandles(self._device)
            for tool_index in range(number_of_tools):
                port_handle = ndicapy.ndiGetPHSRHandle(self._device, tool_index)
                port_handle_already_present = any(
                    tool.get("port handle") == port_handle
                    for tool in self._tool_descriptors)
                if not port_handle_already_present:
                    self._tool_descriptors.append({
                        "description" : tool_index,
                        "port handle" : port_handle,
                        "c_str port handle" :
                        int2byte(port_handle)})

                mode = 'D'
                ndicapy.ndiCommand(self._device,
                        f"PENA:{port_handle:02x}{mode}")
                self._check_for_errors(f'Enabling port handle {port_handle}.')

    def get_frame(self):
        """Gets a frame of tracking data from the NDI device.

        :return:

            port_numbers : list of port handles, one per tool

            time_stamps : list of timestamps (cpu clock), one per tool

            frame_numbers : list of framenumbers (tracker clock) one per tool

            tracking : list of 4x4 tracking matrices, rotation and position,
            or if use_quaternions is true, a list of tracking quaternions,
            column 0-3 is the rotation as a quaternion (qw, qx, qy, qz),
            column 4-6 is the translation (x,y,z).

            tracking_quality : list the tracking quality, one per tool.

        Note: The time stamp is based on the host computer clock. Read the
        following extract from NDI's API Guide for advice on what to use:
        "Use the frame number, and not the host computer clock, to identify when
        data was collected. The frame number is incremented by 1 at a constant
        rate of 60 Hz. Associating a time from the host computer clock to
        replies from the system assumes that the duration of time between raw
        data collection and when the reply is received by the host computer is
        constant. This is not necessarily the case."
        """
        port_handles = []
        time_stamps = []
        frame_numbers = []
        tracking_rots = []
        tracking_trans = []
        tracking_quality = []

        # One host timestamp is taken per call and shared by every tool.
        timestamp = time()
        if not self._tracker_type == "dummy":
            ndicapy.ndiCommand(self._device, self._capture_string)
            for descriptor in self._tool_descriptors:
                port_handles.append(descriptor.get("port handle"))
                time_stamps.append(timestamp)
                frame_numbers.append(self._get_frame(
                    self._device,
                    descriptor.get("c_str port handle")))
                qtransform = self._get_transform(
                    self._device,
                    descriptor.get("c_str port handle"))
                if not qtransform == "MISSING" and not qtransform == "DISABLED":
                    # Elements 0-6 are the pose, element 7 the quality.
                    tracking_quality.append(qtransform[7])
                    transform = reshape(qtransform[0:7], [1, 7])
                else:
                    # No pose available for this tool: report NaNs.
                    tracking_quality.append(nan)
                    transform = full((1, 7), nan)

                tracking_rots.append(transform[0][0:4])
                tracking_trans.append(transform[0][4:7])
        else:
            # Dummy tracker: no hardware to query, so every tool gets a
            # NaN pose.
            for descriptor in self._tool_descriptors:
                port_handles.append(descriptor.get(
                    "port handle"))
                time_stamps.append(timestamp)
                frame_numbers.append(0)
                tracking_quality.append(0.0)
                tracking_rots.append(full((1, 4), nan))
                tracking_trans.append(full((1, 3), nan))

        self.add_frame_to_buffer(port_handles, time_stamps, frame_numbers,
            tracking_rots, tracking_trans, tracking_quality,
            rot_is_quaternion = True)

        return self.get_smooth_frame(port_handles)

    def get_tool_descriptions(self):
        """ Returns the port handles and tool descriptions """
        port_handles = []
        descriptions = []
        for descriptor in self._tool_descriptors:
            port_handles.append(descriptor.get("port handle"))
            descriptions.append(descriptor.get("description"))

        return port_handles, descriptions

    def start_tracking(self):
        """
        Tells the NDI devices to start tracking.

        :raises Exception: ValueError if the tracker is not in the
            'ready' state, IOError if the device reports an error
        """
        if self._state != 'ready':
            raise ValueError("""Called start tracking before device ready,
            try calling connect first""")

        ndicapy.ndiCommand(self._device, 'TSTART:')
        self._check_for_errors('starting tracking.')
        self._state = 'tracking'

    def stop_tracking(self):
        """
        Tells the NDI devices to stop tracking.

        :raises Exception: IOError if the device reports an error
        """
        ndicapy.ndiCommand(self._device, 'TSTOP:')
        self._check_for_errors('stopping tracking.')
        self._state = 'ready'

    def _check_for_errors(self, message):
        """
        Asks the device for its error status. If it is not NDI_OKAY the
        connection is closed and an IOError is raised.

        :param message: a description of the operation just attempted,
            included in the error text.

        :raises: IOError
        """
        errnum = ndicapy.ndiGetError(self._device)
        if errnum != ndicapy.NDI_OKAY:
            # Close with the function that matches how the device was
            # opened, and drop the handle so close() cannot reuse it.
            self._close_device()
            raise IOError(f'error when {message}. the error was: '
                          f'{ndicapy.ndiErrorString(errnum)}')
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc