• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cogent3 / scinexus / 24923081665

25 Apr 2026 04:57AM UTC coverage: 99.752% (-0.04%) from 99.792%
24923081665

push

github

web-flow
Merge pull request #50 from GavinHuttley/main

new release

16 of 16 new or added lines in 2 files covered. (100.0%)

1 existing line in 1 file now uncovered.

2412 of 2418 relevant lines covered (99.75%)

5.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.62
/src/scinexus/sqlite_data_store.py
1
from __future__ import annotations
6✔
2

3
import contextlib
6✔
4
import datetime
6✔
5
import os
6✔
6
import re
6✔
7
import sqlite3
6✔
8
import weakref
6✔
9
from pathlib import Path
6✔
10
from typing import TYPE_CHECKING, Any
6✔
11

12
from scitrack import get_text_hexdigest  # type: ignore[import-untyped]
6✔
13

14
from scinexus.data_store import (
6✔
15
    APPEND,
16
    LOG_TABLE,
17
    OVERWRITE,
18
    READONLY,
19
    DataMember,
20
    DataMemberABC,
21
    DataStoreABC,
22
    DataStoreDirectory,
23
    Mode,
24
)
25
from scinexus.misc import extend_docstring_from
6✔
26

27
if TYPE_CHECKING:  # pragma: no cover
28
    from citeable import CitationBase
29

30
# table holding both completed and not-completed results
RESULT_TABLE = "results"
# the name sqlite3 uses for an in-memory database
_MEMORY = ":memory:"
# recognises "memory" with an optional leading and/or trailing colon, ignoring
# surrounding whitespace, e.g. ":memory:", "memory", " memory: "
_mem_pattern = re.compile(r"^\s*:?memory:?\s*$")
NoneType = type(None)
34

35
# Python 3.12 deprecated sqlite3's default adapters/converters for datetime
# objects, so explicit ISO 8601 handlers are registered here instead.


def _datetime_to_iso(timestamp: datetime.datetime) -> str:
    """render ``timestamp`` as ISO 8601 text for storage in sqlite"""
    iso_text = timestamp.isoformat()
    return iso_text


# applied whenever a datetime.datetime is bound as a query parameter
sqlite3.register_adapter(datetime.datetime, _datetime_to_iso)
44

45

46
def _datetime_from_iso(data: bytes) -> datetime.datetime:
    """parse ISO 8601 bytes read back from sqlite into a datetime"""
    iso_text = data.decode()
    return datetime.datetime.fromisoformat(iso_text)


# used for columns declared as "timestamp" (such as the log table's date
# column) on connections opened with detect_types=PARSE_DECLTYPES
sqlite3.register_converter("timestamp", _datetime_from_iso)
52

53

54
# create db
def open_sqlite_db_rw(path: str | Path) -> sqlite3.Connection:
    """creates a new sqlitedb for read/write at path, can be an in-memory db

    Parameters
    ----------
    path
        location of the database file, or ``":memory:"`` for an in-memory db

    Notes
    -----
    This function embeds the schema. There are four tables:

    - results: analysis objects, may be completed or not completed
    - logs: log-file contents
    - state: whether db is locked to a process, plus the stored record type
    - citations: serialised citation data

    Tables are created with ``CREATE TABLE IF NOT EXISTS``, so opening an
    existing database leaves its contents untouched.

    Returns
    -------
    Handle to a sqlite3 session (autocommit mode, rows as ``sqlite3.Row``)
    """
    db = sqlite3.connect(
        path,
        isolation_level=None,
        detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
    )
    db.row_factory = sqlite3.Row
    create_template = "CREATE TABLE IF NOT EXISTS {};"
    # note it is essential to use INTEGER for the autoincrement of primary key to work
    creates = [
        "state(state_id INTEGER PRIMARY KEY, record_type TEXT, lock_pid INTEGER)",
        f"{LOG_TABLE}(log_id INTEGER PRIMARY KEY, log_name TEXT, date timestamp, data BLOB)",
        f"{RESULT_TABLE}(record_id TEXT PRIMARY KEY, log_id INTEGER, md5 BLOB, is_completed INTEGER, data BLOB)",
        "citations(citation_id INTEGER PRIMARY KEY, data TEXT)",
    ]
    try:
        for table in creates:
            db.execute(create_template.format(table))
    except sqlite3.Error:
        # e.g. the file is not a sqlite database or is not writable; the
        # caller never receives the handle, so close it before propagating
        db.close()
        raise
    return db
87

88

89
def open_sqlite_db_ro(path: str | Path) -> sqlite3.Connection:
    """returns db opened as read only

    Parameters
    ----------
    path
        location of the sqlite database

    Returns
    -------
    Handle to a sqlite3 session (autocommit mode, rows as ``sqlite3.Row``)

    Raises
    ------
    ValueError
        if the database does not have a valid schema
    """
    # NOTE(review): ``path`` is interpolated into a URI without
    # percent-encoding, so characters such as '?' or '#' in the path would be
    # misinterpreted by sqlite — confirm whether such paths must be supported
    db = sqlite3.connect(
        f"file:{path}?mode=ro",
        isolation_level=None,
        detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
        uri=True,
    )
    db.row_factory = sqlite3.Row
    try:
        valid = has_valid_schema(db)
    except sqlite3.Error:
        # e.g. the file is not a sqlite database; don't leak the handle
        db.close()
        raise
    if not valid:
        # the caller never receives the handle, so release it before raising
        db.close()
        msg = "database does not have a valid schema"
        raise ValueError(msg)
    return db
106

107

108
def has_valid_schema(db: sqlite3.Connection) -> bool:
    """whether db holds exactly the tables of a DataStoreSqlite database

    The results, log and ``state`` tables must be present; ``citations`` is
    optional (it is created on demand when citations are written). Any other
    table makes the schema invalid, except tables whose names begin with
    ``sqlite_``: those are reserved for SQLite itself (e.g. ``sqlite_stat1``
    written by ``ANALYZE``) and are ignored.
    """
    # TODO: should be a full schema check
    query = "SELECT name FROM sqlite_master WHERE type='table'"
    # index by position so this works whatever row_factory db was given
    table_names = {
        row[0]
        for row in db.execute(query).fetchall()
        if not row[0].startswith("sqlite_")
    }
    required = {RESULT_TABLE, LOG_TABLE, "state"}
    optional = {"citations"}
    return required <= table_names <= (required | optional)
116

117

118
class DataStoreSqlite(DataStoreABC):
    """data store backed by a SQLite database

    Results (completed and not completed) are stored in the ``RESULT_TABLE``
    table, log contents in ``LOG_TABLE``, the lock pid and record type in
    ``state`` and serialised citations in ``citations``. The connection is
    opened lazily, on first access of ``db``.
    """

    store_suffix = "sqlitedb"

    def __init__(
        self,
        source: str | Path,
        mode: Mode | str = READONLY,
        limit: int | None = None,
        verbose: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        source
            path to the database; ``.sqlitedb`` is appended if the suffix is
            missing. ``":memory:"`` (or ``memory`` with optional colons)
            selects an in-memory database.
        mode
            access mode, normalised with ``Mode``
        limit
            maximum number of records returned by ``completed`` and
            ``not_completed`` (a falsy value means no limit). Only valid for
            read-only stores.
        verbose
            stored on the instance as ``_verbose``

        Raises
        ------
        ValueError
            if ``limit`` is given for a store that is not read-only
        """
        if _mem_pattern.search(str(source)):
            self._source: str | Path = _MEMORY
        else:
            source = Path(source).expanduser()
            self._source = (
                source
                if source.suffix[1:] == self.store_suffix  # sliced to remove "."
                else Path(f"{source}.{self.store_suffix}")
            )
        self._mode = Mode(mode)
        # compare the normalised mode: ``mode`` may be a plain string, which is
        # never identical to the READONLY member even when it denotes it
        if self._mode is not READONLY and limit is not None:
            msg = "Using limit argument is only valid for readonly datastores"
            raise ValueError(
                msg,
            )
        self._limit = limit
        self._verbose = verbose
        self._db: sqlite3.Connection | None = None
        self._open = False
        self._log_id: int | None = None
        # the connection is closed on garbage collection by a finalizer
        # registered in ``db`` when the connection is actually opened

    def __getstate__(self) -> dict[str, object]:
        """pickle only the constructor arguments, never the live connection"""
        # NOTE(review): ``_init_vals`` is not set in this class; presumably the
        # base class records the constructor arguments there — confirm
        return {**self._init_vals}

    def __setstate__(self, state: dict[str, Any]) -> None:
        """rebuild from the constructor arguments; the connection reopens lazily"""
        # this will reset connections to read only db's
        obj = self.__class__(**state)
        self.__dict__.update(obj.__dict__)

    def __del__(self) -> None:
        """close the db connection when the object is deleted"""
        self.close()

    @staticmethod
    def _close_connection(db: sqlite3.Connection) -> None:
        """close ``db``, ignoring sqlite3.ProgrammingError"""
        # NOTE(review): sqlite3 raises ProgrammingError e.g. when a connection
        # is closed from a thread other than the one that created it
        with contextlib.suppress(sqlite3.ProgrammingError):
            db.close()

    @property
    def source(self) -> str | Path:
        """string that references connecting to data store, override in subclass constructor"""
        return self._source

    @property
    def mode(self) -> Mode:
        """string that references datastore mode, override in subclass constructor"""
        return self._mode

    @property
    def limit(self) -> int | None:
        """maximum number of records selected by completed / not_completed"""
        return self._limit

    @property
    def db(self) -> sqlite3.Connection:
        """the sqlite3 connection, opened on first access

        Read-only stores validate the schema on opening; writable stores
        create missing tables and record this process's pid as the lock.
        """
        if self._db is None:
            db_func = open_sqlite_db_ro if self.mode is READONLY else open_sqlite_db_rw
            self._db = db_func(self.source)
            self._open = True
            # Close the connection when this store is garbage collected (or at
            # interpreter exit). The callback must not reference ``self`` — a
            # bound method would keep the store alive until exit — so it only
            # receives the connection object.
            weakref.finalize(self, type(self)._close_connection, self._db)
            self.lock()

        if self._db is None:
            msg = "database connection is unexpectedly None"
            raise ValueError(msg)
        return self._db

    def _init_log(self) -> None:
        """insert a log row stamped with the current UTC time and keep its id"""
        timestamp = datetime.datetime.now(tz=datetime.UTC)
        cursor = self.db.execute(
            f"INSERT INTO {LOG_TABLE}(date) VALUES (?)", (timestamp,)
        )
        # log_id is an INTEGER PRIMARY KEY, i.e. the rowid, so the id of the
        # row just inserted is available directly (no lookup by timestamp)
        self._log_id = cursor.lastrowid

    def close(self) -> None:
        """close the database connection"""
        # getattr: this may run from __del__ on a partially initialised object
        db: sqlite3.Connection | None = getattr(self, "_db", None)
        if db is None:
            return
        self._close_connection(db)
        self._open = False

    def read(self, unique_id: str) -> str | bytes:
        """returns the data stored under ``unique_id``

        Parameters
        ----------
        unique_id
            identifier string formed from Path(table_name) / identifier, where
            table_name is LOG_TABLE for logs and is omitted for results

        Raises
        ------
        ValueError
            if the table part of ``unique_id`` is neither empty nor LOG_TABLE
        """
        uid_path = Path(unique_id)
        table_name = str(uid_path.parent)
        if table_name not in (
            ".",
            LOG_TABLE,
        ):
            msg = f"unknown table for {str(uid_path)!r}"
            raise ValueError(msg)

        if table_name == LOG_TABLE:
            cmnd = f"SELECT * FROM {LOG_TABLE} WHERE log_name = ?"
        else:
            cmnd = f"SELECT * FROM {RESULT_TABLE} WHERE record_id = ?"
        result = self.db.execute(cmnd, (uid_path.name,)).fetchone()
        # NOTE(review): a missing record makes ``result`` None and indexing it
        # raises TypeError — consider a more descriptive error
        return result["data"]

    @property
    def completed(self) -> list[DataMemberABC]:
        """returns database records of completed results

        The selection is cached once it is non-empty.
        """
        if not self._completed:
            self._completed = self._select_members(
                table_name=RESULT_TABLE,
                is_completed=True,
            )
        return self._completed

    @property
    def not_completed(self) -> list[DataMemberABC]:
        """returns database records of type NotCompleted

        The selection is cached once it is non-empty.
        """
        if not self._not_completed:
            self._not_completed = self._select_members(
                table_name=RESULT_TABLE,
                is_completed=False,
            )
        return self._not_completed

    def _select_members(
        self,
        *,
        table_name: str,
        is_completed: bool,
    ) -> list[DataMemberABC]:
        """members of table_name with the given completion flag, honouring limit"""
        limit = f"LIMIT {self.limit}" if self.limit else ""
        cmnd = self.db.execute(
            f"SELECT record_id FROM {table_name} WHERE is_completed=? {limit}",
            (is_completed,),
        )
        return [
            DataMember(data_store=self, unique_id=r["record_id"])
            for r in cmnd.fetchall()
        ]

    @property
    def logs(self) -> list[DataMemberABC]:
        """returns all log records"""
        cmnd = self.db.execute(f"SELECT log_name FROM {LOG_TABLE}")
        return [
            DataMember(data_store=self, unique_id=Path(LOG_TABLE) / r["log_name"])
            for r in cmnd.fetchall()
            # rows created by _init_log have no name until a log is written
            if r["log_name"]
        ]

    def _write(
        self,
        *,
        table_name: str,
        unique_id: str,
        data: str | bytes,
        is_completed: bool,
    ) -> DataMemberABC | None:
        """
        Parameters
        ----------
        table_name
            name of table to save data. It must be RESULT_TABLE or LOG_TABLE.
        unique_id
            unique identifier that data will be saved under.
        data
            data to be saved.
        is_completed
            flag to identify NotCompleted results

        Returns
        -------
        DataMember instance or None when writing to LOG_TABLE
        """
        if self._log_id is None:
            self._init_log()

        if table_name == LOG_TABLE:
            # TODO how to evaluate whether writing a new log?
            cmnd = f"UPDATE {table_name} SET data =?, log_name =? WHERE log_id=?"
            self.db.execute(cmnd, (data, unique_id, self._log_id))
            return None

        md5 = get_text_hexdigest(data)

        if unique_id in self and self.mode is not APPEND:
            # NOTE(review): this UPDATE leaves ``is_completed`` unchanged, so
            # overwriting an existing record keeps its previous completion
            # flag — confirm this is intended
            cmnd = f"UPDATE {table_name} SET data= ?, log_id=?, md5=? WHERE record_id=?"
            self.db.execute(cmnd, (data, self._log_id, md5, unique_id))
        else:
            cmnd = f"INSERT INTO {table_name} (record_id,data,log_id,md5,is_completed) VALUES (?,?,?,?,?)"
            self.db.execute(cmnd, (unique_id, data, self._log_id, md5, is_completed))

        return DataMember(data_store=self, unique_id=unique_id)

    def drop_not_completed(self, *, unique_id: str | None = None) -> None:
        """remove not-completed records from the database

        Parameters
        ----------
        unique_id
            if provided, only drop the record with this identifier,
            otherwise drop all not-completed records
        """
        vals: tuple[int] | tuple[int, str]
        if not unique_id:
            cmnd = f"DELETE FROM {RESULT_TABLE} WHERE is_completed=?"
            vals = (0,)
        else:
            cmnd = f"DELETE FROM {RESULT_TABLE} WHERE is_completed=? AND record_id=?"
            vals = (0, unique_id)
        self.db.execute(cmnd, vals)
        # invalidate the cache; ``not_completed`` re-queries when it is empty
        self._not_completed = []

    @property
    def _lock_id(self) -> int | None:
        """returns lock_pid from the first state row, None if there is no row"""
        result = self.db.execute("SELECT lock_pid FROM state").fetchone()
        return result[0] if result else None

    @property
    def locked(self) -> bool:
        """True if a lock_pid is recorded, False if it is NULL or no state row exists"""
        return self._lock_id is not None

    def lock(self) -> None:
        """if writable, and not locked, locks the database to this pid

        Raises
        ------
        OSError
            if the database is already locked and mode is OVERWRITE
        """
        if self.mode is READONLY:
            return
        # use _db directly: this is called from the ``db`` property itself
        if self._db is None:
            msg = "database connection is unexpectedly None"
            raise RuntimeError(msg)
        result = self._db.execute("SELECT state_id,lock_pid FROM state").fetchall()
        locked = result[0]["lock_pid"] if result else None
        if locked and self.mode is OVERWRITE:
            msg = (
                f"You are trying to OVERWRITE {str(self.source)!r} which is "
                "locked. Use APPEND mode or unlock."
            )
            raise OSError(
                msg,
            )

        if result:
            # we will update an existing
            state_id = result[0]["state_id"]
            cmnd = "UPDATE state SET lock_pid=? WHERE state_id=?"
            vals = [os.getpid(), state_id]
        else:
            cmnd = "INSERT INTO state(lock_pid) VALUES (?)"
            vals = [os.getpid()]
        self._db.execute(cmnd, tuple(vals))

    def unlock(self, force: bool = False) -> None:
        """remove a lock if pid matches. If force, ignores pid. ignored if mode is READONLY"""
        if self.mode is READONLY:
            return

        lock_id = self._lock_id
        if lock_id is None:
            return

        if lock_id == os.getpid() or force:
            # NOTE(review): lock() updates the first state row whatever its
            # id, while this clears state_id=1; they agree for databases whose
            # first state row was inserted into an empty table
            self.db.execute("UPDATE state SET lock_pid=NULL WHERE state_id=1")

        return

    @extend_docstring_from(DataStoreDirectory.write)
    def write(self, *, unique_id: str, data: str | bytes) -> DataMemberABC:  # type: ignore[override]
        # identifiers may be given with the results table as a prefix
        if unique_id.startswith(RESULT_TABLE):
            unique_id = Path(unique_id).name

        super().write(unique_id=unique_id, data=data)

        # a successful result replaces any earlier NotCompleted record
        self.drop_not_completed(unique_id=unique_id)

        member = self._write(
            table_name=RESULT_TABLE,
            unique_id=unique_id,
            data=data,
            is_completed=True,
        )
        if member is None:
            msg = "write to results table failed to produce a member"
            raise RuntimeError(msg)
        if member not in self._completed:
            self._completed.append(member)
        return member

    @extend_docstring_from(DataStoreDirectory.write_log)
    def write_log(self, *, unique_id: str, data: str | bytes) -> None:
        # identifiers may be given with the log table as a prefix
        if unique_id.startswith(LOG_TABLE):
            unique_id = Path(unique_id).name

        super().write_log(unique_id=unique_id, data=data)
        _ = self._write(
            table_name=LOG_TABLE,
            unique_id=unique_id,
            data=data,
            is_completed=False,
        )

    @extend_docstring_from(DataStoreDirectory.write_not_completed)
    def write_not_completed(  # type: ignore[override]
        self, *, unique_id: str, data: str | bytes
    ) -> DataMemberABC:
        # identifiers may be given with the results table as a prefix
        if unique_id.startswith(RESULT_TABLE):
            unique_id = Path(unique_id).name

        super().write_not_completed(unique_id=unique_id, data=data)
        member = self._write(
            table_name=RESULT_TABLE,
            unique_id=unique_id,
            data=data,
            is_completed=False,
        )
        if member is None:
            msg = "write to results table failed to produce a member"
            raise RuntimeError(msg)
        self._not_completed.append(member)
        return member

    def md5(self, unique_id: str) -> str | None:
        """
        Parameters
        ----------
        unique_id
            name of data store member
        Returns
        -------
        md5 checksum for the member, if available, None otherwise
        """
        cmnd = f"SELECT * FROM {RESULT_TABLE} WHERE record_id = ?"
        result = self.db.execute(cmnd, (unique_id,)).fetchone()

        return result["md5"] if result else None

    def write_citations(self, *, data: tuple[CitationBase, ...]) -> None:
        """store serialised citations, replacing any previously stored ones"""
        if not data:
            return
        # databases created before the citations table existed lack it
        if not self._has_citations_table():
            self.db.execute(
                "CREATE TABLE IF NOT EXISTS citations"
                "(citation_id INTEGER PRIMARY KEY, data TEXT)",
            )
        from citeable import to_jsons

        json_data = to_jsons(data)
        # a single row holds all citations
        if existing := self.db.execute("SELECT citation_id FROM citations").fetchone():
            self.db.execute(
                "UPDATE citations SET data=? WHERE citation_id=?",
                (json_data, existing["citation_id"]),
            )
        else:
            self.db.execute("INSERT INTO citations(data) VALUES (?)", (json_data,))

    def _load_citations(self) -> list[CitationBase]:
        """citations stored in the database, an empty list if there are none"""
        if not self._has_citations_table():
            return []
        # imported only when there is a table to read from
        from citeable import from_jsons

        result = self.db.execute("SELECT data FROM citations").fetchone()
        return from_jsons(result["data"]) if result else []

    def _has_citations_table(self) -> bool:
        """whether the database contains a citations table"""
        result = self.db.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='citations'",
        ).fetchone()
        return result is not None

    def _describe(self) -> dict[str, object]:
        """base-class description with a title reporting the lock state"""
        lock_id = self._lock_id  # query the state table only once
        pid = os.getpid()
        if lock_id is not None and lock_id != pid:
            title = f"Locked db store. Locked to pid={lock_id}, current pid={pid}."
        elif lock_id is not None:
            title = "Locked to the current process."
        else:
            title = "Unlocked db store."
        result = super()._describe()
        result["title"] = title
        return result

    @property
    def record_type(self) -> str:
        """class name of completed results"""
        result = self.db.execute("SELECT record_type FROM state").fetchone()
        return result["record_type"]

    @record_type.setter
    def record_type(self, obj: object) -> None:
        """store the provenance of obj as the record type

        Raises
        ------
        OSError
            if mode is OVERWRITE and a record type is already stored
        """
        from scinexus.misc import get_object_provenance

        rt = self.record_type
        if self.mode is OVERWRITE and rt:
            msg = f"cannot overwrite existing record_type {rt}"
            raise OSError(msg)

        n = get_object_provenance(obj)
        self.db.execute("UPDATE state SET record_type=? WHERE state_id=1", (n,))

    def _summary_not_completed(self) -> list[dict]:
        """returns a list of dicts summarising not completed results"""
        from scinexus.data_store import summary_not_completeds
        from scinexus.io import DEFAULT_DESERIALISER

        return summary_not_completeds(
            self.not_completed,
            deserialise=DEFAULT_DESERIALISER,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc