• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cnobile2012 / CheckMeIn / 17752043276

16 Sep 2025 02:00AM UTC coverage: 94.853% (-2.1%) from 96.909%
17752043276

push

github

cnobile2012
Added tests for manage.py and changed how coverage works.

60 of 72 new or added lines in 4 files covered. (83.33%)

2285 of 2409 relevant lines covered (94.85%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.9
/src/base_database.py
1
# -*- coding: utf-8 -*-
2
#
3
# src/base_database.py
4
#
5

6
import os
1✔
7
import datetime
1✔
8
import sqlite3
1✔
9
import aiosqlite
1✔
10

11
from configparser import ConfigParser
1✔
12

13
from . import Borg, BASE_DIR, AppConfig
1✔
14

15

16
def adapt_datetime(dt):
    """
    SQLite adapter: serialize a datetime as its ISO 8601 string.

    :param dt: The value to store; its ``isoformat()`` method is called.
    :returns: The ISO formatted text.
    """
    iso_text = dt.isoformat()
    return iso_text
1✔
21

22

23
def custom_converter(value):
    """
    SQLite converter: rebuild a datetime from its stored ISO 8601 bytes.

    :param bytes value: UTF-8 encoded ISO formatted text.
    :returns: The parsed ``datetime.datetime`` object.
    """
    iso_text = value.decode("utf-8")
    return datetime.datetime.fromisoformat(iso_text)
1✔
28

29

30
# Register the datetime <-> ISO string round trip at import time so that
# columns declared as DATETIME are converted when a connection is opened
# with type detection enabled.
aiosqlite.register_adapter(datetime.datetime, adapt_datetime)
aiosqlite.register_converter('DATETIME', custom_converter)
1✔
32

33

34
class BaseDatabase(Borg):
    """
    The base class for all DB operations.

    Holds the table and view names, the schema definition, and the
    low-level async helpers (select, insert, update, delete) that run
    against the SQLite database named by ``db_fullpath``.

    https://sqlite.org/
    https://www.w3schools.com/sql/
    """
    # Table names -- every `_T_*` constant is collected into `_TABLES`.
    _T_ACCOUNTS = 'accounts'
    _T_CERTIFICATIONS = 'certifications'
    _T_CONFIG = 'config'
    _T_DEVICES = 'devices'
    _T_GUESTS = 'guests'
    _T_LOG_EVENTS = 'log_events'
    _T_MEMBERS = 'members'
    _T_REPORTS = 'reports'
    _T_RESTRICTIONS = 'restrictions'
    _T_TEAM_MEMBERS = 'team_members'
    _T_TEAMS = 'teams'
    _T_TOOLS = 'tools'
    _T_UNLOCKS = 'unlocks'
    _T_VISITS = 'visits'
    # View names -- every `_V_*` constant is collected into `_VIEWS`.
    _V_CURRENT_MEMBERS = 'current_members'
    # Column definitions for each table and column names for each view.
    _SCHEMA = {
        _T_ACCOUNTS: (
            'user TEXT NOT NULL PRIMARY KEY COLLATE NOCASE',
            'password TEXT NOT NULL',
            'forgot TEXT',  # Hashed token used in URL.
            'forgotTime DATETIME',
            'barcode TEXT UNIQUE',
            'activeKeyholder INTEGER default 0',
            'role INTEGER default 0'),
        _T_CERTIFICATIONS: (
            'user_id TEXT',
            'tool_id INTEGER',
            'certifier_id TEXT',
            'date DATETIME',
            'level INTEGER default 0'),
        _T_CONFIG: (
            'key TEXT NOT NULL PRIMARY KEY',
            'value TEXT'),
        _T_DEVICES: (
            'mac TEXT PRIMARY KEY',
            'barcode TEXT',
            'name TEXT'),
        _T_GUESTS: (
            'guest_id TEXT UNIQUE',
            'displayName TEXT',
            'email TEXT',
            'firstName TEXT',
            'lastName TEXT',
            'whereFound TEXT',
            'status INTEGER default 1',
            'newsletter INTEGER default 0'),
        _T_LOG_EVENTS: (
            'what TEXT',
            'date DATETIME',
            'barcode TEXT'),
        _T_MEMBERS: (
            'barcode TEXT UNIQUE',
            'displayName TEXT',
            'firstName TEXT',
            'lastName TEXT',
            'email TEXT',
            'membershipExpires DATETIME'),
        _T_REPORTS: (
            'report_id INTEGER NOT NULL PRIMARY KEY',
            'name TEXT UNIQUE',
            'sql_text TEXT',
            'parameters TEXT',
            'active INTEGER default 1'),
        _T_RESTRICTIONS: (
            'id INTEGER PRIMARY KEY',
            'descr TEXT'),
        _T_TEAM_MEMBERS: (
            'team_id INTEGER',
            'barcode TEXT',
            'type INTEGER default 0',
            'CONSTRAINT unq UNIQUE (team_id, barcode)'),
        _T_TEAMS: (
            'team_id INTEGER NOT NULL PRIMARY KEY',
            'program_name TEXT NOT NULL',
            'program_number INTEGER NOT NULL',
            'team_name TEXT',
            'start_date DATETIME',
            'active INTEGER NOT NULL DEFAULT 1 CHECK (active IN (0, 1))',
            'CONSTRAINT unq UNIQUE (program_name, program_number, start_date)'
            ),
        _T_TOOLS: (
            'pk INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT',
            'name TEXT NOT NULL UNIQUE',
            'restriction INTEGER DEFAULT 0',
            'comment TEXT'),
        _T_UNLOCKS: (
            'time DATETIME',
            'location TEXT',
            'barcode TEXT'),
        _T_VISITS: (
            'enter_time DATETIME',
            'exit_time DATETIME',
            'barcode TEXT',
            'status TEXT'),
        _V_CURRENT_MEMBERS: (
            'barcode',
            'displayName',
            'membershipExpires',
            ),
        }
    # Text appended after the column list of the named table or view.
    _SCHEMA_EXTRA = {
        _V_CURRENT_MEMBERS: (
            'AS SELECT m.barcode, m.displayName, m.membershipExpires '
            "FROM members m WHERE m.membershipExpires > date('now', '-' || ("
            "SELECT value FROM config WHERE key = 'grace_period') || ' days')"
            ),
        }
    # Complete index statements executed after the tables are created.
    _SCHEMA_INDEXES = (
        ('CREATE INDEX IF NOT EXISTS idx_visits_barcode '
         'ON visits(barcode);'),
        )
    _DETECT_TYPES = sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
    _log = AppConfig().log

    def __init__(self, *args, **kwargs):
        """
        Initialize the path holder and collect the table and view names.
        """
        super().__init__(*args, **kwargs)
        self._db_fullpath = ''
        # Build the name lists from the `_T_*` / `_V_*` constants so they
        # cannot drift from the class-level definitions.
        self._TABLES = [getattr(self, var) for var in dir(self)
                        if var.startswith('_T_')]
        self._VIEWS = [getattr(self, var) for var in dir(self)
                       if var.startswith('_V_')]

    @classmethod
    def read_config(cls, fullpath: str, section_key: dict):
        """
        Read the CherryPy config files. These are INI files but with a .conf
        extension.

        A missing/unreadable file or a missing section/key is logged as an
        error; the offending entries are left out of the result.

        :param str fullpath: Path to the config file.
        :param dict section_key: A dictionary of sections and keys in the
                                 following form:
                                 {<section0>: (<key0>, <key1>, ...),
                                  <section1>: (<key0>, <key1>, ...)}
        :returns: A dictionary in the following form:
                  {<section0>: {<key0>: <value0>, <key1>: <value1>, ...},
                   <section1>: {<key0>: <value0>, <key1>: <value1>, ...}}
        :rtype: dict
        """
        items = {}
        config = ConfigParser()

        # ConfigParser.read() returns the list of files it could parse.
        if not config.read(fullpath):
            cls._log.error("An invalid config file or path, found %s",
                           fullpath)

        for section, keys in section_key.items():
            for key in keys:
                try:
                    # SECURITY NOTE(review): eval() executes whatever Python
                    # expression the config file contains. This is only
                    # acceptable while the config file is fully trusted;
                    # consider ast.literal_eval() if the values are always
                    # plain literals.
                    value = eval(config[section][key])
                except KeyError:
                    cls._log.error("Invalid section and/or key, section: "
                                   "%s, key: %s", section, key)
                else:
                    items.setdefault(section, {})[key] = value

        return items

    @property
    def db_fullpath(self) -> str:
        """
        Get the database full path.

        :raises AssertionError: If the path has not been set yet.
        """
        assert self._db_fullpath, ("The database full path must be set "
                                   "before using this property.")
        return self._db_fullpath

    @db_fullpath.setter
    def db_fullpath(self, path_info: tuple) -> None:
        """
        Set the fullpath of the database file. If needed, create directories.

        .. note::

           1. The path element must be a path from the base of the project.
              The path will be ignored if filename is ':memory:'.
           2. The filename element is the name of the sqlite3 database file.
              If filename is ':memory:' the path may be an empty string.
           3. The boolean is 'True' for production or development and
              'False' for testing. The directory is only created when it
              is 'False'.

        :param tuple path_info: Path, filename, and a boolean.
        :raises AssertionError: If path_info is not a three element tuple
                                or list.
        """
        assert isinstance(path_info, (tuple, list)) and len(path_info) == 3, (
            "Argument must be a tuple or list of three elements "
            "(path, name, boolean).")
        path, filename, prod_or_dev = path_info

        if filename == ':memory:':
            fullpath = filename
        else:
            directory = os.path.join(BASE_DIR, path)

            if not prod_or_dev:
                # Check and create the BASE_DIR-joined directory, not the
                # relative `path`; makedirs also handles nested paths and
                # an already existing directory.
                os.makedirs(directory, mode=0o775, exist_ok=True)

            fullpath = os.path.join(directory, filename)

        self._db_fullpath = fullpath

    @property
    async def has_schema(self) -> bool:
        """
        Checks that the schema has been created.

        .. note::

           This is an async property and must be awaited, as in
           ``await self.has_schema``.

        :returns: 'True' if the table and view names found in the database
                  match the expected names, otherwise 'False'.
        :rtype: bool
        """
        # Only tables and views are compared; sqlite_master also lists
        # indexes (e.g. idx_visits_barcode) which would never match.
        query = ("SELECT name FROM sqlite_master "
                 "WHERE type IN ('table', 'view');")
        rows = await self._do_select_all_query(query)
        # Names starting with 'sqlite_' are SQLite's own internal objects.
        found = sorted(row[0] for row in rows
                       if not row[0].startswith('sqlite_'))
        expected = sorted(self._TABLES + self._VIEWS)
        check = found == expected

        if not check:
            msg = ("Database table count or names are wrong it should be "
                   f"{expected} found {found}")
            self._log.error(msg)

        return check

    def _build_create_query(self, kind: str, name: str, params) -> str:
        """
        Build one CREATE TABLE or CREATE VIEW statement.

        :param str kind: Either 'TABLE' or 'VIEW'.
        :param str name: The table or view name.
        :param params: The column definitions (table) or names (view).
        :returns: The complete SQL statement ending with a ';'.
        :rtype: str
        """
        query = f"CREATE {kind} IF NOT EXISTS {name} ({', '.join(params)})"
        extra = self._SCHEMA_EXTRA.get(name)
        return f"{query} {extra};" if extra else f"{query};"

    async def create_schema(self) -> None:
        """
        Create the database schema, tables, indexes, and views.

        Nothing is done when the database file is non-empty and already
        holds the expected schema.
        """
        try:
            status = os.stat(self.db_fullpath)
        except FileNotFoundError:
            exists = False
        else:
            # A zero length file has no schema yet.
            exists = status.st_size != 0

        if exists and await self.has_schema:
            return

        tables = {table: params for table, params in self._SCHEMA.items()
                  if table not in self._VIEWS}
        views = {view: params for view, params in self._SCHEMA.items()
                 if view in self._VIEWS}

        async with aiosqlite.connect(self.db_fullpath) as db:
            for table, params in tables.items():
                query = self._build_create_query('TABLE', table, params)
                self._log.info("Created table: %s", query)
                await db.execute(query)
                await db.commit()

            for query in self._SCHEMA_INDEXES:
                self._log.info("Created index: %s", query)
                await db.execute(query)
                await db.commit()

            # Views are created last because they select from the tables.
            for view, params in views.items():
                query = self._build_create_query('VIEW', view, params)
                self._log.info("Created view: %s", query)
                await db.execute(query)
                await db.commit()

    async def _do_select_all_query(self, query: str,
                                   params: tuple = ()) -> list:
        """
        Do the actual query and return all the results.

        :param str query: The SQL query to execute.
        :param tuple params: Parameters to query.
        :returns: A list of the data.
        :rtype: list
        """
        async with aiosqlite.connect(self.db_fullpath,
                                     detect_types=self._DETECT_TYPES) as db:
            async with db.execute(query, params) as cursor:
                values = await cursor.fetchall()

        return values

    async def _do_select_one_query(self, query: str, params: tuple = ()):
        """
        Do the actual query and return the first result.

        :param str query: The SQL query to execute.
        :param tuple or dict params: Parameters to query.
        :returns: One item of data.
        :rtype: tuple or NoneType
        """
        async with aiosqlite.connect(self.db_fullpath,
                                     detect_types=self._DETECT_TYPES) as db:
            async with db.execute(query, params) as cursor:
                value = await cursor.fetchone()

        return value

    async def _do_select_read_only(self, query, params: tuple = (),
                                   fetchone=False) -> tuple:
        """
        Do a select query on a connection opened in read-only mode.

        :param str query: The SQL query to execute.
        :param tuple params: Parameters to query.
        :param bool fetchone: If 'True' fetch a single row, otherwise fetch
                              all rows.
        :returns: The fetched data and the cursor description.
        :rtype: tuple
        """
        # The 'mode=ro' URI parameter makes SQLite open the file read-only.
        path = f"file:{self.db_fullpath}?mode=ro"

        async with aiosqlite.connect(path, detect_types=self._DETECT_TYPES,
                                     uri=True) as db:
            async with db.execute(query, params) as cursor:
                description = cursor.description

                if fetchone:
                    data = await cursor.fetchone()
                else:
                    data = await cursor.fetchall()

        return data, description

    async def _do_insert_query(self, query: str, data: list) -> int:
        """
        Do the insert query.

        :param str query: The SQL query to execute.
        :param list or tuple data: Data to insert into the table.
        :returns: Number of rows affected by the query or 0 if an
                  exception was raised.
        :rtype: int
        """
        return await self._do_query(query, data)

    async def _do_update_query(self, query: str, data: list) -> int:
        """
        Do the update query.

        :param str query: The SQL query to do.
        :param list or tuple data: Data to update into the table.
        :returns: Number of rows affected by the query or 0 if an
                  exception was raised.
        :rtype: int
        """
        return await self._do_query(query, data)

    async def _do_delete_query(self, query: str, data: list) -> int:
        """
        Do the delete query.

        :param str query: The SQL query to do.
        :param list or tuple data: Data used to delete items from a table.
        :returns: Number of rows affected by the query or 0 if an
                  exception was raised.
        :rtype: int
        """
        return await self._do_query(query, data)

    async def _do_query(self, query: str, data: list) -> int:
        """
        Do the INSERT, UPDATE, or DELETE queries.

        The query may hold several ';' separated statements. They are run
        in one transaction, each one with the same data, and everything is
        rolled back if any of them fails.

        :param str query: The SQL query to do.
        :param list or tuple data: Data used to insert, update, or delete
                                   items from a table. A single row is
                                   accepted as well as a list of rows.
        :returns: Number of rows affected by the query or 0 if an
                  exception was raised.
        :rtype: int
        :raises AssertionError: If the query contains no ';'.
        """
        assert ';' in query, f"The query {query} does not end with a ';'"

        # Normalize: single row -> list of one row. A row is anything that
        # is not a list/tuple, or a list/tuple of plain values.
        if data and (not isinstance(data, (list, tuple)) or
                     not isinstance(data[0], (list, tuple, dict))):
            data = [data]

        # NOTE(review): with empty data executemany() runs the statement
        # zero times, so a parameterless statement is silently skipped.
        # Confirm that no caller relies on running one with empty data.
        async with aiosqlite.connect(self.db_fullpath,
                                     detect_types=self._DETECT_TYPES) as db:
            rowcount = 0
            # Empty fragments (e.g. after the final ';') are dropped here.
            queries = [q.strip() for q in query.split(";") if q.strip()]

            try:
                if len(queries) > 1:
                    await db.execute("BEGIN;")

                for stmt in queries:
                    cursor = await db.executemany(stmt, data)
                    rowcount += cursor.rowcount

                await db.commit()
            except Exception as e:
                # Deliberate catch-all: any failure undoes the whole batch
                # and is reported to the caller as zero affected rows.
                await db.rollback()
                self._log.error("Error with data %s, %s", data, e,
                                exc_info=True)
                rowcount = 0

        return rowcount
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc