• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 15474433755

05 Jun 2025 06:21PM UTC coverage: 70.972% (+0.07%) from 70.901%
15474433755

Pull #1949

github

web-flow
Merge e6599c908 into f43bedbc6
Pull Request #1949: Add public functions for creating bus command lines options

137 of 152 new or added lines in 4 files covered. (90.13%)

1 existing line in 1 file now uncovered.

7743 of 10910 relevant lines covered (70.97%)

13.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.84
/can/interfaces/seeedstudio/seeedstudio.py
1
"""
21✔
2
To Support the Seeed USB-Can analyzer interface. The device will appear
3
as a serial port, for example "/dev/ttyUSB0" on Linux machines
4
or "COM1" on Windows.
5
https://www.seeedstudio.com/USB-CAN-Analyzer-p-2888.html
6
SKU 114991193
7
"""
8

9
import io
21✔
10
import logging
21✔
11
import struct
21✔
12
from time import time
21✔
13

14
import can
21✔
15
from can import BusABC, CanProtocol, Message
21✔
16

17
logger = logging.getLogger("seeedbus")
21✔
18

19
try:
21✔
20
    import serial
21✔
21
except ImportError:
×
22
    logger.warning(
×
23
        "You won't be able to use the serial can backend without "
24
        "the serial module installed!"
25
    )
26
    serial = None
×
27

28

29
class SeeedBus(BusABC):
21✔
30
    """
21✔
31
    Enable basic can communication over a USB-CAN-Analyzer device.
32
    """
33

34
    BITRATE = {
21✔
35
        1000000: 0x01,
36
        800000: 0x02,
37
        500000: 0x03,
38
        400000: 0x04,
39
        250000: 0x05,
40
        200000: 0x06,
41
        125000: 0x07,
42
        100000: 0x08,
43
        50000: 0x09,
44
        20000: 0x0A,
45
        10000: 0x0B,
46
        5000: 0x0C,
47
    }
48

49
    FRAMETYPE = {"STD": 0x01, "EXT": 0x02}
21✔
50

51
    OPERATIONMODE = {
21✔
52
        "normal": 0x00,
53
        "loopback": 0x01,
54
        "silent": 0x02,
55
        "loopback_and_silent": 0x03,
56
    }
57

58
    def __init__(
21✔
59
        self,
60
        channel,
61
        baudrate=2000000,
62
        timeout=0.1,
63
        frame_type="STD",
64
        operation_mode="normal",
65
        bitrate=500000,
66
        **kwargs,
67
    ):
68
        """
69
        :param str channel:
70
            The serial device to open. For example "/dev/ttyS1" or
71
            "/dev/ttyUSB0" on Linux or "COM1" on Windows systems.
72

73
        :param baudrate:
74
            The default matches required baudrate
75

76
        :param float timeout:
77
            Timeout for the serial device in seconds (default 0.1).
78

79
        :param str frame_type:
80
            STD or EXT, to select standard or extended messages
81

82
        :param operation_mode
83
            normal, loopback, silent or loopback_and_silent.
84

85
        :param bitrate
86
            CAN bus bit rate, selected from available list.
87

88
        :raises can.CanInitializationError: If the given parameters are invalid.
89
        :raises can.CanInterfaceNotImplementedError: If the serial module is not installed.
90
        """
91

92
        if serial is None:
×
93
            raise can.CanInterfaceNotImplementedError(
×
94
                "the serial module is not installed"
95
            )
96

97
        self.bit_rate = bitrate
×
98
        self.frame_type = frame_type
×
99
        self.op_mode = operation_mode
×
100
        self.filter_id = bytearray([0x00, 0x00, 0x00, 0x00])
×
101
        self.mask_id = bytearray([0x00, 0x00, 0x00, 0x00])
×
102
        self._can_protocol = CanProtocol.CAN_20
×
103

104
        if not channel:
×
105
            raise can.CanInitializationError("Must specify a serial port.")
×
106

107
        self.channel_info = "Serial interface: " + channel
×
108
        try:
×
109
            self.ser = serial.Serial(
×
110
                channel, baudrate=baudrate, timeout=timeout, rtscts=False
111
            )
112
        except ValueError as error:
×
113
            raise can.CanInitializationError(
×
114
                "could not create the serial device"
115
            ) from error
116

117
        super().__init__(channel=channel, **kwargs)
×
118
        self.init_frame()
×
119

120
    def shutdown(self):
21✔
121
        """
122
        Close the serial interface.
123
        """
124
        super().shutdown()
21✔
125
        self.ser.close()
21✔
126

127
    def init_frame(self, timeout=None):
21✔
128
        """
129
        Send init message to setup the device for comms. this is called during
130
        interface creation.
131

132
        :param timeout:
133
            This parameter will be ignored. The timeout value of the channel is
134
            used instead.
135
        """
136
        byte_msg = bytearray()
×
137
        byte_msg.append(0xAA)  # Frame Start Byte 1
×
138
        byte_msg.append(0x55)  # Frame Start Byte 2
×
139
        byte_msg.append(0x12)  # Initialization Message ID
×
140
        byte_msg.append(SeeedBus.BITRATE[self.bit_rate])  # CAN Baud Rate
×
141
        byte_msg.append(SeeedBus.FRAMETYPE[self.frame_type])
×
142
        byte_msg.extend(self.filter_id)
×
143
        byte_msg.extend(self.mask_id)
×
144
        byte_msg.append(SeeedBus.OPERATIONMODE[self.op_mode])
×
145
        byte_msg.append(0x01)  # Follows 'Send once' in windows app.
×
146

147
        byte_msg.extend([0x00] * 4)  # Manual bitrate config, details unknown.
×
148

149
        crc = sum(byte_msg[2:]) & 0xFF
×
150
        byte_msg.append(crc)
×
151

152
        logger.debug("init_frm:\t%s", byte_msg.hex())
×
153
        try:
×
154
            self.ser.write(byte_msg)
×
155
        except Exception as error:
×
156
            raise can.CanInitializationError("could send init frame") from error
×
157

158
    def flush_buffer(self):
21✔
159
        self.ser.flushInput()
×
160

161
    def status_frame(self, timeout=None):
21✔
162
        """
163
        Send status request message over the serial device.  The device will
164
        respond but details of error codes are unknown but are logged - DEBUG.
165

166
        :param timeout:
167
            This parameter will be ignored. The timeout value of the channel is
168
            used instead.
169
        """
170
        byte_msg = bytearray()
×
171
        byte_msg.append(0xAA)  # Frame Start Byte 1
×
172
        byte_msg.append(0x55)  # Frame Start Byte 2
×
173
        byte_msg.append(0x04)  # Status Message ID
×
174
        byte_msg.append(0x00)  # In response packet - Rx error count
×
175
        byte_msg.append(0x00)  # In response packet - Tx error count
×
176

177
        byte_msg.extend([0x00] * 14)
×
178

179
        crc = sum(byte_msg[2:]) & 0xFF
×
180
        byte_msg.append(crc)
×
181

182
        logger.debug("status_frm:\t%s", byte_msg.hex())
×
183
        self._write(byte_msg)
×
184

185
    def send(self, msg, timeout=None):
21✔
186
        """
187
        Send a message over the serial device.
188

189
        :param can.Message msg:
190
            Message to send.
191

192
        :param timeout:
193
            This parameter will be ignored. The timeout value of the channel is
194
            used instead.
195
        """
196

197
        byte_msg = bytearray()
×
198
        byte_msg.append(0xAA)
×
199

200
        m_type = 0xC0
×
201
        if msg.is_extended_id:
×
202
            m_type += 1 << 5
×
203

204
        if msg.is_remote_frame:
×
205
            m_type += 1 << 4
×
206

207
        m_type += msg.dlc
×
208
        byte_msg.append(m_type)
×
209

210
        if msg.is_extended_id:
×
211
            a_id = struct.pack("<I", msg.arbitration_id)
×
212
        else:
213
            a_id = struct.pack("<H", msg.arbitration_id)
×
214

215
        byte_msg.extend(a_id)
×
216
        byte_msg.extend(msg.data)
×
217
        byte_msg.append(0x55)
×
218

219
        logger.debug("sending:\t%s", byte_msg.hex())
×
220
        self._write(byte_msg)
×
221

222
    def _write(self, byte_msg: bytearray) -> None:
21✔
223
        try:
×
224
            self.ser.write(byte_msg)
×
225
        except serial.PortNotOpenError as error:
×
226
            raise can.CanOperationError("writing to closed port") from error
×
227
        except serial.SerialTimeoutException as error:
×
228
            raise can.CanTimeoutError() from error
×
229

230
    def _recv_internal(self, timeout):
21✔
231
        """
232
        Read a message from the serial device.
233

234
        :param timeout:
235

236
            .. warning::
237
                This parameter will be ignored. The timeout value of the
238
                channel is used.
239

240
        :returns:
241
            Received message and False (because not filtering as taken place).
242

243
        :rtype:
244
            can.Message, bool
245
        """
246
        try:
×
247
            # ser.read can return an empty string
248
            # or raise a SerialException
249
            rx_byte_1 = self.ser.read()
×
250

251
        except serial.PortNotOpenError as error:
×
252
            raise can.CanOperationError("reading from closed port") from error
×
253
        except serial.SerialException:
×
254
            return None, False
×
255

256
        if rx_byte_1 and ord(rx_byte_1) == 0xAA:
×
257
            try:
×
258
                rx_byte_2 = ord(self.ser.read())
×
259

260
                time_stamp = time()
×
261
                if rx_byte_2 == 0x55:
×
262
                    status = bytearray([0xAA, 0x55])
×
263
                    status += bytearray(self.ser.read(18))
×
264
                    logger.debug("status resp:\t%s", status.hex())
×
265

266
                else:
267
                    length = int(rx_byte_2 & 0x0F)
×
268
                    is_extended = bool(rx_byte_2 & 0x20)
×
269
                    is_remote = bool(rx_byte_2 & 0x10)
×
270
                    if is_extended:
×
271
                        s_3_4_5_6 = bytearray(self.ser.read(4))
×
272
                        arb_id = (struct.unpack("<I", s_3_4_5_6))[0]
×
273

274
                    else:
275
                        s_3_4 = bytearray(self.ser.read(2))
×
276
                        arb_id = (struct.unpack("<H", s_3_4))[0]
×
277

278
                    data = bytearray(self.ser.read(length))
×
279
                    end_packet = ord(self.ser.read())
×
280
                    if end_packet == 0x55:
×
281
                        msg = Message(
×
282
                            timestamp=time_stamp,
283
                            arbitration_id=arb_id,
284
                            is_extended_id=is_extended,
285
                            is_remote_frame=is_remote,
286
                            dlc=length,
287
                            data=data,
288
                        )
289
                        logger.debug("recv message: %s", str(msg))
×
290
                        return msg, False
×
291

292
                    else:
293
                        return None, False
×
294

295
            except serial.PortNotOpenError as error:
×
296
                raise can.CanOperationError("reading from closed port") from error
×
297
            except serial.SerialException as error:
×
298
                raise can.CanOperationError(
×
299
                    "failed to read message information"
300
                ) from error
301

302
        return None, None
×
303

304
    def fileno(self):
21✔
305
        try:
×
306
            return self.ser.fileno()
×
307
        except io.UnsupportedOperation as excption:
×
308
            logger.warning(
×
309
                "fileno is not implemented using current CAN bus: %s", str(excption)
310
            )
311
            return -1
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc