• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 3496312259

pending completion
3496312259

push

github

Mike Borsetti
Version 3.12rc2

1335 of 2027 branches covered (65.86%)

Branch coverage included in aggregate %.

43 of 89 new or added lines in 9 files covered. (48.31%)

5 existing lines in 2 files now uncovered.

3345 of 4251 relevant lines covered (78.69%)

6.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.86
/webchanges/storage_minidb.py
1
"""Legacy minidb cache database.
2

3
Code is loaded only:
4

5
* when reading databases created in version < 3.2.0 or in urlwatch;
6
* when running with the '--database-engine minidb' switch;
7
* when testing migration of a legacy database.
8

9
Keeping it in a standalone module allows the program to run without requiring the minidb package to be installed.
10
"""
11

12
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
13

14
from __future__ import annotations
8✔
15

16
from pathlib import Path
8✔
17
from typing import Any, Dict, List, Optional, Union
8✔
18

19
from .storage import CacheStorage, Snapshot
8✔
20

21
# 'minidb' is an optional dependency: importing this module must not fail when it is missing.
try:
    import minidb
except ImportError as import_error:

    # Stand-in that exposes just the attribute needed at class-definition time (``minidb.Model``).
    # Because the stand-in is a class rather than a module, ``isinstance(minidb, type)`` can later
    # be used to tell that the real package could not be imported.
    class minidb:  # type: ignore[no-redef]
        class Model:
            pass

    # Keep the import failure text so that it can be reported when the storage class is instantiated.
    minidb_error = import_error.msg
×
30

31

32
class CacheMiniDBStorage(CacheStorage):
    """Snapshot storage backed by a legacy ``minidb`` database file.

    Only a subset of the storage operations is implemented: ``delete_latest``, ``rollback`` and ``clean`` with
    ``keep_entries`` other than 1 raise ``NotImplementedError``.
    """

    class CacheEntry(minidb.Model):
        # One row per saved snapshot; the attribute values are the column types handed to minidb.
        guid = str
        timestamp = int
        data = str
        tries = int
        etag = str

    def __init__(self, filename: Union[str, Path]) -> None:
        """Open (creating the parent directory if needed) the minidb database at ``filename``.

        :param filename: Path of the database file.
        :raises ImportError: If the 'minidb' package could not be imported when this module was loaded.
        """
        super().__init__(filename)

        # When the import at module level failed, 'minidb' is the stub class defined there, not a module.
        if isinstance(minidb, type):
            raise ImportError(f"Python package 'minidb' cannot be imported.\n{minidb_error}")

        self.filename.parent.mkdir(parents=True, exist_ok=True)

        self.db = minidb.Store(str(self.filename), debug=True)
        self.db.register(self.CacheEntry)

    def close(self) -> None:
        """Close the database and drop the reference to it."""
        self.db.close()
        self.db = None

    def get_guids(self) -> List[str]:
        """Return the distinct guids present in the database."""
        return [guid for guid, in self.CacheEntry.query(self.db, minidb.Function('distinct', self.CacheEntry.c.guid))]

    def load(self, guid: str) -> Snapshot:
        """Return the most recent snapshot stored for ``guid``.

        Rows are ordered by descending timestamp, then descending tries, and only the first one is used.

        :param guid: The guid to look up.
        :returns: The matching snapshot, or ``Snapshot('', 0, 0, '')`` when no row exists for ``guid``.
        """
        entry = self.CacheEntry
        for data, timestamp, tries, etag in entry.query(
            self.db,
            entry.c.data // entry.c.timestamp // entry.c.tries // entry.c.etag,
            order_by=minidb.columns(entry.c.timestamp.desc, entry.c.tries.desc),
            where=entry.c.guid == guid,
            limit=1,
        ):
            return Snapshot(data, timestamp, tries, etag)

        return Snapshot('', 0, 0, '')

    def _history_where(self, guid: str) -> Any:
        """Build the ``where`` expression selecting rows of ``guid`` whose ``tries`` is 0 or unset.

        :param guid: The guid to filter on.
        :returns: A minidb expression suitable for the ``where`` argument of ``CacheEntry.query``.
        """
        tries = self.CacheEntry.c.tries
        # '== None' is deliberate. Python's 'is' cannot be overloaded, so 'tries is None' is evaluated
        # immediately to the constant False and rows with no 'tries' value could never match.
        # NOTE(review): minidb is expected to render '== None' as a NULL test -- confirm against the
        # installed minidb version.
        return (self.CacheEntry.c.guid == guid) & ((tries == 0) | (tries == None))  # noqa: E711

    def get_history_data(self, guid: str, count: Optional[int] = None) -> Dict[str, float]:
        """Return the distinct stored data of ``guid`` mapped to the timestamp of its newest occurrence.

        Rows are visited newest first; only rows whose ``tries`` is 0 or unset are considered.

        :param guid: The guid to look up.
        :param count: Maximum number of distinct entries to return; ``None`` means no limit and a value
            below 1 yields an empty result.
        :returns: A dict of ``{data: timestamp}``.
        """
        history: Dict[str, float] = {}
        if count is not None and count < 1:
            return history
        entry = self.CacheEntry
        for data, timestamp in entry.query(
            self.db,
            entry.c.data // entry.c.timestamp,
            order_by=minidb.columns(entry.c.timestamp.desc, entry.c.tries.desc),
            where=self._history_where(guid),
        ):
            # Rows arrive newest first, so the first time a piece of data is seen is its latest timestamp.
            if data not in history:
                history[data] = timestamp
                if count is not None and len(history) >= count:
                    break
        return history

    def get_history_snapshots(self, guid: str, count: Optional[int] = None) -> List[Snapshot]:
        """Return the stored snapshots of ``guid``, newest first.

        Only rows whose ``tries`` is 0 or unset are considered; the returned snapshots carry ``tries`` 0 and an
        empty etag.

        :param guid: The guid to look up.
        :param count: Maximum number of snapshots to return; ``None`` means no limit and a value below 1
            yields an empty list.
        :returns: A list of snapshots.
        """
        if count is not None and count < 1:
            return []
        history: List[Snapshot] = []
        entry = self.CacheEntry
        for data, timestamp in entry.query(
            self.db,
            entry.c.data // entry.c.timestamp,
            order_by=minidb.columns(entry.c.timestamp.desc, entry.c.tries.desc),
            where=self._history_where(guid),
        ):
            history.append(Snapshot(data, timestamp, 0, ''))
            if count is not None and len(history) >= count:
                break
        return history

    def save(
        self,
        *args: Any,
        guid: str,
        data: str,
        timestamp: float,
        tries: int,
        etag: Optional[str],
        **kwargs: Any,
    ) -> None:
        """Store a new snapshot row and commit it.

        Positional arguments and any additional keyword arguments are accepted but ignored.

        :param guid: The guid the snapshot belongs to.
        :param data: The snapshot data.
        :param timestamp: The snapshot timestamp.
        :param tries: The number of tries recorded with the snapshot.
        :param etag: The etag recorded with the snapshot, if any.
        """
        self.db.save(self.CacheEntry(guid=guid, timestamp=timestamp, data=data, tries=tries, etag=etag))
        self.db.commit()

    def delete(self, guid: str) -> None:
        """Delete every row stored for ``guid`` and commit.

        :param guid: The guid whose rows are removed.
        """
        self.CacheEntry.delete_where(self.db, self.CacheEntry.c.guid == guid)
        self.db.commit()

    def delete_latest(self, guid: str, delete_entries: int = 1) -> int:
        """Not supported by this engine.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError("Deleting of latest snapshot not supported by 'minidb' database engine")

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        """Delete all rows of ``guid`` except the one with the highest timestamp.

        :param guid: The guid to clean.
        :param keep_entries: Number of entries to keep; only 1 is supported.
        :returns: The value returned by ``CacheEntry.delete_where``, or 0 when no row exists for ``guid``.
        :raises NotImplementedError: If ``keep_entries`` is not 1.
        """
        if keep_entries != 1:
            raise NotImplementedError("Only keeping latest 1 entry is supported by 'minidb' database engine")

        # Id of the newest row for this guid; the '(None,)' default covers a guid with no rows.
        keep_id = next(
            (
                self.CacheEntry.query(
                    self.db,
                    self.CacheEntry.c.id,
                    where=self.CacheEntry.c.guid == guid,
                    order_by=self.CacheEntry.c.timestamp.desc,
                    limit=1,
                )
            ),
            (None,),
        )[0]

        if keep_id is not None:
            result: int = self.CacheEntry.delete_where(
                self.db, (self.CacheEntry.c.guid == guid) & (self.CacheEntry.c.id != keep_id)
            )
            self.db.commit()
            return result

        return 0

    def rollback(self, timestamp: float) -> None:
        """Not supported by this engine.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError("Rolling back of legacy 'minidb' databases is not supported")

    def flushdb(self) -> None:
        """Delete all entries of the database.  Use with care, there is no undo!"""
        for guid in self.get_guids():
            self.delete(guid)
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc