• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 implementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.38
/can/interfaces/ixxat/canlib_vcinpl2.py
1
"""
7✔
2
Ctypes wrapper module for IXXAT Virtual CAN Interface V3 on win32 systems
3

4
TODO: We could implement this interface such that setting other filters
5
      could work when the initial filters were set to zero using the
6
      software fallback. Or could the software filters even be changed
7
      after the connection was opened? We need to document that behaviour!
8
      See also the NICAN interface.
9

10
"""
11

12
import ctypes
7✔
13
import functools
7✔
14
import logging
7✔
15
import sys
7✔
16
import time
7✔
17
import warnings
7✔
18
from collections.abc import Sequence
7✔
19
from typing import Callable, Optional, Union
7✔
20

21
from can import (
7✔
22
    BusABC,
23
    CanProtocol,
24
    CyclicSendTaskABC,
25
    LimitedDurationCyclicSendTaskABC,
26
    Message,
27
    RestartableCyclicTaskABC,
28
)
29
from can.ctypesutil import HANDLE, PHANDLE, CLibrary
7✔
30
from can.ctypesutil import HRESULT as ctypes_HRESULT
7✔
31
from can.exceptions import CanInitializationError, CanInterfaceNotImplementedError
7✔
32
from can.util import deprecated_args_alias, dlc2len, len2dlc
7✔
33

34
from . import constants, structures
7✔
35
from .exceptions import *
7✔
36

37
__all__ = [
7✔
38
    "IXXATBus",
39
    "VCIBusOffError",
40
    "VCIDeviceNotFoundError",
41
    "VCIError",
42
    "VCITimeout",
43
    "vciFormatError",
44
]
45

46
log = logging.getLogger("can.ixxat")
7✔
47

48
# Hack to have vciFormatError as a free function, see below
49
vciFormatError = None
7✔
50

51
# main ctypes instance
52
_canlib = None
7✔
53
# TODO: Use ECI driver for linux
54
if sys.platform in ("win32", "cygwin"):
    # Only Windows-like platforms ship the vcinpl2 DLL; a failed load is
    # logged and leaves ``_canlib`` at its previous value.
    try:
        _canlib = CLibrary("vcinpl2.dll")
    except Exception as load_error:
        log.warning("Cannot load IXXAT vcinpl library: %s", load_error)
else:
    # The module stays importable on other systems (tests / sphinx), but the
    # library itself is never loaded there.
    log.warning("IXXAT VCI library does not work on %s platform", sys.platform)
×
63

64

65
def __vciFormatErrorExtended(
    library_instance: CLibrary, function: Callable, vret: int, args: tuple
):
    """Build a VCI error description that also lists the call arguments.

    :param CLibrary library_instance:
        Mapped instance of IXXAT vcinpl library
    :param callable function:
        The function whose call failed
    :param HRESULT vret:
        HRESULT returned by the vcinpl call
    :param args:
        Arbitrary tuple of the arguments the function was called with
    :return:
        The text produced by ``__vciFormatError`` followed by the arguments
    """
    # TODO: make sure we don't generate another exception
    base_message = __vciFormatError(library_instance, function, vret)
    return f"{base_message} - arguments were {args}"
84

85

86
def __vciFormatError(library_instance: CLibrary, function: Callable, vret: int):
    """Describe a failed VCI call using the library's own error text.

    :param CLibrary library_instance:
        Mapped instance of IXXAT vcinpl library
    :param callable function:
        The function whose call failed
    :param HRESULT vret:
        HRESULT returned by the vcinpl call
    :return:
        Formatted string naming the function and the decoded HRESULT
    """
    max_len = constants.VCI_MAX_ERRSTRLEN
    text_buffer = ctypes.create_string_buffer(max_len)
    ctypes.memset(text_buffer, 0, max_len)
    # The library writes the description of ``vret`` into the buffer.
    library_instance.vciFormatError(vret, text_buffer, max_len)
    failed_name = function._name
    # Undecodable bytes are replaced rather than raising a second error.
    description = text_buffer.value.decode("utf-8", "replace")
    return f"function {failed_name} failed ({description})"
103

104

105
def __check_status(result, function, args):
    """
    Translate the numeric result of a vcinpl call into an exception, if any.

    Passed as the ``errcheck`` argument when the C functions are mapped with
    ctypes.

        :param result:
            Function call numeric result
        :param callable function:
            Called function
        :param args:
            Arbitrary arguments tuple
        :return:
            ``result`` unchanged when no exception is raised
        :raise:
            :class:VCITimeout
            :class:VCIRxQueueEmptyError
            :class:StopIteration
            :class:VCIError
    """
    if result == constants.VCI_E_TIMEOUT:
        raise VCITimeout(f"Function {function._name} timed out")
    if result == constants.VCI_E_RXQUEUE_EMPTY:
        raise VCIRxQueueEmptyError()
    if result == constants.VCI_E_NO_MORE_ITEMS:
        raise StopIteration()

    # VCI_E_ACCESSDENIED is not a real error: it might happen if another
    # program has initialized the bus, so it is passed through like VCI_OK.
    if result not in (constants.VCI_E_ACCESSDENIED, constants.VCI_OK):
        raise VCIError(vciFormatError(function, result))

    return result
×
134

135

136
try:
    hresult_type = ctypes.c_ulong

    # Map all required symbols and initialize library ---------------------------
    # HRESULT VCIAPI vciInitialize ( void );
    _canlib.map_symbol("vciInitialize", hresult_type, (), __check_status)

    # void VCIAPI vciFormatError (HRESULT hrError, PCHAR pszText, UINT32 dwsize);
    # Mapped without errcheck; falls back to the "A"-suffixed export when the
    # plain name cannot be imported.
    _format_error_argtypes = (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32)
    try:
        _canlib.map_symbol("vciFormatError", None, _format_error_argtypes)
    except ImportError:
        _canlib.map_symbol("vciFormatErrorA", None, _format_error_argtypes)
        _canlib.vciFormatError = _canlib.vciFormatErrorA
    # Hack to have vciFormatError as a free function
    vciFormatError = functools.partial(__vciFormatError, _canlib)

    # Every remaining function returns an HRESULT that is checked by
    # __check_status; entries are (symbol name, argument types), mapped in order.
    _checked_symbols = (
        # HRESULT VCIAPI vciEnumDeviceOpen( OUT PHANDLE hEnum );
        ("vciEnumDeviceOpen", (PHANDLE,)),
        # HRESULT VCIAPI vciEnumDeviceClose ( IN HANDLE hEnum );
        ("vciEnumDeviceClose", (HANDLE,)),
        # HRESULT VCIAPI vciEnumDeviceNext( IN  HANDLE hEnum, OUT PVCIDEVICEINFO pInfo );
        ("vciEnumDeviceNext", (HANDLE, structures.PVCIDEVICEINFO)),
        # HRESULT VCIAPI vciDeviceOpen( IN  REFVCIID rVciid, OUT PHANDLE  phDevice );
        ("vciDeviceOpen", (structures.PVCIID, PHANDLE)),
        # HRESULT vciDeviceClose( HANDLE hDevice )
        ("vciDeviceClose", (HANDLE,)),
        # HRESULT VCIAPI canChannelOpen( IN  HANDLE  hDevice, IN  UINT32  dwCanNo,
        #                                IN  BOOL    fExclusive, OUT PHANDLE phCanChn );
        ("canChannelOpen", (HANDLE, ctypes.c_uint32, ctypes.c_long, PHANDLE)),
        # EXTERN_C HRESULT VCIAPI
        #   canChannelInitialize( IN HANDLE hCanChn,
        #                         IN UINT16 wRxFifoSize,
        #                         IN UINT16 wRxThreshold,
        #                         IN UINT16 wTxFifoSize,
        #                         IN UINT16 wTxThreshold,
        #                         IN UINT32 dwFilterSize,
        #                         IN UINT8  bFilterMode );
        (
            "canChannelInitialize",
            (
                HANDLE,
                ctypes.c_uint16,
                ctypes.c_uint16,
                ctypes.c_uint16,
                ctypes.c_uint16,
                ctypes.c_uint32,
                ctypes.c_uint8,
            ),
        ),
        # EXTERN_C HRESULT VCIAPI canChannelActivate( IN HANDLE hCanChn, IN BOOL   fEnable );
        ("canChannelActivate", (HANDLE, ctypes.c_long)),
        # HRESULT canChannelClose( HANDLE hChannel )
        ("canChannelClose", (HANDLE,)),
        # EXTERN_C HRESULT VCIAPI canChannelReadMessage( IN  HANDLE  hCanChn,
        #                                                IN  UINT32  dwMsTimeout,
        #                                                OUT PCANMSG2 pCanMsg );
        ("canChannelReadMessage", (HANDLE, ctypes.c_uint32, structures.PCANMSG2)),
        # HRESULT canChannelPeekMessage(HANDLE hChannel,PCANMSG2 pCanMsg );
        ("canChannelPeekMessage", (HANDLE, structures.PCANMSG2)),
        # HRESULT canChannelWaitTxEvent (HANDLE hChannel UINT32 dwMsTimeout );
        ("canChannelWaitTxEvent", (HANDLE, ctypes.c_uint32)),
        # HRESULT canChannelWaitRxEvent (HANDLE hChannel, UINT32 dwMsTimeout );
        ("canChannelWaitRxEvent", (HANDLE, ctypes.c_uint32)),
        # HRESULT canChannelPostMessage (HANDLE hChannel, PCANMSG2 pCanMsg );
        ("canChannelPostMessage", (HANDLE, structures.PCANMSG2)),
        # HRESULT canChannelSendMessage (HANDLE hChannel, UINT32 dwMsTimeout, PCANMSG2 pCanMsg );
        ("canChannelSendMessage", (HANDLE, ctypes.c_uint32, structures.PCANMSG2)),
        # EXTERN_C HRESULT VCIAPI canControlOpen( IN  HANDLE  hDevice, IN  UINT32  dwCanNo,
        #                                         OUT PHANDLE phCanCtl );
        ("canControlOpen", (HANDLE, ctypes.c_uint32, PHANDLE)),
        # EXTERN_C HRESULT VCIAPI
        #   canControlInitialize( IN HANDLE  hCanCtl,
        #                         IN UINT8   bOpMode,
        #                         IN UINT8   bExMode,
        #                         IN UINT8   bSFMode,
        #                         IN UINT8   bEFMode,
        #                         IN UINT32  dwSFIds,
        #                         IN UINT32  dwEFIds,
        #                         IN PCANBTP pBtpSDR,
        #                         IN PCANBTP pBtpFDR );
        (
            "canControlInitialize",
            (
                HANDLE,
                ctypes.c_uint8,
                ctypes.c_uint8,
                ctypes.c_uint8,
                ctypes.c_uint8,
                ctypes.c_uint32,
                ctypes.c_uint32,
                structures.PCANBTP,
                structures.PCANBTP,
            ),
        ),
        # EXTERN_C HRESULT VCIAPI canControlClose( IN HANDLE hCanCtl );
        ("canControlClose", (HANDLE,)),
        # EXTERN_C HRESULT VCIAPI canControlReset( IN HANDLE hCanCtl );
        ("canControlReset", (HANDLE,)),
        # EXTERN_C HRESULT VCIAPI canControlStart( IN HANDLE hCanCtl, IN BOOL   fStart );
        ("canControlStart", (HANDLE, ctypes.c_long)),
        # EXTERN_C HRESULT VCIAPI canControlGetStatus( IN  HANDLE hCanCtl,
        #                                              OUT PCANLINESTATUS2 pStatus );
        ("canControlGetStatus", (HANDLE, structures.PCANLINESTATUS2)),
        # EXTERN_C HRESULT VCIAPI canControlGetCaps( IN  HANDLE hCanCtl,
        #                                            OUT PCANCAPABILITIES2 pCanCaps );
        ("canControlGetCaps", (HANDLE, structures.PCANCAPABILITIES2)),
        # EXTERN_C HRESULT VCIAPI canControlSetAccFilter( IN HANDLE hCanCtl, IN BOOL   fExtend,
        #                                                 IN UINT32 dwCode, IN UINT32 dwMask );
        (
            "canControlSetAccFilter",
            (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32),
        ),
        # EXTERN_C HRESULT canControlAddFilterIds (HANDLE hControl, BOOL fExtended,
        #                                          UINT32 dwCode, UINT32 dwMask);
        (
            "canControlAddFilterIds",
            (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32),
        ),
        # EXTERN_C HRESULT canControlRemFilterIds (HANDLE hControl, BOOL fExtendend,
        #                                          UINT32 dwCode, UINT32 dwMask );
        (
            "canControlRemFilterIds",
            (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32),
        ),
        # EXTERN_C HRESULT canSchedulerOpen (HANDLE hDevice, UINT32 dwCanNo, PHANDLE phScheduler );
        ("canSchedulerOpen", (HANDLE, ctypes.c_uint32, PHANDLE)),
        # EXTERN_C HRESULT canSchedulerClose (HANDLE hScheduler );
        ("canSchedulerClose", (HANDLE,)),
        # EXTERN_C HRESULT canSchedulerGetCaps (HANDLE hScheduler, PCANCAPABILITIES2 pCaps );
        ("canSchedulerGetCaps", (HANDLE, structures.PCANCAPABILITIES2)),
        # EXTERN_C HRESULT canSchedulerActivate ( HANDLE hScheduler, BOOL fEnable );
        ("canSchedulerActivate", (HANDLE, ctypes.c_int)),
        # EXTERN_C HRESULT canSchedulerAddMessage (HANDLE hScheduler, PCANCYCLICTXMSG2 pMessage,
        #                                          PUINT32 pdwIndex );
        (
            "canSchedulerAddMessage",
            (HANDLE, structures.PCANCYCLICTXMSG2, ctypes.POINTER(ctypes.c_uint32)),
        ),
        # EXTERN_C HRESULT canSchedulerRemMessage (HANDLE hScheduler, UINT32 dwIndex );
        ("canSchedulerRemMessage", (HANDLE, ctypes.c_uint32)),
        # EXTERN_C HRESULT canSchedulerStartMessage (HANDLE hScheduler, UINT32 dwIndex,
        #                                            UINT16 dwCount );
        ("canSchedulerStartMessage", (HANDLE, ctypes.c_uint32, ctypes.c_uint16)),
        # EXTERN_C HRESULT canSchedulerStopMessage (HANDLE hScheduler, UINT32 dwIndex );
        ("canSchedulerStopMessage", (HANDLE, ctypes.c_uint32)),
    )
    for _symbol_name, _symbol_argtypes in _checked_symbols:
        _canlib.map_symbol(
            _symbol_name, hresult_type, _symbol_argtypes, __check_status
        )

    # Only initialize the library once every symbol has been mapped.
    _canlib.vciInitialize()
except AttributeError:
    # In case _canlib == None meaning we're not on win32/no lib found
    pass
except Exception as e:
    log.warning("Could not initialize IXXAT VCI library: %s", e)
×
382
# ---------------------------------------------------------------------------
383

384

385
# Human-readable descriptions keyed by the CAN_INFO_* constants.
CAN_INFO_MESSAGES = {
    constants.CAN_INFO_START: "CAN started",
    constants.CAN_INFO_STOP: "CAN stopped",
    constants.CAN_INFO_RESET: "CAN reset",
}

# Human-readable descriptions keyed by the CAN_ERROR_* constants.
CAN_ERROR_MESSAGES = {
    constants.CAN_ERROR_STUFF: "CAN bit stuff error",
    constants.CAN_ERROR_FORM: "CAN form error",
    constants.CAN_ERROR_ACK: "CAN acknowledgment error",
    constants.CAN_ERROR_BIT: "CAN bit error",
    constants.CAN_ERROR_CRC: "CAN CRC error",
    constants.CAN_ERROR_OTHER: "Other (unknown) CAN error",
}

# Human-readable descriptions keyed by the CAN_STATUS_* constants.
# NOTE(review): presumably each key is a single status bit so several entries
# can apply at once - confirm against the code that consumes this table.
CAN_STATUS_FLAGS = {
    constants.CAN_STATUS_TXPEND: "transmission pending",
    constants.CAN_STATUS_OVRRUN: "data overrun occurred",
    constants.CAN_STATUS_ERRLIM: "error warning limit exceeded",
    constants.CAN_STATUS_BUSOFF: "bus off",
    constants.CAN_STATUS_ININIT: "init mode active",
    constants.CAN_STATUS_BUSCERR: "bus coupling error",
}
408
# ----------------------------------------------------------------------------
409

410

411
class IXXATBus(BusABC):
7✔
412
    """The CAN Bus implemented for the IXXAT interface.
7✔
413

414
    .. warning::
415

416
        This interface does implement efficient filtering of messages, but
417
        the filters have to be set in ``__init__`` using the ``can_filters`` parameter.
418
        Using :meth:`~can.BusABC.set_filters` does not work.
419

420
    """
421

422
    @deprecated_args_alias(
        deprecation_start="4.0.0",
        deprecation_end="5.0.0",
        UniqueHardwareId="unique_hardware_id",
        rxFifoSize="rx_fifo_size",
        txFifoSize="tx_fifo_size",
    )
    def __init__(
        self,
        channel: int,
        can_filters=None,
        receive_own_messages: bool = False,
        unique_hardware_id: Optional[Union[int, str]] = None,
        extended: bool = True,
        rx_fifo_size: int = 1024,
        tx_fifo_size: int = 128,
        bitrate: int = 500000,
        data_bitrate: int = 2000000,
        sjw_abr: Optional[int] = None,
        tseg1_abr: Optional[int] = None,
        tseg2_abr: Optional[int] = None,
        sjw_dbr: Optional[int] = None,
        tseg1_dbr: Optional[int] = None,
        tseg2_dbr: Optional[int] = None,
        ssp_dbr: Optional[int] = None,
        **kwargs,
    ):
        """
        Open an IXXAT device, configure one of its CAN channels and start it.

        :param channel:
            The Channel id to create this bus with. Converted with ``int()``
            because it usually comes as a string from the config file.

        :param can_filters:
            See :meth:`can.BusABC.set_filters`. The filters are installed on
            the controller before it is started.

        :param receive_own_messages:
            Enable self-reception of sent messages.

        :param unique_hardware_id:
            unique_hardware_id to connect (optional, will use the first found if not supplied).
            Compared against the device's id as ASCII text, so both ``int``
            and ``str`` values are accepted.

        :param extended:
            Default True, enables the capability to use extended IDs.

        :param rx_fifo_size:
            Receive fifo size (default 1024)

        :param tx_fifo_size:
            Transmit fifo size (default 128)

        :param bitrate:
            Channel bitrate in bit/s

        :param data_bitrate:
            Data phase bitrate in bit/s (only in CAN FD if bitrate switch enabled).

        :param sjw_abr:
            Bus timing value synchronization jump width (arbitration).

        :param tseg1_abr:
            Bus timing value tseg1 (arbitration)

        :param tseg2_abr:
            Bus timing value tseg2 (arbitration)

        :param sjw_dbr:
            Bus timing value synchronization jump width (data)

        :param tseg1_dbr:
            Bus timing value tseg1 (data). Only takes effect with fd and bitrate switch enabled.

        :param tseg2_dbr:
            Bus timing value tseg2 (data). Only takes effect with fd and bitrate switch enabled.

        :param ssp_dbr:
            Secondary sample point (data). Only takes effect with fd and bitrate switch enabled.
            Defaults to ``tseg1_dbr`` when not supplied.

        :raises CanInterfaceNotImplementedError:
            If the IXXAT VCI library could not be loaded.
        :raises ValueError:
            If a bitrate without preset is used without its timing values, if a
            fifo size is not positive, or if the channel number is negative.
        :raises VCIDeviceNotFoundError:
            If no (matching) device could be enumerated.
        :raises CanInitializationError:
            If the device or the channel could not be opened.
        """
        if _canlib is None:
            raise CanInterfaceNotImplementedError(
                "The IXXAT VCI library has not been initialized. Check the logs for more details."
            )
        log.info("CAN Filters: %s", can_filters)
        # Configuration options
        self._receive_own_messages = receive_own_messages
        # Usually comes as a string from the config file
        channel = int(channel)

        if bitrate not in constants.CAN_BITRATE_PRESETS and (
            tseg1_abr is None or tseg2_abr is None or sjw_abr is None
        ):
            raise ValueError(
                f"To use bitrate {bitrate} (that has not predefined preset) is mandatory "
                f"to use also parameters tseg1_abr, tseg2_abr and sjw_abr"
            )
        if data_bitrate not in constants.CAN_DATABITRATE_PRESETS and (
            tseg1_dbr is None or tseg2_dbr is None or sjw_dbr is None
        ):
            raise ValueError(
                f"To use data_bitrate {data_bitrate} (that has not predefined preset) is mandatory "
                f"to use also parameters tseg1_dbr, tseg2_dbr and sjw_dbr"
            )

        if rx_fifo_size <= 0:
            raise ValueError("rx_fifo_size must be > 0")

        if tx_fifo_size <= 0:
            raise ValueError("tx_fifo_size must be > 0")

        if channel < 0:
            raise ValueError("channel number must be >= 0")

        self._device_handle = HANDLE()
        self._device_info = structures.VCIDEVICEINFO()
        self._control_handle = HANDLE()
        self._channel_handle = HANDLE()
        self._channel_capabilities = structures.CANCAPABILITIES2()
        self._message = structures.CANMSG2()
        self._payload = (ctypes.c_byte * 64)()
        self._can_protocol = CanProtocol.CAN_FD

        # Search for supplied device
        if unique_hardware_id is None:
            log.info("Searching for first available device")
            wanted_hardware_id = None
        else:
            log.info("Searching for unique HW ID %s", unique_hardware_id)
            # str() first so that integer ids can be encoded as well
            wanted_hardware_id = str(unique_hardware_id).encode("ascii")

        _canlib.vciEnumDeviceOpen(ctypes.byref(self._device_handle))
        try:
            while True:
                try:
                    _canlib.vciEnumDeviceNext(
                        self._device_handle, ctypes.byref(self._device_info)
                    )
                except StopIteration:
                    # __check_status signals the end of the device list this way
                    if unique_hardware_id is None:
                        raise VCIDeviceNotFoundError(
                            "No IXXAT device(s) connected or device(s) in use by other process(es)."
                        ) from None
                    raise VCIDeviceNotFoundError(
                        f"Unique HW ID {unique_hardware_id} not connected or not available."
                    ) from None

                if (
                    wanted_hardware_id is None
                    or self._device_info.UniqueHardwareId.AsChar == wanted_hardware_id
                ):
                    break
                log.debug(
                    "Ignoring IXXAT with hardware id '%s'.",
                    self._device_info.UniqueHardwareId.AsChar.decode("ascii"),
                )
        finally:
            # Release the enumeration handle even when no device was found
            _canlib.vciEnumDeviceClose(self._device_handle)

        try:
            _canlib.vciDeviceOpen(
                ctypes.byref(self._device_info.VciObjectId),
                ctypes.byref(self._device_handle),
            )
        except Exception as exception:
            raise CanInitializationError(
                f"Could not open device: {exception}"
            ) from exception

        log.info("Using unique HW ID %s", self._device_info.UniqueHardwareId.AsChar)

        log.info(
            "Initializing channel %d in shared mode, %d rx buffers, %d tx buffers",
            channel,
            rx_fifo_size,
            tx_fifo_size,
        )

        try:
            _canlib.canChannelOpen(
                self._device_handle,
                channel,
                constants.FALSE,
                ctypes.byref(self._channel_handle),
            )
        except Exception as exception:
            raise CanInitializationError(
                f"Could not open and initialize channel: {exception}"
            ) from exception

        # Signal TX/RX events when at least one frame has been handled
        _canlib.canChannelInitialize(
            self._channel_handle,
            rx_fifo_size,
            1,
            tx_fifo_size,
            1,
            0,
            constants.CAN_FILTER_PASS,
        )
        _canlib.canChannelActivate(self._channel_handle, constants.TRUE)

        # Bit timing for the arbitration phase (SDR) and the data phase (FDR)
        pBtpSDR = IXXATBus._canptb_build(
            defaults=constants.CAN_BITRATE_PRESETS,
            bitrate=bitrate,
            tseg1=tseg1_abr,
            tseg2=tseg2_abr,
            sjw=sjw_abr,
            ssp=0,
        )
        pBtpFDR = IXXATBus._canptb_build(
            defaults=constants.CAN_DATABITRATE_PRESETS,
            bitrate=data_bitrate,
            tseg1=tseg1_dbr,
            tseg2=tseg2_dbr,
            sjw=sjw_dbr,
            ssp=ssp_dbr if ssp_dbr is not None else tseg1_dbr,
        )

        log.info(
            "Initializing control %d with SDR={%s}, FDR={%s}",
            channel,
            pBtpSDR,
            pBtpFDR,
        )
        _canlib.canControlOpen(
            self._device_handle, channel, ctypes.byref(self._control_handle)
        )

        _canlib.canControlGetCaps(
            self._control_handle, ctypes.byref(self._channel_capabilities)
        )

        # check capabilities
        features = self._channel_capabilities.dwFeatures
        bOpMode = constants.CAN_OPMODE_UNDEFINED
        if (features & constants.CAN_FEATURE_STDANDEXT) != 0:
            # controller supports CAN_OPMODE_STANDARD and CAN_OPMODE_EXTENDED at the same time
            bOpMode |= constants.CAN_OPMODE_STANDARD  # enable 11 bits reception
            if extended:  # parameter from configuration
                bOpMode |= constants.CAN_OPMODE_EXTENDED  # enable 29 bits reception
        else:
            # NOTE(review): this branch used to repeat the CAN_FEATURE_STDANDEXT
            # test and was therefore unreachable, leaving the op mode without
            # any frame format. Falling back whenever simultaneous support is
            # missing matches the warning below - confirm against the VCI
            # capability flags (a dedicated "standard OR extended" feature
            # flag may be the intended test).
            log.warning(
                "Channel %d capabilities allow either basic or extended IDs, but not both. using %s according to parameter [extended=%s]",
                channel,
                "extended" if extended else "basic",
                "True" if extended else "False",
            )
            bOpMode |= (
                constants.CAN_OPMODE_EXTENDED
                if extended
                else constants.CAN_OPMODE_STANDARD
            )

        if (features & constants.CAN_FEATURE_ERRFRAME) != 0:
            bOpMode |= constants.CAN_OPMODE_ERRFRAME

        bExMode = constants.CAN_EXMODE_DISABLED
        if (features & constants.CAN_FEATURE_EXTDATA) != 0:
            bExMode |= constants.CAN_EXMODE_EXTDATALEN

        if (features & constants.CAN_FEATURE_FASTDATA) != 0:
            bExMode |= constants.CAN_EXMODE_FASTDATA

        _canlib.canControlInitialize(
            self._control_handle,
            bOpMode,
            bExMode,
            constants.CAN_FILTER_PASS,
            constants.CAN_FILTER_PASS,
            0,
            0,
            ctypes.byref(pBtpSDR),
            ctypes.byref(pBtpFDR),
        )

        # With receive messages, this field contains the relative reception time of
        # the message in ticks. The resolution of a tick can be calculated from the fields
        # dwClockFreq and dwTscDivisor of the structure  CANCAPABILITIES in accordance with the following formula:
        # frequency [1/s] = dwClockFreq / dwTscDivisor
        self._tick_resolution = (
            self._channel_capabilities.dwTscClkFreq
            / self._channel_capabilities.dwTscDivisor
        )

        # Setup filters before starting the channel
        if can_filters:
            log.info("The IXXAT VCI backend is filtering messages")
            # Disable every message coming in
            for frame_format in (0, 1):
                _canlib.canControlSetAccFilter(
                    self._control_handle,
                    frame_format,
                    constants.CAN_ACC_CODE_NONE,
                    constants.CAN_ACC_MASK_NONE,
                )
            for can_filter in can_filters:
                # Filters define what messages are accepted
                code = int(can_filter["can_id"])
                mask = int(can_filter["can_mask"])
                filter_is_extended = can_filter.get("extended", False)
                _canlib.canControlAddFilterIds(
                    self._control_handle,
                    1 if filter_is_extended else 0,
                    code << 1,
                    mask << 1,
                )
                log.info("Accepting ID: 0x%X MASK: 0x%X", code, mask)

        # Start the CAN controller. Messages will be forwarded to the channel
        start_begin = time.time()
        _canlib.canControlStart(self._control_handle, constants.TRUE)
        start_end = time.time()

        # Calculate an offset to make them relative to epoch
        # Assume that the time offset is in the middle of the start command
        self._timeoffset = start_begin + (start_end - start_begin) / 2
        self._overrunticks = 0
        self._starttickoffset = 0

        # For cyclic transmit list. Set when .send_periodic() is first called
        self._scheduler = None
        self._scheduler_resolution = None
        self.channel = channel

        # Usually you get back 3 messages like "CAN initialized" ecc...
        # Clear the FIFO by filter them out with low timeout
        for _ in range(rx_fifo_size):
            try:
                _canlib.canChannelReadMessage(
                    self._channel_handle, 0, ctypes.byref(self._message)
                )
            except (VCITimeout, VCIRxQueueEmptyError):
                break

        # Filters were already installed above, so none are passed to the base class
        super().__init__(channel=channel, can_filters=None, **kwargs)
×
756

757
    @staticmethod
    def _canptb_build(defaults, bitrate, tseg1, tseg2, sjw, ssp):
        """Assemble a ``structures.CANBTP`` bit-timing record for *bitrate*.

        When *bitrate* is a key of *defaults*, every timing argument passed as
        ``None`` is replaced by the matching field of that preset (``wTS1``,
        ``wTS2``, ``wSJW``, ``wTDO``) and the preset's ``dwMode`` is used.
        Otherwise the arguments are forwarded unchanged and ``dwMode`` is 0.
        """
        mode = 0
        if bitrate in defaults:
            preset = defaults[bitrate]
            mode = preset.dwMode
            # Explicit caller values always win over the preset.
            tseg1 = preset.wTS1 if tseg1 is None else tseg1
            tseg2 = preset.wTS2 if tseg2 is None else tseg2
            sjw = preset.wSJW if sjw is None else sjw
            ssp = preset.wTDO if ssp is None else ssp

        return structures.CANBTP(
            dwMode=mode,
            dwBPS=bitrate,
            wTS1=tseg1,
            wTS2=tseg2,
            wSJW=sjw,
            wTDO=ssp,
        )
781

782
    def _inWaiting(self):
        """Return 1 if a zero-timeout wait for an RX event succeeds, else 0.

        ``VCITimeout`` from the wait call is the "nothing pending" signal; any
        other exception propagates to the caller.
        """
        try:
            _canlib.canChannelWaitRxEvent(self._channel_handle, 0)
        except VCITimeout:
            return 0
        return 1
×
789

790
    def flush_tx_buffer(self):
        """Flushes the transmit buffer on the IXXAT.

        Implemented as a wait for the channel's TX event with
        ``constants.INFINITE`` as the timeout, so the call has no time limit.
        """
        # TODO #64: no timeout?
        channel = self._channel_handle
        _canlib.canChannelWaitTxEvent(channel, constants.INFINITE)
×
794

795
    def _recv_internal(self, timeout):
        """Read a message from IXXAT device.

        :param timeout:
            Seconds to wait for a data frame. ``0`` peeks at the receive queue
            without blocking; ``None`` or a negative value waits with
            ``constants.INFINITE``.
        :return:
            ``(message, True)`` when a data frame was read, ``(None, True)``
            otherwise (timeout, empty queue, or a non-data message while
            peeking). The second element is always ``True``.
            NOTE(review): presumably the "already filtered" flag expected by
            the base class - confirm against ``BusABC``.
        :raises VCIBusOffError:
            If a status message with the ``CAN_STATUS_BUSOFF`` bit is read.
        """

        # TODO: handling CAN error messages?
        data_received = False

        if timeout == 0:
            # Peek without waiting
            try:
                _canlib.canChannelPeekMessage(
                    self._channel_handle, ctypes.byref(self._message)
                )
            except (VCITimeout, VCIRxQueueEmptyError, VCIError):
                # VCIError means no frame available (canChannelPeekMessage returned different from zero)
                return None, True
            else:
                if self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_DATA:
                    data_received = True
        else:
            # Wait if no message available
            if timeout is None or timeout < 0:
                remaining_ms = constants.INFINITE
                t0 = None
            else:
                timeout_ms = int(timeout * 1000)
                remaining_ms = timeout_ms
                t0 = time.perf_counter()

            while True:
                try:
                    _canlib.canChannelReadMessage(
                        self._channel_handle, remaining_ms, ctypes.byref(self._message)
                    )
                except (VCITimeout, VCIRxQueueEmptyError):
                    # Ignore the 2 errors, the timeout is handled manually with the perf_counter()
                    pass
                else:
                    # See if we got a data or info/error messages
                    msg_type = self._message.uMsgInfo.Bits.type
                    if msg_type == constants.CAN_MSGTYPE_DATA:
                        data_received = True
                        break
                    elif msg_type == constants.CAN_MSGTYPE_INFO:
                        log.info(
                            CAN_INFO_MESSAGES.get(
                                self._message.abData[0],
                                f"Unknown CAN info message code {self._message.abData[0]}",
                            )
                        )
                        # Handle CAN start info message. This check has to live
                        # inside the INFO branch: as a sibling ``elif`` it could
                        # never match an info message and would instead swallow
                        # error/status/overrun messages whose first data byte
                        # happens to equal CAN_INFO_START.
                        if self._message.abData[0] == constants.CAN_INFO_START:
                            self._starttickoffset = self._message.dwTime
                    elif msg_type == constants.CAN_MSGTYPE_ERROR:
                        log.warning(
                            CAN_ERROR_MESSAGES.get(
                                self._message.abData[0],
                                f"Unknown CAN error message code {self._message.abData[0]}",
                            )
                        )
                    elif msg_type == constants.CAN_MSGTYPE_STATUS:
                        log.info(_format_can_status(self._message.abData[0]))
                        if self._message.abData[0] & constants.CAN_STATUS_BUSOFF:
                            raise VCIBusOffError()
                    elif msg_type == constants.CAN_MSGTYPE_TIMEOVR:
                        # Add the number of timestamp overruns to the high word
                        self._overrunticks += self._message.dwMsgId << 32
                    else:
                        log.warning("Unexpected message info type")

                if t0 is not None:
                    # Re-arm the next read with whatever is left of the budget
                    remaining_ms = timeout_ms - int((time.perf_counter() - t0) * 1000)
                    if remaining_ms < 0:
                        break

        if not data_received:
            # Timed out / can message type is not DATA
            return None, True

        data_len = dlc2len(self._message.uMsgInfo.Bits.dlc)
        rx_msg = Message(
            # Device ticks (extended by overruns, rebased to the start tick)
            # converted with the tick resolution, then shifted by the offset
            # recorded when the controller was started.
            timestamp=(
                (self._message.dwTime + self._overrunticks - self._starttickoffset)
                / self._tick_resolution
            )
            + self._timeoffset,
            is_remote_frame=bool(self._message.uMsgInfo.Bits.rtr),
            is_fd=bool(self._message.uMsgInfo.Bits.edl),
            is_rx=True,
            is_error_frame=bool(
                self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_ERROR
            ),
            bitrate_switch=bool(self._message.uMsgInfo.Bits.fdr),
            error_state_indicator=bool(self._message.uMsgInfo.Bits.esi),
            is_extended_id=bool(self._message.uMsgInfo.Bits.ext),
            arbitration_id=self._message.dwMsgId,
            dlc=data_len,
            data=self._message.abData[:data_len],
            channel=self.channel,
        )

        return rx_msg, True
×
904

905
    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        """
        Transmit *msg* on the bus. The interface may buffer the message.

        :param msg:
            The message to send.
        :param timeout:
            Seconds to wait for the send call, converted to whole
            milliseconds. When falsy (``None`` or ``0``) the message is posted
            with ``canChannelPostMessage`` and no timeout is passed.
        :raise:
            :class:CanTimeoutError
            :class:CanOperationError
        """
        # This system is not designed to be very efficient
        frame = structures.CANMSG2()
        # View onto the frame's flag bitfield; writes go to the frame's buffer.
        flags = frame.uMsgInfo.Bits

        if msg.is_error_frame:
            flags.type = constants.CAN_MSGTYPE_ERROR
        else:
            flags.type = constants.CAN_MSGTYPE_DATA
        flags.rtr = int(bool(msg.is_remote_frame))
        flags.ext = int(bool(msg.is_extended_id))
        flags.srr = int(bool(self._receive_own_messages))
        flags.fdr = int(bool(msg.bitrate_switch))
        flags.esi = int(bool(msg.error_state_indicator))
        flags.edl = int(bool(msg.is_fd))
        frame.dwMsgId = msg.arbitration_id

        payload_len = msg.dlc  # this dlc means number of bytes of payload
        if payload_len:
            flags.dlc = len2dlc(payload_len)
            # pad with zeros until required length
            missing = payload_len - len(msg.data)
            padded = msg.data + bytearray([0] * missing)
            view = (ctypes.c_uint8 * payload_len).from_buffer(padded)
            ctypes.memmove(frame.abData, view, payload_len)

        if not timeout:
            _canlib.canChannelPostMessage(self._channel_handle, frame)
            return

        _canlib.canChannelSendMessage(
            self._channel_handle, int(timeout * 1000), frame
        )
×
947

948
    def _send_periodic_internal(
        self,
        msgs: Union[Sequence[Message], Message],
        period: float,
        duration: Optional[float] = None,
        autostart: bool = True,
        modifier_callback: Optional[Callable[[Message], None]] = None,
    ) -> CyclicSendTaskABC:
        """Schedule *msgs* for periodic transmission.

        Without a *modifier_callback* the built-in cyclic transmit list is
        used: the scheduler handle is opened and activated on first use and a
        :class:`CyclicSendTask` is returned. With a *modifier_callback* a
        warning is emitted and the ``BusABC`` implementation is used instead.
        """
        if modifier_callback is not None:
            # fallback to thread based cyclic task
            warnings.warn(
                f"{self.__class__.__name__} falls back to a thread-based cyclic task, "
                "when the `modifier_callback` argument is given.",
                stacklevel=3,
            )
            return BusABC._send_periodic_internal(
                self,
                msgs=msgs,
                period=period,
                duration=duration,
                autostart=autostart,
                modifier_callback=modifier_callback,
            )

        if self._scheduler is None:
            # First periodic task on this bus: open and activate the scheduler
            self._scheduler = HANDLE()
            _canlib.canSchedulerOpen(
                self._device_handle, self.channel, self._scheduler
            )
            capabilities = structures.CANCAPABILITIES2()
            _canlib.canSchedulerGetCaps(self._scheduler, capabilities)
            # TODO: confirm
            self._scheduler_resolution = (
                capabilities.dwCmsClkFreq / capabilities.dwCmsDivisor
            )
            _canlib.canSchedulerActivate(self._scheduler, constants.TRUE)

        return CyclicSendTask(
            self._scheduler,
            msgs,
            period,
            duration,
            self._scheduler_resolution,
            autostart=autostart,
        )
992

993
    def shutdown(self):
        """Run the base-class shutdown, then release the VCI resources.

        Order: scheduler (only if one was opened), channel, controller stop,
        control handle, device handle.
        """
        super().shutdown()

        scheduler = self._scheduler
        if scheduler is not None:
            _canlib.canSchedulerClose(scheduler)

        _canlib.canChannelClose(self._channel_handle)

        control = self._control_handle
        _canlib.canControlStart(control, constants.FALSE)
        _canlib.canControlClose(control)

        _canlib.vciDeviceClose(self._device_handle)
×
1001

1002

1003
class CyclicSendTask(LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC):
    """A message in the cyclic transmit list.

    The task owns one ``CANCYCLICTXMSG2`` record. ``_index`` is ``None``
    whenever the record is not registered with the scheduler.
    """

    def __init__(
        self,
        scheduler,
        msgs,
        period,
        duration,
        resolution,
        autostart: bool = True,
    ):
        """Prepare the cyclic record for a single message.

        :param scheduler: scheduler handle passed to the ``canScheduler*`` calls.
        :param msgs: message(s) to send; exactly one is supported.
        :param period: cycle period; multiplied by *resolution* and rounded to
            obtain ``wCycleTime``.
        :param duration: total duration; a falsy value gives a repeat count
            of 0.
        :param resolution: factor converting *period* into cycle ticks.
        :param autostart: start transmitting immediately when true.
        :raises ValueError: if more than one message is given.
        """
        super().__init__(msgs, period, duration)
        if len(self.messages) != 1:
            raise ValueError(
                "IXXAT Interface only supports periodic transmission of 1 element"
            )

        self._scheduler = scheduler
        self._index = None
        self._count = int(duration / period) if duration else 0

        msg = self.messages[0]
        self._msg = structures.CANCYCLICTXMSG2()
        self._msg.wCycleTime = round(period * resolution)
        self._msg.dwMsgId = msg.arbitration_id
        self._msg.uMsgInfo.Bits.type = constants.CAN_MSGTYPE_DATA
        self._msg.uMsgInfo.Bits.ext = 1 if msg.is_extended_id else 0
        self._msg.uMsgInfo.Bits.rtr = 1 if msg.is_remote_frame else 0
        # Mirror IXXATBus.send(): msg.dlc is a byte count, the bitfield wants
        # the DLC code (identical for lengths up to 8), and FD frames need
        # their flag bits set.
        self._msg.uMsgInfo.Bits.fdr = 1 if msg.bitrate_switch else 0
        self._msg.uMsgInfo.Bits.esi = 1 if msg.error_state_indicator else 0
        self._msg.uMsgInfo.Bits.edl = 1 if msg.is_fd else 0
        self._msg.uMsgInfo.Bits.dlc = len2dlc(msg.dlc)
        for i, b in enumerate(msg.data):
            self._msg.abData[i] = b
        if autostart:
            self.start()

    def start(self):
        """Start transmitting message (add to list if needed)."""
        if self._index is None:
            # Only remember the index once the add succeeded, so that a failed
            # add is retried on the next start() instead of being skipped.
            index = ctypes.c_uint32()
            _canlib.canSchedulerAddMessage(self._scheduler, self._msg, index)
            self._index = index
        _canlib.canSchedulerStartMessage(self._scheduler, self._index, self._count)

    def pause(self):
        """Pause transmitting message (keep it in the list).

        Does nothing when the message is not in the list.
        """
        if self._index is None:
            return
        _canlib.canSchedulerStopMessage(self._scheduler, self._index)

    def stop(self):
        """Stop transmitting message (remove from list).

        Safe to call repeatedly: a task that is not in the list is left alone.
        """
        if self._index is None:
            return
        # Remove it completely instead of just stopping it to avoid filling up
        # the list with permanently stopped messages
        _canlib.canSchedulerRemMessage(self._scheduler, self._index)
        self._index = None
×
1054

1055

1056
def _format_can_status(status_flags: int):
    """
    Render a status bitfield (as found in CAN_MSGTYPE_STATUS messages or in the
    dwStatus field of CANLINESTATUS) as a human-readable string.

    Each flag listed in ``CAN_STATUS_FLAGS`` that is set contributes its
    description; bits left over afterwards are reported as an unknown state.
    Valid states are defined in the CAN_STATUS_* constants in cantype.h
    """
    remaining = status_flags
    descriptions = []
    for bit, text in CAN_STATUS_FLAGS.items():
        if not remaining & bit:
            continue
        descriptions.append(text)
        # Clear the handled bit so only unrecognised ones are left at the end
        remaining &= ~bit

    if remaining:
        descriptions.append(f"unknown state 0x{remaining:02x}")

    if not descriptions:
        return "Empty CAN status message"
    return "CAN status message: {}".format(", ".join(descriptions))
×
1076

1077

1078
def get_ixxat_hwids():
    """Get a list of hardware ids of all available IXXAT devices.

    :return:
        The ``UniqueHardwareId`` of every enumerated device, decoded as ASCII,
        in enumeration order.
    """
    hwids = []
    device_handle = HANDLE()
    device_info = structures.VCIDEVICEINFO()

    _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle))
    try:
        while True:
            try:
                _canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info))
            except StopIteration:
                # Raised by the wrapped call when the enumeration is exhausted
                break
            hwids.append(device_info.UniqueHardwareId.AsChar.decode("ascii"))
    finally:
        # Always release the enumeration handle, even if enumerating or
        # decoding an id fails part-way through.
        _canlib.vciEnumDeviceClose(device_handle)

    return hwids
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc