• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 implementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.8
/can/interfaces/vector/canlib.py
1
"""
21✔
2
Ctypes wrapper module for Vector CAN Interface on win32/win64 systems.
3

4
Authors: Julien Grave <grave.jul@gmail.com>, Christian Sandberg
5
"""
6

7
import contextlib
21✔
8
import ctypes
21✔
9
import logging
21✔
10
import os
21✔
11
import time
21✔
12
import warnings
21✔
13
from collections.abc import Iterator, Sequence
21✔
14
from types import ModuleType
21✔
15
from typing import (
21✔
16
    Any,
17
    Callable,
18
    NamedTuple,
19
    Optional,
20
    Union,
21
    cast,
22
)
23

24
from can import (
21✔
25
    BitTiming,
26
    BitTimingFd,
27
    BusABC,
28
    CanInitializationError,
29
    CanInterfaceNotImplementedError,
30
    CanProtocol,
31
    Message,
32
)
33
from can.typechecking import AutoDetectedConfig, CanFilters
21✔
34
from can.util import (
21✔
35
    check_or_adjust_timing_clock,
36
    deprecated_args_alias,
37
    dlc2len,
38
    len2dlc,
39
    time_perfcounter_correlation,
40
)
41

42
from . import xlclass, xldefine
21✔
43
from .exceptions import VectorError, VectorInitializationError, VectorOperationError
21✔
44

45
# Module-level logger shared by everything in this file.
LOG = logging.getLogger(__name__)

# Import safely Vector API module for Travis tests
# `xldriver` stays `None` when the import raises FileNotFoundError; in that case
# `VectorBus.__init__` raises CanInterfaceNotImplementedError instead of
# failing at import time.
xldriver: Optional[ModuleType] = None
try:
    from . import xldriver
except FileNotFoundError as exc:
    LOG.warning("Could not import vxlapi: %s", exc)

# Optional event-based waiting. Both names are `None` when `_winapi` cannot be
# imported, and `HAS_EVENTS` records which of the two receive strategies
# (waiting on an event handle vs. sleeping for `poll_interval`) is used.
WaitForSingleObject: Optional[Callable[[int, int], int]]
INFINITE: Optional[int]
try:
    # Try builtin Python 3 Windows API
    from _winapi import (  # type: ignore[attr-defined,no-redef,unused-ignore]
        INFINITE,
        WaitForSingleObject,
    )

    HAS_EVENTS = True
except ImportError:
    # `_winapi` is unavailable: fall back to polling.
    WaitForSingleObject, INFINITE = None, None
    HAS_EVENTS = False
14✔
67

68

69
class VectorBus(BusABC):
21✔
70
    """The CAN Bus implemented for the Vector interface."""
21✔
71

72
    @deprecated_args_alias(
        deprecation_start="4.0.0",
        deprecation_end="5.0.0",
        **{
            "sjwAbr": "sjw_abr",
            "tseg1Abr": "tseg1_abr",
            "tseg2Abr": "tseg2_abr",
            "sjwDbr": "sjw_dbr",
            "tseg1Dbr": "tseg1_dbr",
            "tseg2Dbr": "tseg2_dbr",
        },
    )
    def __init__(
        self,
        channel: Union[int, Sequence[int], str],
        can_filters: Optional[CanFilters] = None,
        poll_interval: float = 0.01,
        receive_own_messages: bool = False,
        timing: Optional[Union[BitTiming, BitTimingFd]] = None,
        bitrate: Optional[int] = None,
        rx_queue_size: int = 2**14,
        app_name: Optional[str] = "CANalyzer",
        serial: Optional[int] = None,
        fd: bool = False,
        data_bitrate: Optional[int] = None,
        sjw_abr: int = 2,
        tseg1_abr: int = 6,
        tseg2_abr: int = 3,
        sjw_dbr: int = 2,
        tseg1_dbr: int = 6,
        tseg2_dbr: int = 3,
        listen_only: Optional[bool] = False,
        **kwargs: Any,
    ) -> None:
        """
        :param channel:
            The channel indexes to create this bus with.
            Can also be a single integer or a comma separated string.
        :param can_filters:
            See :class:`can.BusABC`.
        :param receive_own_messages:
            See :class:`can.BusABC`.
        :param timing:
            An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd`
            to specify the bit timing parameters for the VectorBus interface. The
            `f_clock` value of the timing instance must be set to 8_000_000 (8MHz)
            or 16_000_000 (16MHz) for CAN 2.0 or 80_000_000 (80MHz) for CAN FD.
            If this parameter is provided, it takes precedence over all other
            timing-related parameters.
            Otherwise, the bit timing can be specified using the following parameters:
            `bitrate` for standard CAN or `fd`, `data_bitrate`, `sjw_abr`, `tseg1_abr`,
            `tseg2_abr`, `sjw_dbr`, `tseg1_dbr`, and `tseg2_dbr` for CAN FD.
        :param poll_interval:
            Poll interval in seconds.
        :param bitrate:
            Bitrate in bits/s.
        :param rx_queue_size:
            Number of messages in receive queue (power of 2).
            CAN: range `16…32768`
            CAN-FD: range `8192…524288`
        :param app_name:
            Name of application in *Vector Hardware Config*.
            If set to `None`, the channel should be a global channel index.
        :param serial:
            Serial number of the hardware to be used.
            If set, the channel parameter refers to the channels ONLY on the specified hardware.
            If set, the `app_name` does not have to be previously defined in
            *Vector Hardware Config*.
        :param fd:
            If CAN-FD frames should be supported.
        :param data_bitrate:
            Which bitrate to use for data phase in CAN FD.
            Defaults to arbitration bitrate.
        :param sjw_abr:
            Bus timing value synchronization jump width (arbitration).
        :param tseg1_abr:
            Bus timing value tseg1 (arbitration)
        :param tseg2_abr:
            Bus timing value tseg2 (arbitration)
        :param sjw_dbr:
            Bus timing value synchronization jump width (data)
        :param tseg1_dbr:
            Bus timing value tseg1 (data)
        :param tseg2_dbr:
            Bus timing value tseg2 (data)
        :param listen_only:
            if the bus should be set to listen only mode.
        :param kwargs:
            Forwarded to :class:`can.BusABC`. Two keys are also read here:
            ``channel_index``, a global channel index that is used directly
            when exactly one channel is requested, and ``_testing``, which
            skips the operating system check and the verification of the
            applied bit timing.

        :raise ~can.exceptions.CanInterfaceNotImplementedError:
            If the current operating system is not supported or the driver could not be loaded.
        :raise ~can.exceptions.CanInitializationError:
            If the bus could not be set up.
            This may or may not be a :class:`~can.interfaces.vector.VectorInitializationError`.
        :raise TypeError:
            If `channel` is neither an int, a str nor a sequence.
        """
        self.__testing = kwargs.get("_testing", False)
        if os.name != "nt" and not self.__testing:
            raise CanInterfaceNotImplementedError(
                f"The Vector interface is only supported on Windows, "
                f'but you are running "{os.name}"'
            )

        if xldriver is None:
            raise CanInterfaceNotImplementedError("The Vector API has not been loaded")
        self.xldriver = xldriver  # keep reference so mypy knows it is not None
        self.xldriver.xlOpenDriver()

        self.poll_interval = poll_interval

        # Normalise the `channel` argument into a list of integers.
        self.channels: Sequence[int]
        if isinstance(channel, int):
            self.channels = [channel]
        elif isinstance(channel, str):  # must be checked before generic Sequence
            # Assume comma separated string of channels
            self.channels = [int(ch.strip()) for ch in channel.split(",")]
        elif isinstance(channel, Sequence):
            self.channels = [int(ch) for ch in channel]
        else:
            raise TypeError(
                f"Invalid type for parameter 'channel': {type(channel).__name__}"
            )

        self._app_name = app_name.encode() if app_name is not None else b""
        self.channel_info = "Application {}: {}".format(
            app_name,
            ", ".join(f"CAN {ch + 1}" for ch in self.channels),
        )

        channel_configs = get_channel_configs()
        # An explicit `timing` object decides the protocol; `fd` is only
        # consulted when no timing object was given.
        is_fd = isinstance(timing, BitTimingFd) if timing else fd

        self.mask = 0
        self.channel_masks: dict[int, int] = {}
        self.index_to_channel: dict[int, int] = {}
        self._can_protocol = CanProtocol.CAN_FD if is_fd else CanProtocol.CAN_20

        self._listen_only = listen_only

        # The loop variable is deliberately not named `channel`, so the
        # caller's original argument is still intact when it is forwarded to
        # `super().__init__()` further down.
        for ch in self.channels:
            if (
                len(self.channels) == 1
                and (_channel_index := kwargs.get("channel_index", None)) is not None
            ):
                # VectorBus._detect_available_configs() might return multiple
                # devices with the same serial number, e.g. if a VN8900 is connected via both USB and Ethernet
                # at the same time. If the VectorBus is instantiated with a config, that was returned from
                # VectorBus._detect_available_configs(), then use the contained global channel_index
                # to avoid any ambiguities.
                channel_index = cast("int", _channel_index)
            else:
                channel_index = self._find_global_channel_idx(
                    channel=ch,
                    serial=serial,
                    app_name=app_name,
                    channel_configs=channel_configs,
                )
            # Log the resolved global index (not the user-facing channel).
            LOG.debug("Channel index %d found", channel_index)

            channel_mask = 1 << channel_index
            self.channel_masks[ch] = channel_mask
            self.index_to_channel[channel_index] = ch
            self.mask |= channel_mask

        permission_mask = xlclass.XLaccess()
        # Set mask to request channel init permission if needed
        if bitrate or fd or timing or self._listen_only:
            permission_mask.value = self.mask

        interface_version = (
            xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4
            if is_fd
            else xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION
        )

        self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE)
        self.xldriver.xlOpenPort(
            self.port_handle,
            self._app_name,
            self.mask,
            permission_mask,
            rx_queue_size,
            interface_version,
            xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
        )
        # Read the value back after xlOpenPort; it is the mask used by the
        # `_set_*` helpers to select the channels they may configure.
        self.permission_mask = permission_mask.value

        LOG.debug(
            "Open Port: PortHandle: %d, ChannelMask: 0x%X, PermissionMask: 0x%X",
            self.port_handle.value,
            self.mask,
            self.permission_mask,
        )

        # Only verify the applied settings when timing was requested and we
        # are not running in testing mode.
        assert_timing = (bitrate or timing) and not self.__testing

        # set CAN settings
        if isinstance(timing, BitTiming):
            timing = check_or_adjust_timing_clock(timing, [16_000_000, 8_000_000])
            self._set_bit_timing(channel_mask=self.mask, timing=timing)
            if assert_timing:
                self._check_can_settings(
                    channel_mask=self.mask,
                    bitrate=timing.bitrate,
                    sample_point=timing.sample_point,
                )
        elif isinstance(timing, BitTimingFd):
            timing = check_or_adjust_timing_clock(timing, [80_000_000])
            self._set_bit_timing_fd(channel_mask=self.mask, timing=timing)
            if assert_timing:
                self._check_can_settings(
                    channel_mask=self.mask,
                    bitrate=timing.nom_bitrate,
                    sample_point=timing.nom_sample_point,
                    fd=True,
                    data_bitrate=timing.data_bitrate,
                    data_sample_point=timing.data_sample_point,
                )
        elif fd:
            # No timing object: build one from the individual parameters,
            # defaulting both bitrates to 500 kbit/s.
            timing = BitTimingFd.from_bitrate_and_segments(
                f_clock=80_000_000,
                nom_bitrate=bitrate or 500_000,
                nom_tseg1=tseg1_abr,
                nom_tseg2=tseg2_abr,
                nom_sjw=sjw_abr,
                data_bitrate=data_bitrate or bitrate or 500_000,
                data_tseg1=tseg1_dbr,
                data_tseg2=tseg2_dbr,
                data_sjw=sjw_dbr,
            )
            self._set_bit_timing_fd(channel_mask=self.mask, timing=timing)
            if assert_timing:
                self._check_can_settings(
                    channel_mask=self.mask,
                    bitrate=timing.nom_bitrate,
                    sample_point=timing.nom_sample_point,
                    fd=True,
                    data_bitrate=timing.data_bitrate,
                    data_sample_point=timing.data_sample_point,
                )
        elif bitrate:
            self._set_bitrate(channel_mask=self.mask, bitrate=bitrate)
            if assert_timing:
                self._check_can_settings(channel_mask=self.mask, bitrate=bitrate)

        if self._listen_only:
            self._set_output_mode(channel_mask=self.mask, listen_only=True)

        # Enable/disable TX receipts
        tx_receipts = 1 if receive_own_messages else 0
        self.xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0)

        if HAS_EVENTS:
            self.event_handle = xlclass.XLhandle()
            self.xldriver.xlSetNotification(self.port_handle, self.event_handle, 1)
        else:
            # NOTE(review): the event API in this module is imported from the
            # builtin `_winapi`, so the pywin32 hint looks outdated - confirm
            # before rewording the message.
            LOG.info("Install pywin32 to avoid polling")

        # Calculate time offset for absolute timestamps
        offset = xlclass.XLuint64()
        try:
            if time.get_clock_info("time").resolution > 1e-5:
                # Coarse `time.time()` clock: derive "now" from a
                # time/perf_counter correlation taken before the driver call.
                ts, perfcounter = time_perfcounter_correlation()
                try:
                    self.xldriver.xlGetSyncTime(self.port_handle, offset)
                except VectorInitializationError:
                    self.xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
                current_perfcounter = time.perf_counter()
                now = ts + (current_perfcounter - perfcounter)
                self._time_offset = now - offset.value * 1e-9
            else:
                try:
                    self.xldriver.xlGetSyncTime(self.port_handle, offset)
                except VectorInitializationError:
                    self.xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
                self._time_offset = time.time() - offset.value * 1e-9

        except VectorInitializationError:
            # Neither driver clock could be read: use a zero offset.
            self._time_offset = 0.0

        self._is_filtered = False
        super().__init__(
            channel=channel,
            can_filters=can_filters,
            **kwargs,
        )

        # activate channels after CAN filters were applied
        try:
            self.xldriver.xlActivateChannel(
                self.port_handle, self.mask, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, 0
            )
        except VectorOperationError as error:
            self.shutdown()
            raise VectorInitializationError.from_generic(error) from None
×
365

366
    @property
    def fd(self) -> bool:
        """Deprecated: whether this bus was configured for CAN FD.

        Emits a :class:`DeprecationWarning` on every access.

        :return:
            ``True`` if the bus protocol is ``CanProtocol.CAN_FD``.
        """
        owner = self.__class__.__name__
        message = (
            f"The {owner}.fd property is deprecated and superseded by "
            f"{owner}.protocol. It is scheduled for removal in python-can version 5.0."
        )
        # stacklevel=2 attributes the warning to the code reading the property.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return self._can_protocol is CanProtocol.CAN_FD
×
376

377
    def _find_global_channel_idx(
21✔
378
        self,
379
        channel: int,
380
        serial: Optional[int],
381
        app_name: Optional[str],
382
        channel_configs: list["VectorChannelConfig"],
383
    ) -> int:
384
        if serial is not None:
21✔
UNCOV
385
            serial_found = False
×
UNCOV
386
            for channel_config in channel_configs:
×
UNCOV
387
                if channel_config.serial_number != serial:
×
UNCOV
388
                    continue
×
389

390
                serial_found = True
×
391
                if channel_config.hw_channel == channel:
×
392
                    return channel_config.channel_index
×
393

394
            if not serial_found:
×
395
                err_msg = f"No interface with serial {serial} found."
×
396
            else:
UNCOV
397
                err_msg = (
×
398
                    f"Channel {channel} not found on interface with serial {serial}."
399
                )
UNCOV
400
            raise CanInitializationError(
×
401
                err_msg, error_code=xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT
402
            )
403

404
        if app_name:
21✔
405
            hw_type, hw_index, hw_channel = self.get_application_config(
21✔
406
                app_name, channel
407
            )
408
            idx = cast(
21✔
409
                "int", self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel)
410
            )
411
            if idx < 0:
21✔
412
                # Undocumented behavior! See issue #353.
413
                # If hardware is unavailable, this function returns -1.
414
                # Raise an exception as if the driver
415
                # would have signalled XL_ERR_HW_NOT_PRESENT.
UNCOV
416
                raise VectorInitializationError(
×
417
                    xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT,
418
                    xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT.name,
419
                    "xlGetChannelIndex",
420
                )
421
            return idx
21✔
422

423
        # check if channel is a valid global channel index
UNCOV
424
        for channel_config in channel_configs:
×
UNCOV
425
            if channel == channel_config.channel_index:
×
UNCOV
426
                return channel
×
427

428
        raise CanInitializationError(
×
429
            f"Channel {channel} not found. The 'channel' parameter must be "
430
            f"a valid global channel index if neither 'app_name' nor 'serial' were given.",
431
            error_code=xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT,
432
        )
433

434
    def _has_init_access(self, channel: int) -> bool:
21✔
UNCOV
435
        return bool(self.permission_mask & self.channel_masks[channel])
×
436

437
    def _read_bus_params(
21✔
438
        self, channel_index: int, vcc_list: list["VectorChannelConfig"]
439
    ) -> "VectorBusParams":
UNCOV
440
        for vcc in vcc_list:
×
UNCOV
441
            if vcc.channel_index == channel_index:
×
UNCOV
442
                bus_params = vcc.bus_params
×
UNCOV
443
                if bus_params is None:
×
444
                    # for CAN channels, this should never be `None`
445
                    raise ValueError("Invalid bus parameters.")
×
446
                return bus_params
×
447

UNCOV
448
        channel = self.index_to_channel[channel_index]
×
449
        raise CanInitializationError(
×
450
            f"Channel configuration for channel {channel} not found."
451
        )
452

453
    def _set_output_mode(self, channel_mask: int, listen_only: bool) -> None:
        """Set the output mode of the channels this port has init access to.

        :param channel_mask:
            Mask of the channels to configure; it is reduced to the channels
            that are also set in ``self.permission_mask``.
        :param listen_only:
            ``True`` selects ``XL_OUTPUT_MODE_SILENT``, ``False`` selects
            ``XL_OUTPUT_MODE_NORMAL``.
        """
        # set parameters for channels with init access
        permitted_mask = channel_mask & self.permission_mask
        if not permitted_mask:
            LOG.warning("No channels with init access to set listen only mode")
            return

        output_mode = (
            xldefine.XL_OutputMode.XL_OUTPUT_MODE_SILENT
            if listen_only
            else xldefine.XL_OutputMode.XL_OUTPUT_MODE_NORMAL
        )
        self.xldriver.xlCanSetChannelOutput(
            self.port_handle, permitted_mask, output_mode
        )
        LOG.info("xlCanSetChannelOutput: listen_only=%u", listen_only)
×
474

475
    def _set_bitrate(self, channel_mask: int, bitrate: int) -> None:
21✔
476
        # set parameters for channels with init access
477
        channel_mask = channel_mask & self.permission_mask
21✔
478
        if channel_mask:
21✔
479
            self.xldriver.xlCanSetChannelBitrate(
21✔
480
                self.port_handle,
481
                channel_mask,
482
                bitrate,
483
            )
484
            LOG.info("xlCanSetChannelBitrate: baudr.=%u", bitrate)
21✔
485

486
    def _set_bit_timing(self, channel_mask: int, timing: BitTiming) -> None:
        """Apply CAN 2.0 bit timing to the channels with init access.

        Does nothing when none of the channels in `channel_mask` is also set
        in ``self.permission_mask``. With an 8 MHz clock the BTR0/BTR1
        registers are written via ``xlCanSetChannelParamsC200``; with a 16 MHz
        clock the timing is written via ``xlCanSetChannelParams``.

        :param channel_mask:
            Mask of the channels to configure.
        :param timing:
            The bit timing to apply.
        :raise CanInitializationError:
            If ``timing.f_clock`` is neither 8_000_000 nor 16_000_000.
        """
        # set parameters for channels with init access
        permitted_mask = channel_mask & self.permission_mask
        if not permitted_mask:
            return

        if timing.f_clock == 8_000_000:
            self.xldriver.xlCanSetChannelParamsC200(
                self.port_handle, permitted_mask, timing.btr0, timing.btr1
            )
            LOG.info(
                "xlCanSetChannelParamsC200: BTR0=%#02x, BTR1=%#02x",
                timing.btr0,
                timing.btr1,
            )
            return

        if timing.f_clock != 16_000_000:
            raise CanInitializationError(
                f"timing.f_clock must be 8_000_000 or 16_000_000 (is {timing.f_clock})"
            )

        chip_params = xlclass.XLchipParams()
        for field_name, value in (
            ("bitRate", timing.bitrate),
            ("sjw", timing.sjw),
            ("tseg1", timing.tseg1),
            ("tseg2", timing.tseg2),
            ("sam", timing.nof_samples),
        ):
            setattr(chip_params, field_name, value)
        self.xldriver.xlCanSetChannelParams(
            self.port_handle, permitted_mask, chip_params
        )
        LOG.info(
            "xlCanSetChannelParams: baudr.=%u, sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
            chip_params.bitRate,
            chip_params.sjw,
            chip_params.tseg1,
            chip_params.tseg2,
        )
525

526
    def _set_bit_timing_fd(
        self,
        channel_mask: int,
        timing: BitTimingFd,
    ) -> None:
        """Apply CAN FD bit timing to the channels with init access.

        Does nothing when none of the channels in `channel_mask` is also set
        in ``self.permission_mask``.

        :param channel_mask:
            Mask of the channels to configure.
        :param timing:
            Nominal and data phase timing that is copied into an
            ``XLcanFdConf`` structure and passed to ``xlCanFdSetConfiguration``.
        """
        # set parameters for channels with init access
        permitted_mask = channel_mask & self.permission_mask
        if not permitted_mask:
            return

        canfd_conf = xlclass.XLcanFdConf()
        for field_name, value in (
            ("arbitrationBitRate", timing.nom_bitrate),
            ("sjwAbr", timing.nom_sjw),
            ("tseg1Abr", timing.nom_tseg1),
            ("tseg2Abr", timing.nom_tseg2),
            ("dataBitRate", timing.data_bitrate),
            ("sjwDbr", timing.data_sjw),
            ("tseg1Dbr", timing.data_tseg1),
            ("tseg2Dbr", timing.data_tseg2),
        ):
            setattr(canfd_conf, field_name, value)

        self.xldriver.xlCanFdSetConfiguration(
            self.port_handle, permitted_mask, canfd_conf
        )

        # Log the values as stored in the structure handed to the driver.
        LOG.info(
            "xlCanFdSetConfiguration.: ABaudr.=%u, DBaudr.=%u",
            canfd_conf.arbitrationBitRate,
            canfd_conf.dataBitRate,
        )
        LOG.info(
            "xlCanFdSetConfiguration.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
            canfd_conf.sjwAbr,
            canfd_conf.tseg1Abr,
            canfd_conf.tseg2Abr,
        )
        LOG.info(
            "xlCanFdSetConfiguration.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u",
            canfd_conf.sjwDbr,
            canfd_conf.tseg1Dbr,
            canfd_conf.tseg2Dbr,
        )
563

564
    def _check_can_settings(
        self,
        channel_mask: int,
        bitrate: int,
        sample_point: Optional[float] = None,
        fd: bool = False,
        data_bitrate: Optional[int] = None,
        data_sample_point: Optional[float] = None,
    ) -> None:
        """Compare requested CAN settings to active settings in driver.

        For every channel in `channel_mask` the active bus parameters are read
        and compared with the requested values. Bitrates must match within
        1/256 of the requested value, sample points within 2 percent. Checks
        for falsy arguments are skipped.

        :param channel_mask:
            Mask of the channels to verify.
        :param bitrate:
            Requested (nominal) bitrate.
        :param sample_point:
            Requested (nominal) sample point in percent.
        :param fd:
            Whether CAN FD was requested.
        :param data_bitrate:
            Requested data phase bitrate (only checked if `fd` is set).
        :param data_sample_point:
            Requested data phase sample point in percent (only checked if
            `fd` is set).
        :raise CanInitializationError:
            If the active settings of a channel do not match the request.
        """
        configs = get_channel_configs()
        for channel_index in _iterate_channel_index(channel_mask):
            bus_params = self._read_bus_params(
                channel_index=channel_index, vcc_list=configs
            )
            # use bus_params.canfd even if fd==False, bus_params.can and bus_params.canfd are a C union
            active = bus_params.canfd

            # Every individual check result is collected; the settings are
            # acceptable only if all of them hold.
            checks = [bus_params.bus_type is xldefine.XL_BusTypes.XL_BUS_TYPE_CAN]

            # check CAN operation mode
            # skip the check if can_op_mode is 0
            # as it happens for cancaseXL, VN7600 and sometimes on other hardware (VN1640)
            if active.can_op_mode:
                op_modes = xldefine.XL_CANFD_BusParams_CanOpMode
                wanted_mode = (
                    op_modes.XL_BUS_PARAMS_CANOPMODE_CANFD
                    if fd
                    else op_modes.XL_BUS_PARAMS_CANOPMODE_CAN20
                )
                checks.append(bool(active.can_op_mode & wanted_mode))

            # check bitrates
            if bitrate:
                checks.append(abs(active.bitrate - bitrate) < bitrate / 256)
            if fd and data_bitrate:
                checks.append(
                    abs(active.data_bitrate - data_bitrate) < data_bitrate / 256
                )

            # check sample points
            if sample_point:
                nom_tseg1, nom_tseg2 = active.tseg1_abr, active.tseg2_abr
                nom_actual = 100 * (1 + nom_tseg1) / (1 + nom_tseg1 + nom_tseg2)
                # 2 percent tolerance
                checks.append(abs(nom_actual - sample_point) < 2.0)
            if fd and data_sample_point:
                data_tseg1, data_tseg2 = active.tseg1_dbr, active.tseg2_dbr
                data_actual = 100 * (1 + data_tseg1) / (1 + data_tseg1 + data_tseg2)
                # 2 percent tolerance
                checks.append(abs(data_actual - data_sample_point) < 2.0)

            if all(checks):
                continue

            # The error message depends on the currently active CAN settings.
            # If the active operation mode is CAN FD, show the active CAN FD timings,
            # otherwise show CAN 2.0 timings.
            fd_active = bool(
                active.can_op_mode
                & xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD
            )
            if fd_active:
                active_settings = bus_params.canfd._asdict()
                active_settings["can_op_mode"] = "CAN FD"
            else:
                active_settings = bus_params.can._asdict()
                active_settings["can_op_mode"] = "CAN 2.0"
            settings_string = ", ".join(
                f"{key}: {val}" for key, val in active_settings.items()
            )
            channel = self.index_to_channel[channel_index]
            raise CanInitializationError(
                f"The requested settings could not be set for channel {channel}. "
                f"Another application might have set incompatible settings. "
                f"These are the currently active settings: {settings_string}."
            )
658

659
    def _apply_filters(self, filters: Optional[CanFilters]) -> None:
        """Install acceptance filters in the driver, or reset them.

        Filters are installed only if there is a single filter, or two
        filters whose ``"extended"`` values differ. If that is not the case,
        if installing fails, or if `filters` is falsy, the acceptance filters
        for extended and standard IDs are reset and ``self._is_filtered`` is
        set to ``False``.

        :param filters:
            The filters to apply, or ``None``.
        """
        if filters:
            # Only up to one filter per ID type allowed
            one_per_id_type = len(filters) == 1 or (
                len(filters) == 2
                and filters[0].get("extended") != filters[1].get("extended")
            )
            if not one_per_id_type:
                LOG.warning("Only up to one filter per extended or standard ID allowed")
                # go to fallback
            else:
                try:
                    for entry in filters:
                        id_type = (
                            xldefine.XL_AcceptanceFilter.XL_CAN_EXT
                            if entry.get("extended")
                            else xldefine.XL_AcceptanceFilter.XL_CAN_STD
                        )
                        self.xldriver.xlCanSetChannelAcceptance(
                            self.port_handle,
                            self.mask,
                            entry["can_id"],
                            entry["can_mask"],
                            id_type,
                        )
                except VectorOperationError as exception:
                    LOG.warning("Could not set filters: %s", exception)
                    # go to fallback
                else:
                    self._is_filtered = True
                    return

        # fallback: reset filters
        self._is_filtered = False
        try:
            for id_type in (
                xldefine.XL_AcceptanceFilter.XL_CAN_EXT,
                xldefine.XL_AcceptanceFilter.XL_CAN_STD,
            ):
                self.xldriver.xlCanSetChannelAcceptance(
                    self.port_handle, self.mask, 0x0, 0x0, id_type
                )
        except VectorOperationError as exc:
            LOG.warning("Could not reset filters: %s", exc)
×
708

709
    def _recv_internal(
        self, timeout: Optional[float]
    ) -> tuple[Optional[Message], bool]:
        """Read the next message from the driver, waiting up to ``timeout``.

        :param timeout:
            Maximum time to wait in seconds; ``None`` waits without a deadline.
        :return:
            ``(message, self._is_filtered)``; ``message`` is ``None`` once the
            deadline has passed without a message.
        :raises VectorOperationError:
            For every driver error other than ``XL_ERR_QUEUE_IS_EMPTY``.
        """
        deadline = None if timeout is None else time.time() + timeout

        while True:
            received = None
            try:
                if self._can_protocol is CanProtocol.CAN_FD:
                    received = self._recv_canfd()
                else:
                    received = self._recv_can()
            except VectorOperationError as exception:
                # an empty receive queue is the normal "nothing yet" signal
                queue_empty = (
                    exception.error_code == xldefine.XL_Status.XL_ERR_QUEUE_IS_EMPTY
                )
                if not queue_empty:
                    raise

            if received:
                return received, self._is_filtered

            # if no message was received, wait or return on timeout
            if deadline is not None and time.time() > deadline:
                return None, self._is_filtered

            if not HAS_EVENTS:
                # Wait a short time until we try again
                time.sleep(self.poll_interval)
                continue

            # Wait for receive event to occur
            if deadline is None:
                wait_ms = INFINITE
            else:
                remaining = deadline - time.time()
                wait_ms = max(0, int(remaining * 1000))
            WaitForSingleObject(self.event_handle.value, wait_ms)  # type: ignore
743

744
    def _recv_canfd(self) -> Optional[Message]:
        """Fetch one event via ``xlCanReceive`` and convert it to a message.

        Events tagged ``XL_CAN_EV_TAG_RX_OK`` or ``XL_CAN_EV_TAG_TX_OK`` are
        turned into a :class:`Message`; every other event is passed to
        :meth:`handle_canfd_event` and ``None`` is returned.
        """
        rx_event = xlclass.XLcanRxEvent()
        self.xldriver.xlCanReceive(self.port_handle, rx_event)

        event_tags = xldefine.XL_CANFD_RX_EventTags
        if rx_event.tag == event_tags.XL_CAN_EV_TAG_RX_OK:
            is_rx, payload = True, rx_event.tagData.canRxOkMsg
        elif rx_event.tag == event_tags.XL_CAN_EV_TAG_TX_OK:
            is_rx, payload = False, rx_event.tagData.canTxOkMsg
        else:
            self.handle_canfd_event(rx_event)
            return None

        raw_id = payload.canId
        length = dlc2len(payload.dlc)
        flags = payload.msgFlags
        seconds = rx_event.timeStamp * 1e-9
        channel = self.index_to_channel.get(rx_event.chanIndex)

        rx_flags = xldefine.XL_CANFD_RX_MessageFlags
        return Message(
            timestamp=seconds + self._time_offset,
            arbitration_id=raw_id & 0x1FFFFFFF,
            is_extended_id=bool(
                raw_id & xldefine.XL_MessageFlagsExtended.XL_CAN_EXT_MSG_ID
            ),
            is_remote_frame=bool(flags & rx_flags.XL_CAN_RXMSG_FLAG_RTR),
            is_error_frame=bool(flags & rx_flags.XL_CAN_RXMSG_FLAG_EF),
            is_fd=bool(flags & rx_flags.XL_CAN_RXMSG_FLAG_EDL),
            bitrate_switch=bool(flags & rx_flags.XL_CAN_RXMSG_FLAG_BRS),
            error_state_indicator=bool(flags & rx_flags.XL_CAN_RXMSG_FLAG_ESI),
            is_rx=is_rx,
            channel=channel,
            dlc=length,
            data=payload.data[:length],
        )
788

789
    def _recv_can(self) -> Optional[Message]:
        """Fetch one event via ``xlReceive`` and convert it to a message.

        Events whose tag is not ``XL_RECEIVE_MSG`` are passed to
        :meth:`handle_can_event` and ``None`` is returned.
        """
        event = xlclass.XLevent()
        requested = ctypes.c_uint(1)
        self.xldriver.xlReceive(self.port_handle, requested, event)

        if event.tag != xldefine.XL_EventTags.XL_RECEIVE_MSG:
            self.handle_can_event(event)
            return None

        frame = event.tagData.msg
        raw_id = frame.id
        length = frame.dlc
        flags = frame.flags
        seconds = event.timeStamp * 1e-9
        channel = self.index_to_channel.get(event.chanIndex)

        msg_flags = xldefine.XL_MessageFlags
        # a frame flagged TX_COMPLETED is reported with is_rx=False
        tx_completed = bool(flags & msg_flags.XL_CAN_MSG_FLAG_TX_COMPLETED)
        return Message(
            timestamp=seconds + self._time_offset,
            arbitration_id=raw_id & 0x1FFFFFFF,
            is_extended_id=bool(
                raw_id & xldefine.XL_MessageFlagsExtended.XL_CAN_EXT_MSG_ID
            ),
            is_remote_frame=bool(flags & msg_flags.XL_CAN_MSG_FLAG_REMOTE_FRAME),
            is_error_frame=bool(flags & msg_flags.XL_CAN_MSG_FLAG_ERROR_FRAME),
            is_rx=not tx_completed,
            is_fd=False,
            dlc=length,
            data=frame.data[:length],
            channel=channel,
        )
824

825
    def handle_can_event(self, event: xlclass.XLevent) -> None:
        """Handle non-message CAN events.

        Method is called while receiving (from ``_recv_can``, which
        :meth:`~can.interfaces.vector.VectorBus._recv_internal` uses for
        non-FD buses) when `event.tag` is not `XL_RECEIVE_MSG`.
        This default implementation does nothing; subclasses can implement
        this method.

        :param event: XLevent that could have a `XL_CHIP_STATE`, `XL_TIMER` or `XL_SYNC_PULSE` tag.
        """
833

834
    def handle_canfd_event(self, event: xlclass.XLcanRxEvent) -> None:
        """Handle non-message CAN FD events.

        Method is called while receiving (from ``_recv_canfd``, which
        :meth:`~can.interfaces.vector.VectorBus._recv_internal` uses for
        CAN FD buses) when `event.tag` is not `XL_CAN_EV_TAG_RX_OK` or
        `XL_CAN_EV_TAG_TX_OK`.
        This default implementation does nothing; subclasses can implement
        this method.

        :param event: `XLcanRxEvent` that could have a `XL_CAN_EV_TAG_RX_ERROR`,
            `XL_CAN_EV_TAG_TX_ERROR`, `XL_TIMER` or `XL_CAN_EV_TAG_CHIP_STATE` tag.
        """
844

845
    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        """Transmit a single message.

        :param msg: Message to transmit.
        :param timeout: Accepted for interface compatibility; not used here.
        """
        single_message = [msg]
        self._send_sequence(single_message)
847

848
    def _send_sequence(self, msgs: Sequence[Message]) -> int:
        """Send messages and return number of successful transmissions.

        Dispatches to the CAN FD or the classic CAN transmit routine depending
        on ``self._can_protocol``.
        """
        transmit = (
            self._send_can_fd_msg_sequence
            if self._can_protocol is CanProtocol.CAN_FD
            else self._send_can_msg_sequence
        )
        return transmit(msgs)
854

855
    def _get_tx_channel_mask(self, msgs: Sequence[Message]) -> int:
        """Return the channel mask to transmit ``msgs`` on.

        A single message is sent on the mask registered for its ``channel`` in
        ``self.channel_masks`` (falling back to ``self.mask``); any other
        number of messages uses ``self.mask``.
        """
        if len(msgs) != 1:
            return self.mask

        target_channel = msgs[0].channel
        return self.channel_masks.get(target_channel, self.mask)  # type: ignore[arg-type]
860

861
    def _send_can_msg_sequence(self, msgs: Sequence[Message]) -> int:
        """Send CAN messages and return number of successful transmissions.

        The returned number is the value of the ``c_uint`` counter after it has
        been handed to ``xlCanTransmit``.
        """
        tx_mask = self._get_tx_channel_mask(msgs)
        counter = ctypes.c_uint(len(msgs))

        events = [self._build_xl_event(message) for message in msgs]
        xl_event_array = (xlclass.XLevent * counter.value)(*events)

        self.xldriver.xlCanTransmit(self.port_handle, tx_mask, counter, xl_event_array)
        return counter.value
874

875
    @staticmethod
    def _build_xl_event(msg: Message) -> xlclass.XLevent:
        """Build an ``XL_TRANSMIT_MSG`` event from ``msg``.

        The extended-ID marker is OR-ed into the identifier for extended
        frames and the remote-frame flag is set for remote frames.
        """
        if msg.is_extended_id:
            can_id = (
                msg.arbitration_id | xldefine.XL_MessageFlagsExtended.XL_CAN_EXT_MSG_ID
            )
        else:
            can_id = msg.arbitration_id

        flags = 0
        if msg.is_remote_frame:
            flags |= xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_REMOTE_FRAME

        xl_event = xlclass.XLevent()
        xl_event.tag = xldefine.XL_EventTags.XL_TRANSMIT_MSG

        payload = xl_event.tagData.msg
        payload.id = can_id
        payload.dlc = msg.dlc
        payload.flags = flags
        payload.data = tuple(msg.data)

        return xl_event
893

894
    def _send_can_fd_msg_sequence(self, msgs: Sequence[Message]) -> int:
        """Send CAN FD messages and return number of successful transmissions.

        The returned number is the value of the ``c_uint`` counter that is
        handed to ``xlCanTransmitEx`` as the sent-count argument.
        """
        tx_mask = self._get_tx_channel_mask(msgs)
        total = len(msgs)

        tx_events = [self._build_xl_can_tx_event(message) for message in msgs]
        xl_can_tx_event_array = (xlclass.XLcanTxEvent * total)(*tx_events)

        sent_counter = ctypes.c_uint(0)
        self.xldriver.xlCanTransmitEx(
            self.port_handle, tx_mask, total, sent_counter, xl_can_tx_event_array
        )
        return sent_counter.value
908

909
    @staticmethod
    def _build_xl_can_tx_event(msg: Message) -> xlclass.XLcanTxEvent:
        """Build an ``XL_CAN_EV_TAG_TX_MSG`` event from ``msg``.

        ``msg.is_fd``, ``msg.bitrate_switch`` and ``msg.is_remote_frame`` are
        mapped to the EDL, BRS and RTR transmit flags; ``msg.dlc`` is converted
        with ``len2dlc``.
        """
        if msg.is_extended_id:
            can_id = (
                msg.arbitration_id | xldefine.XL_MessageFlagsExtended.XL_CAN_EXT_MSG_ID
            )
        else:
            can_id = msg.arbitration_id

        tx_flags = xldefine.XL_CANFD_TX_MessageFlags
        flag_table = (
            (msg.is_fd, tx_flags.XL_CAN_TXMSG_FLAG_EDL),
            (msg.bitrate_switch, tx_flags.XL_CAN_TXMSG_FLAG_BRS),
            (msg.is_remote_frame, tx_flags.XL_CAN_TXMSG_FLAG_RTR),
        )
        flags = 0
        for enabled, bit in flag_table:
            if enabled:
                flags |= bit

        xl_can_tx_event = xlclass.XLcanTxEvent()
        xl_can_tx_event.tag = xldefine.XL_CANFD_TX_EventTags.XL_CAN_EV_TAG_TX_MSG
        xl_can_tx_event.transId = 0xFFFF

        can_msg = xl_can_tx_event.tagData.canMsg
        can_msg.canId = can_id
        can_msg.msgFlags = flags
        can_msg.dlc = len2dlc(msg.dlc)
        can_msg.data = tuple(msg.data)

        return xl_can_tx_event
933

934
    def flush_tx_buffer(self) -> None:
        """
        Flush the TX buffer of the bus.

        Implementation does not use function ``xlCanFlushTransmitQueue`` of the XL driver, as it works only
        for XL family devices. Instead, a freshly constructed transmit event with special flags is sent:
        ``XL_CAN_TXMSG_FLAG_HIGHPRIO`` for CAN FD, ``XL_CAN_MSG_FLAG_OVERRUN | XL_CAN_MSG_FLAG_WAKEUP`` otherwise.

        .. warning::
            Using this function will flush the queue and send a high voltage message (ID = 0, DLC = 0, no data).
        """
        if self._can_protocol is not CanProtocol.CAN_FD:
            xl_event = xlclass.XLevent()
            xl_event.tag = xldefine.XL_EventTags.XL_TRANSMIT_MSG
            classic_flags = xldefine.XL_MessageFlags
            xl_event.tagData.msg.flags |= (
                classic_flags.XL_CAN_MSG_FLAG_OVERRUN
                | classic_flags.XL_CAN_MSG_FLAG_WAKEUP
            )

            self.xldriver.xlCanTransmit(
                self.port_handle, self.mask, ctypes.c_uint(1), xl_event
            )
            return

        xl_can_tx_event = xlclass.XLcanTxEvent()
        xl_can_tx_event.tag = xldefine.XL_CANFD_TX_EventTags.XL_CAN_EV_TAG_TX_MSG
        xl_can_tx_event.tagData.canMsg.msgFlags |= (
            xldefine.XL_CANFD_TX_MessageFlags.XL_CAN_TXMSG_FLAG_HIGHPRIO
        )

        self.xldriver.xlCanTransmitEx(
            self.port_handle,
            self.mask,
            ctypes.c_uint(1),
            ctypes.c_uint(0),
            xl_can_tx_event,
        )
969

970
    def shutdown(self) -> None:
        """Shut the bus down and release the driver resources.

        After the base-class shutdown, the channel is deactivated, the port is
        closed and the driver is closed. Each step is attempted independently
        and a :class:`VectorError` raised by one step is suppressed, so that a
        failure while deactivating the channel no longer prevents the port and
        the driver from being closed.
        """
        super().shutdown()

        # One suppress block per call: an exception leaves the ``with`` body
        # immediately, so sharing a block would skip the remaining cleanup.
        with contextlib.suppress(VectorError):
            self.xldriver.xlDeactivateChannel(self.port_handle, self.mask)
        with contextlib.suppress(VectorError):
            self.xldriver.xlClosePort(self.port_handle)
        with contextlib.suppress(VectorError):
            self.xldriver.xlCloseDriver()
977

978
    def reset(self) -> None:
        """Deactivate the channel(s) in ``self.mask`` and activate them again as CAN."""
        driver = self.xldriver
        driver.xlDeactivateChannel(self.port_handle, self.mask)

        bus_type = xldefine.XL_BusTypes.XL_BUS_TYPE_CAN
        driver.xlActivateChannel(self.port_handle, self.mask, bus_type, 0)
983

984
    @staticmethod
    def _detect_available_configs() -> list[AutoDetectedConfig]:
        """Return one configuration dictionary per CAN-capable channel.

        Channels whose bus capabilities lack ``XL_BUS_ACTIVE_CAP_CAN`` are
        skipped. Each dictionary carries the values for ``VectorBus.__init__``
        and ``VectorBus.set_application_config`` plus the originating
        ``VectorChannelConfig``.
        """
        detected = []
        channel_configs = get_channel_configs()
        LOG.info("Found %d channels", len(channel_configs))

        for cfg in channel_configs:
            can_capable = (
                cfg.channel_bus_capabilities
                & xldefine.XL_BusCapabilities.XL_BUS_ACTIVE_CAP_CAN
            )
            if not can_capable:
                continue

            LOG.info("Channel index %d: %s", cfg.channel_index, cfg.name)
            fd_capable = bool(
                cfg.channel_capabilities
                & xldefine.XL_ChannelCapabilities.XL_CHANNEL_FLAG_CANFD_ISO_SUPPORT
            )
            entry = {
                # data for use in VectorBus.__init__():
                "interface": "vector",
                "channel": cfg.hw_channel,
                "serial": cfg.serial_number,
                "channel_index": cfg.channel_index,
                # data for use in VectorBus.set_application_config():
                "hw_type": cfg.hw_type,
                "hw_index": cfg.hw_index,
                "hw_channel": cfg.hw_channel,
                # additional information:
                "supports_fd": fd_capable,
                "vector_channel_config": cfg,
            }
            detected.append(entry)
        return detected  # type: ignore
1020

1021
    @staticmethod
    def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None:
        """Open vector hardware configuration window.

        :param wait_for_finish:
            Time to wait for user input in milliseconds.

        :raises can.CanInterfaceNotImplementedError:
            If the Vector API has not been loaded.
        """
        if xldriver is None:
            raise CanInterfaceNotImplementedError("The Vector API has not been loaded")

        no_exe_path = ctypes.c_char_p()
        wait_ms = ctypes.c_uint(wait_for_finish)
        xldriver.xlPopupHwConfig(no_exe_path, wait_ms)
1032

1033
    @staticmethod
    def get_application_config(
        app_name: str, app_channel: int
    ) -> tuple[Union[int, xldefine.XL_HardwareType], int, int]:
        """Retrieve information for an application in Vector Hardware Configuration.

        :param app_name:
            The name of the application.
        :param app_channel:
            The channel of the application.
        :return:
            Returns a tuple of the hardware type, the hardware index and the
            hardware channel.

        :raises can.CanInterfaceNotImplementedError:
            If the Vector API has not been loaded.
        :raises can.interfaces.vector.VectorInitializationError:
            If ``xlGetApplConfig`` fails, i.e. the application channel is not
            assigned to any interface.
        """
        if xldriver is None:
            raise CanInterfaceNotImplementedError("The Vector API has not been loaded")

        # output arguments handed to the driver call
        hw_type_out = ctypes.c_uint()
        hw_index_out = ctypes.c_uint()
        hw_channel_out = ctypes.c_uint()
        requested_channel = ctypes.c_uint(app_channel)

        try:
            xldriver.xlGetApplConfig(
                app_name.encode(),
                requested_channel,
                hw_type_out,
                hw_index_out,
                hw_channel_out,
                xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
            )
        except VectorError as e:
            message = (
                f"Vector HW Config: Channel '{app_channel}' of "
                f"application '{app_name}' is not assigned to any interface"
            )
            raise VectorInitializationError(
                error_code=e.error_code,
                error_string=message,
                function="xlGetApplConfig",
            ) from None

        return (
            _hw_type(hw_type_out.value),
            hw_index_out.value,
            hw_channel_out.value,
        )
1077

1078
    @staticmethod
    def set_application_config(
        app_name: str,
        app_channel: int,
        hw_type: Union[int, xldefine.XL_HardwareType],
        hw_index: int,
        hw_channel: int,
        **kwargs: Any,
    ) -> None:
        """Modify the application settings in Vector Hardware Configuration.

        This method can also be used with a channel config dictionary::

            import can
            from can.interfaces.vector import VectorBus

            configs = can.detect_available_configs(interfaces=['vector'])
            cfg = configs[0]
            VectorBus.set_application_config(app_name="MyApplication", app_channel=0, **cfg)

        :param app_name:
            The name of the application. Creates a new application if it does
            not exist yet.
        :param app_channel:
            The channel of the application.
        :param hw_type:
            The hardware type of the interface.
            E.g XL_HardwareType.XL_HWTYPE_VIRTUAL
        :param hw_index:
            The index of the interface if multiple interface with the same
            hardware type are present.
        :param hw_channel:
            The channel index of the interface.
        :param kwargs:
            Additional keyword arguments are accepted and ignored, which is
            what allows passing a whole channel config dictionary.

        :raises can.CanInterfaceNotImplementedError:
            If the Vector API has not been loaded.
        """
        if xldriver is None:
            raise CanInterfaceNotImplementedError("The Vector API has not been loaded")

        encoded_name = app_name.encode()
        bus_type = xldefine.XL_BusTypes.XL_BUS_TYPE_CAN
        xldriver.xlSetApplConfig(
            encoded_name, app_channel, hw_type, hw_index, hw_channel, bus_type
        )
1126

1127
    def set_timer_rate(self, timer_rate_ms: int) -> None:
        """Set the cyclic event rate of the port.

        Once set, the port will generate a cyclic event with the tag XL_EventTags.XL_TIMER.
        This timer can be used to keep an application alive. See XL Driver Library Description
        for more information

        :param timer_rate_ms:
            The timer rate in ms. The minimal timer rate is 1ms, a value of 0 deactivates
            the timer events.
        """
        # the driver call takes the rate in units of 10 us: 1 ms == 100 * 10 us
        units_per_ms = 100
        self.xldriver.xlSetTimerRate(self.port_handle, timer_rate_ms * units_per_ms)
1140

1141

1142
class VectorCanParams(NamedTuple):
    # Classic CAN parameters of a channel. The fields are filled by
    # ``_read_bus_params_from_c_struct`` from ``XLbusParams.data.can``.
    bitrate: int  # read from ``bitRate``
    sjw: int  # read from ``sjw``
    tseg1: int  # read from ``tseg1``
    tseg2: int  # read from ``tseg2``
    sam: int  # read from ``sam``
    output_mode: xldefine.XL_OutputMode  # enum built from ``outputMode``
    can_op_mode: xldefine.XL_CANFD_BusParams_CanOpMode  # enum built from ``canOpMode``
1150

1151

1152
class VectorCanFdParams(NamedTuple):
    # CAN FD parameters of a channel. The fields are filled by
    # ``_read_bus_params_from_c_struct`` from ``XLbusParams.data.canFD``.
    bitrate: int  # read from ``arbitrationBitRate``
    data_bitrate: int  # read from ``dataBitRate``
    sjw_abr: int  # read from ``sjwAbr``
    tseg1_abr: int  # read from ``tseg1Abr``
    tseg2_abr: int  # read from ``tseg2Abr``
    sam_abr: int  # read from ``samAbr``
    sjw_dbr: int  # read from ``sjwDbr``
    tseg1_dbr: int  # read from ``tseg1Dbr``
    tseg2_dbr: int  # read from ``tseg2Dbr``
    output_mode: xldefine.XL_OutputMode  # enum built from ``outputMode``
    can_op_mode: xldefine.XL_CANFD_BusParams_CanOpMode  # enum built from ``canOpMode``
1164

1165

1166
class VectorBusParams(NamedTuple):
    # Bus parameters of a channel as produced by
    # ``_read_bus_params_from_c_struct`` (only created for CAN bus types).
    bus_type: xldefine.XL_BusTypes  # enum built from ``XLbusParams.busType``
    can: VectorCanParams  # view of the parameters as classic CAN settings
    canfd: VectorCanFdParams  # view of the parameters as CAN FD settings
1170

1171

1172
class VectorChannelConfig(NamedTuple):
    """NamedTuple which contains the channel properties from Vector XL API."""

    # The fields are filled by ``get_channel_configs`` from one
    # ``XLchannelConfig`` entry of the driver configuration.
    name: str  # decoded ``name``
    hw_type: Union[int, xldefine.XL_HardwareType]  # raw int when the value is no known enum member
    hw_index: int  # read from ``hwIndex``
    hw_channel: int  # read from ``hwChannel``
    channel_index: int  # read from ``channelIndex``
    channel_mask: int  # read from ``channelMask``
    channel_capabilities: xldefine.XL_ChannelCapabilities  # built from ``channelCapabilities``
    channel_bus_capabilities: xldefine.XL_BusCapabilities  # built from ``channelBusCapabilities``
    is_on_bus: bool  # ``bool(isOnBus)``
    connected_bus_type: xldefine.XL_BusTypes  # built from ``connectedBusType``
    bus_params: Optional[VectorBusParams]  # ``None`` when the bus type is not CAN
    serial_number: int  # read from ``serialNumber``
    article_number: int  # read from ``articleNumber``
    transceiver_name: str  # decoded ``transceiverName``
1189

1190

1191
def _get_xl_driver_config() -> xlclass.XLdriverConfig:
    """Open the XL driver, read its configuration and close it again.

    :return:
        The ``XLdriverConfig`` structure filled by ``xlGetDriverConfig``.

    :raises VectorError:
        With ``XL_ERR_DLL_NOT_FOUND`` if the driver module is unavailable, or
        whatever the driver calls themselves raise.
    """
    if xldriver is None:
        raise VectorError(
            error_code=xldefine.XL_Status.XL_ERR_DLL_NOT_FOUND,
            error_string="xldriver is unavailable",
            function="_get_xl_driver_config",
        )
    driver_config = xlclass.XLdriverConfig()
    xldriver.xlOpenDriver()
    try:
        xldriver.xlGetDriverConfig(driver_config)
    finally:
        # always pair the successful xlOpenDriver() with xlCloseDriver(),
        # even when reading the configuration fails
        xldriver.xlCloseDriver()
    return driver_config
1203

1204

1205
def _read_bus_params_from_c_struct(
    bus_params: xlclass.XLbusParams,
) -> Optional[VectorBusParams]:
    """Convert an ``XLbusParams`` structure into :class:`VectorBusParams`.

    :param bus_params: Structure whose ``busType`` and ``data`` members are read.
    :return:
        ``None`` unless the bus type is ``XL_BUS_TYPE_CAN``; otherwise the
        values of ``data.can`` and ``data.canFD`` as named tuples.
    """
    bus_type = xldefine.XL_BusTypes(bus_params.busType)
    if bus_type is not xldefine.XL_BusTypes.XL_BUS_TYPE_CAN:
        return None

    raw_can = bus_params.data.can
    can_params = VectorCanParams(
        bitrate=raw_can.bitRate,
        sjw=raw_can.sjw,
        tseg1=raw_can.tseg1,
        tseg2=raw_can.tseg2,
        sam=raw_can.sam,
        output_mode=xldefine.XL_OutputMode(raw_can.outputMode),
        can_op_mode=xldefine.XL_CANFD_BusParams_CanOpMode(raw_can.canOpMode),
    )

    raw_fd = bus_params.data.canFD
    canfd_params = VectorCanFdParams(
        bitrate=raw_fd.arbitrationBitRate,
        data_bitrate=raw_fd.dataBitRate,
        sjw_abr=raw_fd.sjwAbr,
        tseg1_abr=raw_fd.tseg1Abr,
        tseg2_abr=raw_fd.tseg2Abr,
        sam_abr=raw_fd.samAbr,
        sjw_dbr=raw_fd.sjwDbr,
        tseg1_dbr=raw_fd.tseg1Dbr,
        tseg2_dbr=raw_fd.tseg2Dbr,
        output_mode=xldefine.XL_OutputMode(raw_fd.outputMode),
        can_op_mode=xldefine.XL_CANFD_BusParams_CanOpMode(raw_fd.canOpMode),
    )

    return VectorBusParams(bus_type=bus_type, can=can_params, canfd=canfd_params)
1240

1241

1242
def get_channel_configs() -> list[VectorChannelConfig]:
    """Read channel properties from Vector XL API.

    :return:
        One :class:`VectorChannelConfig` per channel reported by the driver
        configuration, or an empty list if reading the configuration raises
        a ``VectorError``.
    """
    try:
        driver_config = _get_xl_driver_config()
    except VectorError:
        return []

    def _convert(xlcc: xlclass.XLchannelConfig) -> VectorChannelConfig:
        # translate one C channel structure into its Python counterpart
        return VectorChannelConfig(
            name=xlcc.name.decode(),
            hw_type=_hw_type(xlcc.hwType),
            hw_index=xlcc.hwIndex,
            hw_channel=xlcc.hwChannel,
            channel_index=xlcc.channelIndex,
            channel_mask=xlcc.channelMask,
            channel_capabilities=xldefine.XL_ChannelCapabilities(
                xlcc.channelCapabilities
            ),
            channel_bus_capabilities=xldefine.XL_BusCapabilities(
                xlcc.channelBusCapabilities
            ),
            is_on_bus=bool(xlcc.isOnBus),
            bus_params=_read_bus_params_from_c_struct(xlcc.busParams),
            connected_bus_type=xldefine.XL_BusTypes(xlcc.connectedBusType),
            serial_number=xlcc.serialNumber,
            article_number=xlcc.articleNumber,
            transceiver_name=xlcc.transceiverName.decode(),
        )

    # only the first ``channelCount`` entries of the channel array are read
    channel_list: list[VectorChannelConfig] = [
        _convert(driver_config.channel[position])
        for position in range(driver_config.channelCount)
    ]
    return channel_list
1274

1275

1276
def _hw_type(hw_type: int) -> Union[int, xldefine.XL_HardwareType]:
    """Convert a raw hardware type value into ``XL_HardwareType`` if possible.

    :param hw_type: Raw hardware type value.
    :return:
        The matching ``XL_HardwareType`` member, or the unchanged integer
        (after logging a warning) when the value is not a known member.
    """
    try:
        return xldefine.XL_HardwareType(hw_type)
    except ValueError:
        # lazy %-formatting, consistent with the other logging calls in this
        # module; the emitted message text is unchanged
        LOG.warning('Unknown XL_HardwareType value "%s"', hw_type)
        return hw_type
1282

1283

1284
def _iterate_channel_index(channel_mask: int) -> Iterator[int]:
21✔
1285
    """Iterate over channel indexes in channel mask."""
1286
    for channel_index, bit in enumerate(reversed(bin(channel_mask)[2:])):
21✔
1287
        if bit == "1":
21✔
1288
            yield channel_index
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc