• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 14351131961

09 Apr 2025 07:08AM UTC coverage: 70.939% (+0.1%) from 70.799%
14351131961

Pull #1936

github

web-flow
Merge 095c205ea into 5d6239400
Pull Request #1936: support pcapng file format

187 of 195 new or added lines in 7 files covered. (95.9%)

5 existing lines in 1 file now uncovered.

7682 of 10829 relevant lines covered (70.94%)

15.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.8
/can/io/pcapng.py
1
"""
24✔
2
Contains handling of pcapng logging files.
3

4
pcapng file is a binary file format used for packet capture files.
5
Spec: https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-03.html
6
"""
7

8

9
import logging
24✔
10

11
from typing import Any, BinaryIO, Dict, Generator, Union
24✔
12

13
from ..message import Message
24✔
14
from ..typechecking import StringPathLike, Channel
24✔
15
from .generic import BinaryIOMessageWriter, BinaryIOMessageReader
24✔
16
from ..socketcan_common import build_can_frame, parse_can_frame, CAN_FRAME_HEADER_STRUCT_BE
24✔
17

18
logger = logging.getLogger("can.io.pcapng")
24✔
19

20
try:
24✔
21
    import pcapng
24✔
22
    from pcapng import blocks
24✔
NEW
23
except ImportError:
×
NEW
24
    pcapng = None
×
25

26

27
# https://www.tcpdump.org/linktypes.html
28
# https://www.tcpdump.org/linktypes/LINKTYPE_CAN_SOCKETCAN.html
29
LINKTYPE_CAN_SOCKETCAN = 227
24✔
30

31

32
class PcapngWriter(BinaryIOMessageWriter):
24✔
33
    """
24✔
34
    Logs CAN data to an pcapng file supported by Wireshark and other tools.
35
    """
36

37
    def __init__(
24✔
38
        self,
39
        file: Union[StringPathLike, BinaryIO],
40
        append: bool = False,
41
        tsresol: int = 9,
42
        **kwargs: Any,
43
    ) -> None:
44
        """
45
        :param file:
46
            A path-like object or as file-like object to write to.
47
            If this is a file-like object, is has to be opened in
48
            binary write mode, not text write mode.
49

50
        :param append:
51
            If True, the file will be opened in append mode. Otherwise,
52
            it will be opened in write mode. The default is False.
53

54
        :param tsresol:
55
            The time resolution of the timestamps in the pcapng file,
56
            expressed as -log10(unit in seconds),
57
            e.g. 9 for nanoseconds, 6 for microseconds.
58
            The default is 9, which corresponds to nanoseconds.
59
            .
60
        """
61
        if pcapng is None:
24✔
62
            raise NotImplementedError(
63
                "The python-pcapng package was not found. Install python-can with "
64
                "the optional dependency [pcapng] to use the PcapngWriter."
65
            )
66

67
        mode = "wb+"
24✔
68
        if append:
24✔
69
            mode = "ab+"
24✔
70

71
        # pcapng supports concatenation, and thus append
72
        super().__init__(file, mode=mode)
24✔
73
        self._header_block = blocks.SectionHeader(endianness=">")
24✔
74
        self._writer = pcapng.FileWriter(self.file, self._header_block)
24✔
75
        self._idbs: Dict[Channel, blocks.InterfaceDescription] = {}
24✔
76
        self.tsresol = tsresol
24✔
77

78
    def _resolve_idb(self, channel: Channel):
24✔
79
        channel_name = str(channel)
24✔
80
        if channel is None:
24✔
81
            channel_name = "can0"
24✔
82

83
        if channel_name not in self._idbs:
24✔
84
            idb = blocks.InterfaceDescription(
24✔
85
                section=self._header_block.section,
86
                link_type=LINKTYPE_CAN_SOCKETCAN,
87
                options={
88
                    "if_name": channel_name,
89
                    "if_tsresol": bytes([self.tsresol]), # nanoseconds
90
                },
91
                endianness=">" # big
92
            )
93
            self._header_block.register_interface(idb)
24✔
94
            self._writer.write_block(idb)
24✔
95
            self._idbs[channel_name] = idb
24✔
96

97
        return self._idbs[channel_name]
24✔
98

99
    def on_message_received(self, msg: Message) -> None:
24✔
100
        idb: blocks.InterfaceDescription = self._resolve_idb(msg.channel)
24✔
101
        timestamp_units = int(msg.timestamp * 10**self.tsresol)
24✔
102
        self._writer.write_block(blocks.EnhancedPacket(
24✔
103
            self._header_block.section,
104
            interface_id=idb.interface_id,
105
            packet_data=build_can_frame(msg, struct=CAN_FRAME_HEADER_STRUCT_BE),
106
            # timestamp (in tsresol units) = timestamp_high << 32 + timestamp_low
107
            timestamp_high=timestamp_units >> 32,
108
            timestamp_low=timestamp_units & 0xFFFFFFFF,
109
            endianness=">" # big
110
        ))
111

112
class PcapngReader(BinaryIOMessageReader):
24✔
113
    """
24✔
114
    Iterator of CAN messages from a Pcapng File.
115
    """
116

117
    file: BinaryIO
24✔
118

119
    def __init__(
24✔
120
        self,
121
        file: Union[StringPathLike, BinaryIO],
122
        **kwargs: Any,
123
    ) -> None:
124
        """
125
        :param file: a path-like object or as file-like object to read from
126
                     If this is a file-like object, is has to opened in binary
127
                     read mode, not text read mode.
128
        """
129

130
        if pcapng is None:
24✔
131
            raise NotImplementedError(
132
                "The python-pcapng package was not found. Install python-can with "
133
                "the optional dependency [pcapng] to use the PcapngReader."
134
            )
135
        
136
        super().__init__(file, mode="rb")
24✔
137
        self._scanner = pcapng.FileScanner(self.file)
24✔
138

139
    def __iter__(self) -> Generator[Message, None, None]:
24✔
140
        for block in self._scanner:
24✔
141
            if isinstance(block, blocks.EnhancedPacket):
24✔
142
                idn: blocks.InterfaceDescription = block.interface
24✔
143
                # We only care about the CAN packets
144
                if idn.link_type != LINKTYPE_CAN_SOCKETCAN:
24✔
NEW
145
                    logger.debug("Skipping non-CAN packet, link type: %s", idn.link_type)
×
NEW
146
                    continue
×
147

148
                msg = parse_can_frame(block.packet_data, struct=CAN_FRAME_HEADER_STRUCT_BE)
24✔
149

150
                timestamp64 = (block.timestamp_high << 32) + block.timestamp_low
24✔
151
                msg.timestamp = timestamp64 * idn.timestamp_resolution
24✔
152

153
                if 'if_name' in idn.options:
24✔
154
                    msg.channel = idn.options['if_name']
24✔
155
                else:
NEW
156
                    msg.channel = block.interface_id
×
157

158
                yield msg
24✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc