• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 implementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.86
/can/io/sqlite.py
1
"""
21✔
2
Implements an SQL database writer and reader for storing CAN messages.
3

4
.. note:: The database schema is given in the documentation of the loggers.
5
"""
6

7
import logging
21✔
8
import sqlite3
21✔
9
import threading
21✔
10
import time
21✔
11
from collections.abc import Generator
21✔
12
from typing import Any
21✔
13

14
from can.listener import BufferedReader
21✔
15
from can.message import Message
21✔
16

17
from ..typechecking import StringPathLike
21✔
18
from .generic import MessageReader, MessageWriter
21✔
19

20
log = logging.getLogger("can.io.sqlite")
21✔
21

22

23
class SqliteReader(MessageReader):
21✔
24
    """
21✔
25
    Reads recorded CAN messages from a simple SQL database.
26

27
    This class can be iterated over or used to fetch all messages in the
28
    database with :meth:`~SqliteReader.read_all`.
29

30
    Calling :func:`len` on this object might not run in constant time.
31

32
    :attr str table_name: the name of the database table used for storing the messages
33

34
    .. note:: The database schema is given in the documentation of the loggers.
35
    """
36

37
    def __init__(
21✔
38
        self,
39
        file: StringPathLike,
40
        table_name: str = "messages",
41
        **kwargs: Any,
42
    ) -> None:
43
        """
44
        :param file: a `str` or path like object that points
45
                     to the database file to use
46
        :param str table_name: the name of the table to look for the messages
47

48
        .. warning:: In contrast to all other readers/writers, the Sqlite handlers
49
                     do not accept file-like objects as the `file` parameter.
50
                     It also runs in ``append=True`` mode all the time.
51
        """
52
        super().__init__(file=None)
21✔
53
        self._conn = sqlite3.connect(file)
21✔
54
        self._cursor = self._conn.cursor()
21✔
55
        self.table_name = table_name
21✔
56

57
    def __iter__(self) -> Generator[Message, None, None]:
21✔
58
        for frame_data in self._cursor.execute(f"SELECT * FROM {self.table_name}"):
21✔
59
            yield SqliteReader._assemble_message(frame_data)
21✔
60

61
    @staticmethod
21✔
62
    def _assemble_message(frame_data):
21✔
63
        timestamp, can_id, is_extended, is_remote, is_error, dlc, data = frame_data
21✔
64
        return Message(
21✔
65
            timestamp=timestamp,
66
            is_remote_frame=bool(is_remote),
67
            is_extended_id=bool(is_extended),
68
            is_error_frame=bool(is_error),
69
            arbitration_id=can_id,
70
            dlc=dlc,
71
            data=data,
72
        )
73

74
    def __len__(self):
21✔
75
        # this might not run in constant time
76
        result = self._cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}")
21✔
77
        return int(result.fetchone()[0])
21✔
78

79
    def read_all(self):
21✔
80
        """Fetches all messages in the database.
81

82
        :rtype: Generator[can.Message]
83
        """
84
        result = self._cursor.execute(f"SELECT * FROM {self.table_name}").fetchall()
21✔
85
        return (SqliteReader._assemble_message(frame) for frame in result)
21✔
86

87
    def stop(self):
21✔
88
        """Closes the connection to the database."""
89
        super().stop()
21✔
90
        self._conn.close()
21✔
91

92

93
class SqliteWriter(MessageWriter, BufferedReader):
21✔
94
    """Logs received CAN data to a simple SQL database.
21✔
95

96
    The sqlite database may already exist, otherwise it will
97
    be created when the first message arrives.
98

99
    Messages are internally buffered and written to the SQL file in a background
100
    thread. Ensures that all messages that are added before calling :meth:`~can.SqliteWriter.stop()`
101
    are actually written to the database after that call returns. Thus, calling
102
    :meth:`~can.SqliteWriter.stop()` may take a while.
103

104
    :attr str table_name: the name of the database table used for storing the messages
105
    :attr int num_frames: the number of frames actually written to the database, this
106
                          excludes messages that are still buffered
107
    :attr float last_write: the last time a message was actually written to the database,
108
                            as given by ``time.time()``
109

110
    .. note::
111

112
        When the listener's :meth:`~SqliteWriter.stop` method is called the
113
        thread writing to the database will continue to receive and internally
114
        buffer messages if they continue to arrive before the
115
        :attr:`~SqliteWriter.GET_MESSAGE_TIMEOUT`.
116

117
        If the :attr:`~SqliteWriter.GET_MESSAGE_TIMEOUT` expires before a message
118
        is received, the internal buffer is written out to the database file.
119

120
        However if the bus is still saturated with messages, the Listener
121
        will continue receiving until the :attr:`~can.SqliteWriter.MAX_TIME_BETWEEN_WRITES`
122
        timeout is reached or more than
123
        :attr:`~can.SqliteWriter.MAX_BUFFER_SIZE_BEFORE_WRITES` messages are buffered.
124

125
    .. note:: The database schema is given in the documentation of the loggers.
126

127
    """
128

129
    GET_MESSAGE_TIMEOUT = 0.25
21✔
130
    """Number of seconds to wait for messages from internal queue"""
15✔
131

132
    MAX_TIME_BETWEEN_WRITES = 5.0
21✔
133
    """Maximum number of seconds to wait between writes to the database"""
15✔
134

135
    MAX_BUFFER_SIZE_BEFORE_WRITES = 500
21✔
136
    """Maximum number of messages to buffer before writing to the database"""
15✔
137

138
    def __init__(
21✔
139
        self,
140
        file: StringPathLike,
141
        table_name: str = "messages",
142
        **kwargs: Any,
143
    ) -> None:
144
        """
145
        :param file: a `str` or path like object that points
146
                     to the database file to use
147
        :param str table_name: the name of the table to store messages in
148

149
        .. warning:: In contrast to all other readers/writers, the Sqlite handlers
150
                     do not accept file-like objects as the `file` parameter.
151
        """
152
        if kwargs.get("append", False):
21✔
153
            raise ValueError(
21✔
154
                f"The append argument should not be used in "
155
                f"conjunction with the {self.__class__.__name__}."
156
            )
157
        super().__init__(file=None)
21✔
158
        self.table_name = table_name
21✔
159
        self._db_filename = file
21✔
160
        self._stop_running_event = threading.Event()
21✔
161
        self._conn = None
21✔
162
        self._writer_thread = threading.Thread(target=self._db_writer_thread)
21✔
163
        self._writer_thread.start()
21✔
164
        self.num_frames = 0
21✔
165
        self.last_write = time.time()
21✔
166
        self._insert_template = (
21✔
167
            f"INSERT INTO {self.table_name} VALUES (?, ?, ?, ?, ?, ?, ?)"
168
        )
169

170
    def _create_db(self):
21✔
171
        """Creates a new database or opens a connection to an existing one.
172

173
        .. note::
174
            You can't share sqlite3 connections between threads (by default)
175
            hence we set up the db here. It has the upside of running async.
176
        """
177
        log.debug("Creating sqlite database")
21✔
178
        self._conn = sqlite3.connect(self._db_filename)
21✔
179

180
        # create table structure
181
        self._conn.cursor().execute(
21✔
182
            f"""CREATE TABLE IF NOT EXISTS {self.table_name}
183
            (
184
              ts REAL,
185
              arbitration_id INTEGER,
186
              extended INTEGER,
187
              remote INTEGER,
188
              error INTEGER,
189
              dlc INTEGER,
190
              data BLOB
191
            )"""
192
        )
193
        self._conn.commit()
21✔
194

195
    def _db_writer_thread(self):
21✔
196
        self._create_db()
21✔
197

198
        try:
21✔
199
            while True:
15✔
200
                messages = []  # reset buffer
21✔
201

202
                msg = self.get_message(self.GET_MESSAGE_TIMEOUT)
21✔
203
                while msg is not None:
21✔
204
                    # log.debug("SqliteWriter: buffering message")
205

206
                    messages.append(
21✔
207
                        (
208
                            msg.timestamp,
209
                            msg.arbitration_id,
210
                            msg.is_extended_id,
211
                            msg.is_remote_frame,
212
                            msg.is_error_frame,
213
                            msg.dlc,
214
                            memoryview(msg.data),
215
                        )
216
                    )
217

218
                    if (
21✔
219
                        time.time() - self.last_write > self.MAX_TIME_BETWEEN_WRITES
220
                        or len(messages) > self.MAX_BUFFER_SIZE_BEFORE_WRITES
221
                    ):
UNCOV
222
                        break
×
223

224
                    # just go on
225
                    msg = self.get_message(self.GET_MESSAGE_TIMEOUT)
21✔
226

227
                count = len(messages)
21✔
228
                if count > 0:
21✔
229
                    with self._conn:
21✔
230
                        # log.debug("Writing %d frames to db", count)
231
                        self._conn.executemany(self._insert_template, messages)
21✔
232
                        self._conn.commit()  # make the changes visible to the entire database
21✔
233
                    self.num_frames += count
21✔
234
                    self.last_write = time.time()
21✔
235

236
                # check if we are still supposed to run and go back up if yes
237
                if self._stop_running_event.is_set():
21✔
238
                    break
21✔
239

240
        finally:
241
            self._conn.close()
21✔
242
            log.info("Stopped sqlite writer after writing %d messages", self.num_frames)
21✔
243

244
    def stop(self):
21✔
245
        """Stops the reader and writes all remaining messages to the database. Thus, this
246
        might take a while and block.
247
        """
248
        BufferedReader.stop(self)
21✔
249
        self._stop_running_event.set()
21✔
250
        self._writer_thread.join()
21✔
251
        MessageReader.stop(self)
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc