• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cogent3 / cogent3 / 24544474967

17 Apr 2026 02:23AM UTC coverage: 90.01% (-0.7%) from 90.684%
24544474967

push

github

web-flow
Merge pull request #2622 from GavinHuttley/develop

ENH: adopt scinexus for app infrastructure and utility functions

290 of 305 new or added lines in 59 files covered. (95.08%)

177 existing lines in 7 files now uncovered.

27536 of 30592 relevant lines covered (90.01%)

5.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/cogent3/app/sqlite_data_store.py
UNCOV
1
from __future__ import annotations
×
2

UNCOV
3
import contextlib
×
UNCOV
4
import datetime
×
UNCOV
5
import os
×
UNCOV
6
import re
×
UNCOV
7
import sqlite3
×
NEW
8
import warnings
×
UNCOV
9
import weakref
×
UNCOV
10
from pathlib import Path
×
UNCOV
11
from typing import TYPE_CHECKING
×
12

NEW
13
from scinexus.misc import extend_docstring_from
×
UNCOV
14
from scitrack import get_text_hexdigest
×
15

NEW
16
# This module has moved to scinexus; importing it warns the importer
# (stacklevel=2 attributes the warning to the importing code).
warnings.warn(
    message=(
        "cogent3.app.sqlite_data_store is discontinued and will be removed in version 2026.9, "
        "use scinexus.sqlite_data_store instead"
    ),
    category=DeprecationWarning,
    stacklevel=2,
)
22

UNCOV
23
from cogent3.app.data_store import (
×
24
    _LOG_TABLE,
25
    APPEND,
26
    OVERWRITE,
27
    READONLY,
28
    DataMember,
29
    DataMemberABC,
30
    DataStoreABC,
31
    DataStoreDirectory,
32
    Mode,
33
    StrOrBytes,
34
)
35

36
if TYPE_CHECKING:  # pragma: no cover
37
    from citeable import CitationBase
38

39
    from cogent3.app.typing import TabularType
40
    from cogent3.core.table import Table
41

UNCOV
42
# table holding both completed and not-completed results
_RESULT_TABLE = "results"
# sqlite's special filename for an in-memory database
_MEMORY = ":memory:"
# "memory", optionally with a single leading and/or trailing colon and
# surrounding whitespace, selects an in-memory database
_mem_pattern = re.compile(r"^\s*:?memory:?\s*$")
NoneType = type(None)
×
46

47
# Python 3.12 deprecated sqlite3's default adapters and converters for
# datetime objects, so explicit ISO 8601 handlers are registered below.
48

49

50
def _datetime_to_iso(timestamp: datetime.datetime) -> str:  # pragma: no cover
    """sqlite3 adapter: render a datetime as ISO 8601 text"""
    iso_text = timestamp.isoformat()
    return iso_text
53

54

UNCOV
55
# datetime values passed as query parameters are stored as ISO 8601 text
sqlite3.register_adapter(datetime.datetime, _datetime_to_iso)
×
56

57

58
def _datetime_from_iso(data: bytes) -> datetime.datetime:  # pragma: no cover
    """sqlite3 converter: parse ISO 8601 bytes back into a datetime"""
    text = data.decode()
    return datetime.datetime.fromisoformat(text)
61

62

UNCOV
63
# columns declared with type "timestamp" (e.g. the log table's date column) are
# parsed back into datetime objects; converters only apply to connections
# opened with detect_types, which both open_sqlite_db_* functions set
sqlite3.register_converter("timestamp", _datetime_from_iso)
×
64

65

66
# create db
67
def open_sqlite_db_rw(path: str | Path):  # pragma: no cover
    """creates a new sqlitedb for read/write at path, can be an in-memory db

    Notes
    -----
    This function embeds the schema. Tables are created only if they do not
    already exist, so opening an existing database leaves its contents intact.
    There are four tables:

    - results: analysis objects, may be completed or not completed
    - the log table (``_LOG_TABLE``): log-file contents
    - state: record type of stored results and whether db is locked to a process
    - citations: serialised citation data

    Returns
    -------
    Handle to a sqlite3 session in autocommit mode (isolation_level=None),
    returning sqlite3.Row rows.

    Raises
    ------
    sqlite3.Error
        if the schema cannot be created; the connection is closed first
    """
    db = sqlite3.connect(
        path,
        isolation_level=None,
        detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
    )
    db.row_factory = sqlite3.Row
    create_template = "CREATE TABLE IF NOT EXISTS {};"
    # note it is essential to use INTEGER for the autoincrement of primary key to work
    creates = [
        "state(state_id INTEGER PRIMARY KEY, record_type TEXT, lock_pid INTEGER)",
        f"{_LOG_TABLE}(log_id INTEGER PRIMARY KEY, log_name TEXT, date timestamp, data BLOB)",
        f"{_RESULT_TABLE}(record_id TEXT PRIMARY KEY, log_id INTEGER, md5 BLOB, is_completed INTEGER, data BLOB)",
        "citations(citation_id INTEGER PRIMARY KEY, data TEXT)",
    ]
    try:
        for table in creates:
            db.execute(create_template.format(table))
    except sqlite3.Error:
        # e.g. a read-only file: do not leak the half-initialised connection
        db.close()
        raise
    return db
99

100

101
def open_sqlite_db_ro(path):  # pragma: no cover
    """returns db opened as read only

    Parameters
    ----------
    path
        location of an existing data store database

    Returns
    -------
    Handle to a sqlite3 session, returning sqlite3.Row rows

    Raises
    ------
    AssertionError
        if the database does not have the data store schema (raised
        explicitly so the check survives ``python -O``)
    """
    from urllib.parse import quote

    # The path is embedded in a URI, so characters with URI meaning ("?", "#",
    # "%", spaces, ...) must be percent-escaped; sqlite decodes %HH escapes.
    # "/", ":" and "\" are kept as-is, matching the unescaped original form.
    uri_path = quote(str(path), safe="/:\\")
    db = sqlite3.connect(
        f"file:{uri_path}?mode=ro",
        isolation_level=None,
        detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
        uri=True,
    )
    db.row_factory = sqlite3.Row
    if not has_valid_schema(db):
        db.close()
        msg = f"{str(path)!r} does not have a valid data store schema"
        # AssertionError preserves the exception type previously raised by assert
        raise AssertionError(msg)
    return db
116

117

118
def has_valid_schema(db):  # pragma: no cover
    """True if db has every required data store table and no unexpected ones

    The results, log and state tables must all be present. A citations table
    is permitted but optional; any other table makes the schema invalid.
    Rows are accessed by column name, so db must use sqlite3.Row rows.
    """
    # TODO: should be a full schema check
    rows = db.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
    present = {row["name"] for row in rows}
    required = {_RESULT_TABLE, _LOG_TABLE, "state"}
    if not required.issubset(present):
        return False
    return present.issubset(required | {"citations"})
126

127

128
class DataStoreSqlite(DataStoreABC):  # pragma: no cover
    """data store backed by a single sqlite database (file or in-memory)

    Completed and not completed results, log-file contents, lock state and
    citations are held in separate tables of the same database. The
    connection is opened lazily, on first access of the ``db`` property.
    """

    store_suffix = "sqlitedb"

    def __init__(
        self,
        source: str | Path,
        mode: Mode | str = READONLY,
        limit: int | None = None,
        verbose: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        source
            path to the database file, or a string matching "memory"
            (optionally wrapped in colons) for an in-memory database. If a
            path does not end in ``store_suffix``, that suffix is appended.
        mode
            a Mode member, or a value accepted by Mode
        limit
            maximum number of records returned by completed / not_completed.
            Only valid when mode is READONLY.
        verbose
            stored as ``_verbose``

        Raises
        ------
        ValueError
            if limit is given and mode is not READONLY
        """
        if _mem_pattern.search(str(source)):
            self._source = _MEMORY
        else:
            source = Path(source).expanduser()
            self._source = (
                source
                if source.suffix[1:] == self.store_suffix  # sliced to remove "."
                else Path(f"{source}.{self.store_suffix}")
            )
        self._mode = Mode(mode)
        # compare the normalised mode: the raw argument may be a string, which
        # is never identical to the READONLY member
        if self._mode is not READONLY and limit is not None:
            msg = "Using limit argument is only valid for readonly datastores"
            raise ValueError(msg)
        self._limit = limit
        self._verbose = verbose
        self._db = None
        self._open = False
        self._log_id = None
        # The connection is closed by __del__. A weakref.finalize registered
        # with the bound method self.close would hold a strong reference to
        # this instance and keep it alive until interpreter exit.

    def __getstate__(self) -> dict:
        """constructor arguments (``_init_vals``, presumably recorded by DataStoreABC)"""
        return {**self._init_vals}

    def __setstate__(self, state: dict) -> None:
        """rebuild from constructor arguments; no open connection is carried over"""
        # this will reset connections to read only db's
        obj = self.__class__(**state)
        self.__dict__.update(obj.__dict__)

    def __del__(self) -> None:
        """close the db connection when the object is deleted"""
        self.close()

    @property
    def source(self) -> str | Path:
        """string that references connecting to data store, override in subclass constructor"""
        return self._source

    @property
    def mode(self) -> Mode:
        """the datastore mode, set in the constructor"""
        return self._mode

    @property
    def limit(self) -> int | None:
        """maximum number of records returned by completed / not_completed, or None"""
        return self._limit

    @property
    def db(self):
        """the sqlite3 connection, opened (and, if writable, locked) on first access"""
        if self._db is None:
            db_func = open_sqlite_db_ro if self.mode is READONLY else open_sqlite_db_rw
            self._db = db_func(self.source)
            self._open = True
            self.lock()

        return self._db

    def _init_log(self) -> None:
        """insert an empty log record for this session and remember its log_id"""
        timestamp = datetime.datetime.now(tz=datetime.UTC)
        cursor = self.db.execute(
            f"INSERT INTO {_LOG_TABLE}(date) VALUES (?)",
            (timestamp,),
        )
        # log_id is an INTEGER PRIMARY KEY (an alias for the rowid), so
        # lastrowid identifies the new record without assuming the timestamp
        # is unique across records
        self._log_id = cursor.lastrowid

    def close(self) -> None:
        """close the connection, if one has been opened"""
        if getattr(self, "_db", None) is None:
            # also covers instances whose __init__ did not complete
            return
        with contextlib.suppress(sqlite3.ProgrammingError):
            self._db.close()
        self._open = False

    def read(self, unique_id: str) -> StrOrBytes:
        """returns the data stored under unique_id

        Parameters
        ----------
        unique_id
            identifier string formed from Path(table_name) / identifier. A bare
            identifier, or one prefixed with the results table name, refers to
            the results table. One prefixed with the log table name refers to
            the log table.

        Raises
        ------
        ValueError
            if the table prefix is not recognised
        """
        unique_id = Path(unique_id)
        table_name = str(unique_id.parent)
        if table_name not in (
            ".",
            _RESULT_TABLE,
            _LOG_TABLE,
        ):
            msg = f"unknown table for {str(unique_id)!r}"
            raise ValueError(msg)

        if table_name == _LOG_TABLE:
            cmnd = f"SELECT * FROM {_LOG_TABLE} WHERE log_name = ?"
        else:
            cmnd = f"SELECT * FROM {_RESULT_TABLE} WHERE record_id = ?"
        # NOTE(review): a missing record makes fetchone() return None, so the
        # subscript below raises TypeError
        result = self.db.execute(cmnd, (unique_id.name,)).fetchone()
        return result["data"]

    @property
    def completed(self) -> list[DataMemberABC]:
        """returns database records flagged as completed (cached once non-empty)"""
        if not self._completed:
            self._completed = self._select_members(
                table_name=_RESULT_TABLE,
                is_completed=True,
            )
        return self._completed

    @property
    def not_completed(self) -> list[DataMemberABC]:
        """returns database records of type NotCompleted"""
        if not self._not_completed:
            self._not_completed = self._select_members(
                table_name=_RESULT_TABLE,
                is_completed=False,
            )
        return self._not_completed

    def _select_members(
        self,
        *,
        table_name: str,
        is_completed: bool,
    ) -> list[DataMemberABC]:
        """members of table_name with the given completion flag, honouring limit"""
        limit = f"LIMIT {self.limit}" if self.limit else ""
        cmnd = self.db.execute(
            f"SELECT record_id FROM {table_name} WHERE is_completed=? {limit}",
            (is_completed,),
        )
        return [
            DataMember(data_store=self, unique_id=r["record_id"])
            for r in cmnd.fetchall()
        ]

    @property
    def logs(self) -> list[DataMemberABC]:
        """returns all log records"""
        cmnd = self.db.execute(f"SELECT log_name FROM {_LOG_TABLE}")
        return [
            DataMember(data_store=self, unique_id=Path(_LOG_TABLE) / r["log_name"])
            for r in cmnd.fetchall()
            # sessions that never wrote a log have no log_name
            if r["log_name"]
        ]

    def _write(
        self,
        *,
        table_name: str,
        unique_id: str,
        data: str,
        is_completed: bool,
    ) -> DataMemberABC | None:
        """
        Parameters
        ----------
        table_name: str
            name of table to save data. It must be _RESULT_TABLE or _LOG_TABLE.
        unique_id : str
            unique identifier that data will be saved under.
        data: str
            data to be saved.
        is_completed: bool
            flag to identify NotCompleted results

        Returns
        -------
        DataMember instance or None when writing to _LOG_TABLE

        Notes
        -----
        Every write is associated with this session's log record. If
        unique_id already exists and mode is not APPEND, the existing record
        (including its completion flag) is updated in place.
        """
        if self._log_id is None:
            self._init_log()

        if table_name == _LOG_TABLE:
            # TODO how to evaluate whether writing a new log?
            cmnd = f"UPDATE {table_name} SET data =?, log_name =? WHERE log_id=?"
            self.db.execute(cmnd, (data, unique_id, self._log_id))
            return None

        md5 = get_text_hexdigest(data)

        if unique_id in self and self.mode is not APPEND:
            # is_completed must be updated too, otherwise overwriting a
            # completed record with a NotCompleted one leaves it flagged completed
            cmnd = (
                f"UPDATE {table_name} SET data= ?, log_id=?, md5=?, is_completed=? "
                "WHERE record_id=?"
            )
            self.db.execute(cmnd, (data, self._log_id, md5, is_completed, unique_id))
        else:
            cmnd = f"INSERT INTO {table_name} (record_id,data,log_id,md5,is_completed) VALUES (?,?,?,?,?)"
            self.db.execute(cmnd, (unique_id, data, self._log_id, md5, is_completed))

        return DataMember(data_store=self, unique_id=unique_id)

    def drop_not_completed(self, *, unique_id: str = "") -> None:
        """delete not-completed records: all of them, or only unique_id if given"""
        if not unique_id:
            cmnd = f"DELETE FROM {_RESULT_TABLE} WHERE is_completed=?"
            vals = (0,)
        else:
            cmnd = f"DELETE FROM {_RESULT_TABLE} WHERE is_completed=? AND record_id=?"
            vals = (0, unique_id)
        self.db.execute(cmnd, vals)
        self._not_completed = []

    @property
    def _lock_id(self) -> int | None:
        """lock_pid of the first state record, or None if unset or no record exists"""
        result = self.db.execute("SELECT lock_pid FROM state").fetchone()
        return result[0] if result else result

    @property
    def locked(self) -> bool:
        """True if the state table records a lock_pid"""
        return self._lock_id is not None

    def lock(self) -> None:
        """if writable, and not locked, locks the database to this pid

        Raises
        ------
        OSError
            if mode is OVERWRITE and the database is already locked
        """
        # if mode=w and the data store exists AND has a lock_pid
        # value already, then we should fail. The user might
        # inadvertently overwrite something otherwise.
        # BUT if mode=a, as the user expects to be modifying the data
        # store then we have to update the value
        if self.mode is READONLY:
            return

        # self._db (not self.db): this is called from the db property
        # immediately after the connection is assigned
        result = self._db.execute("SELECT state_id,lock_pid FROM state").fetchall()
        locked = result[0]["lock_pid"] if result else None
        if locked and self.mode is OVERWRITE:
            msg = (
                f"You are trying to OVERWRITE {str(self.source)!r} which is "
                "locked. Use APPEND mode or unlock."
            )
            raise OSError(
                msg,
            )

        if result:
            # we will update an existing
            state_id = result[0]["state_id"]
            cmnd = "UPDATE state SET lock_pid=? WHERE state_id=?"
            vals = [os.getpid(), state_id]
        else:
            cmnd = "INSERT INTO state(lock_pid) VALUES (?)"
            vals = [os.getpid()]
        self._db.execute(cmnd, tuple(vals))

    def unlock(self, force=False) -> None:
        """remove a lock if pid matches. If force, ignores pid. ignored if mode is READONLY"""
        if self.mode is READONLY:
            return

        lock_id = self._lock_id
        if lock_id is None:
            return

        if lock_id == os.getpid() or force:
            # NOTE(review): assumes the single state record has state_id 1
            self.db.execute("UPDATE state SET lock_pid=NULL WHERE state_id=1")

        return

    @extend_docstring_from(DataStoreDirectory.write)
    def write(self, *, unique_id: str, data: StrOrBytes) -> DataMemberABC:
        # accept identifiers prefixed with the results table name
        if unique_id.startswith(_RESULT_TABLE):
            unique_id = Path(unique_id).name

        super().write(unique_id=unique_id, data=data)

        # a successful result replaces any earlier NotCompleted record
        self.drop_not_completed(unique_id=unique_id)

        member = self._write(
            table_name=_RESULT_TABLE,
            unique_id=unique_id,
            data=data,
            is_completed=True,
        )
        if (
            member is not None and member not in self._completed
        ):  # new to check existence
            self._completed.append(member)
        return member

    @extend_docstring_from(DataStoreDirectory.write_log)
    def write_log(self, *, unique_id: str, data: StrOrBytes) -> None:
        # accept identifiers prefixed with the log table name
        if unique_id.startswith(_LOG_TABLE):
            unique_id = Path(unique_id).name

        super().write_log(unique_id=unique_id, data=data)
        _ = self._write(
            table_name=_LOG_TABLE,
            unique_id=unique_id,
            data=data,
            is_completed=False,
        )

    @extend_docstring_from(DataStoreDirectory.write_not_completed)
    def write_not_completed(self, *, unique_id: str, data: StrOrBytes) -> DataMemberABC:
        # accept identifiers prefixed with the results table name
        if unique_id.startswith(_RESULT_TABLE):
            unique_id = Path(unique_id).name

        super().write_not_completed(unique_id=unique_id, data=data)
        member = self._write(
            table_name=_RESULT_TABLE,
            unique_id=unique_id,
            data=data,
            is_completed=False,
        )
        if member is not None:
            # an overwritten record may previously have been cached as completed
            if member in self._completed:
                self._completed.remove(member)
            # avoid duplicate entries when an existing record is overwritten,
            # mirroring write()
            if member not in self._not_completed:
                self._not_completed.append(member)
        return member

    def md5(self, unique_id: str) -> str | NoneType:  # we have it in base class
        """
        Parameters
        ----------
        unique_id
            name of data store member
        Returns
        -------
        md5 checksum for the member, if available, None otherwise
        """
        cmnd = f"SELECT * FROM {_RESULT_TABLE} WHERE record_id = ?"
        result = self.db.execute(cmnd, (unique_id,)).fetchone()

        return result["md5"] if result else None

    def write_citations(self, *, data: tuple[CitationBase, ...]) -> None:
        """store citations as a single JSON record, replacing any existing one"""
        if not data:
            return
        # databases created before the citations table existed lack it
        if not self._has_citations_table():
            self.db.execute(
                "CREATE TABLE IF NOT EXISTS citations"
                "(citation_id INTEGER PRIMARY KEY, data TEXT)",
            )
        from citeable import to_jsons

        json_data = to_jsons(data)
        existing = self.db.execute("SELECT citation_id FROM citations").fetchone()
        if existing:
            self.db.execute(
                "UPDATE citations SET data=? WHERE citation_id=?",
                (json_data, existing["citation_id"]),
            )
        else:
            self.db.execute("INSERT INTO citations(data) VALUES (?)", (json_data,))

    def _load_citations(self) -> list[CitationBase]:
        """stored citations, or an empty list if there are none"""
        from citeable import from_jsons

        if not self._has_citations_table():
            return []
        result = self.db.execute("SELECT data FROM citations").fetchone()
        if not result:
            return []
        return from_jsons(result["data"])

    def _has_citations_table(self) -> bool:
        """True if the database contains a citations table"""
        result = self.db.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='citations'",
        ).fetchone()
        return result is not None

    @property
    def summary_citations(self) -> TabularType:
        """table with one row per stored citation (app, citation)"""
        from cogent3.core.table import Table

        citations = self._load_citations()
        rows = [list(c.summary()) for c in citations]
        return Table(header=["app", "citation"], data=rows, title="citations")

    @property
    def describe(self) -> Table:
        """summary table from the base class, titled with the lock status"""
        # query the lock once rather than once per condition
        lock_id = self._lock_id
        pid = os.getpid()
        if lock_id is None:
            title = "Unlocked db store."
        elif lock_id != pid:
            title = f"Locked db store. Locked to pid={lock_id}, current pid={pid}."
        else:
            title = "Locked to the current process."
        table = super().describe
        table.title = title
        return table

    @property
    def record_type(self) -> str:
        """class name of completed results"""
        result = self.db.execute("SELECT record_type FROM state").fetchone()
        return result["record_type"]

    @record_type.setter
    def record_type(self, obj) -> None:
        """record the provenance of obj as the record type

        Raises
        ------
        OSError
            if mode is OVERWRITE and a record type is already set
        """
        from scinexus.misc import get_object_provenance

        rt = self.record_type
        if self.mode is OVERWRITE and rt:
            msg = f"cannot overwrite existing record_type {rt}"
            raise OSError(msg)

        n = get_object_provenance(obj)
        # NOTE(review): assumes the single state record has state_id 1
        self.db.execute("UPDATE state SET record_type=? WHERE state_id=1", (n,))

    @property
    def summary_not_completed(self) -> Table:
        """returns a table summarising not completed results"""
        from .data_store import summary_not_completeds
        from .io import DEFAULT_DESERIALISER

        return summary_not_completeds(
            self.not_completed,
            deserialise=DEFAULT_DESERIALISER,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc