mborsetti / webchanges / 21545658727

31 Jan 2026 02:04PM UTC coverage: 73.318% (-0.3%) from 73.637%
Build 21545658727 (push via github) by mborsetti, Version 3.33.0

1404 of 2258 branches covered (62.18%); branch coverage included in aggregate %.
1 of 9 new or added lines in 2 files covered (11.11%).
792 existing lines in 7 files now uncovered.
4710 of 6081 relevant lines covered (77.45%).
11.06 hits per line.
Source file: /webchanges/storage.py (80.02% covered)

"""Handles all storage: jobs files, config files, hooks file, and cache database engines."""

# The code below is subject to the license contained in the LICENSE.md file, which is part of the source code.

from __future__ import annotations

import copy
import inspect
import io
import logging
import os
import shutil
import sqlite3
import sys
import threading
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from types import NoneType
from typing import Any, Iterable, Iterator, Literal, TextIO, TypedDict

import msgpack
import yaml
import yaml.scanner

from webchanges import __docs_url__, __project_name__, __version__
from webchanges.filters import FilterBase
from webchanges.handler import ErrorData, Snapshot
from webchanges.jobs import JobBase, ShellJob
from webchanges.reporters import ReporterBase
from webchanges.util import edit_file, file_ownership_checks

try:
    from httpx import Headers
except ImportError:  # pragma: no cover
    from webchanges._vendored.headers import Headers

try:
    import redis
except ImportError as e:  # pragma: no cover
    redis = str(e)  # type: ignore[assignment]

logger = logging.getLogger(__name__)

_ConfigDisplay = TypedDict(
    '_ConfigDisplay',
    {
        'new': bool,
        'error': bool,
        'unchanged': bool,
        'empty-diff': bool,
    },
)
_ConfigReportText = TypedDict(
    '_ConfigReportText',
    {
        'line_length': int,
        'details': bool,
        'footer': bool,
        'minimal': bool,
        'separate': bool,
    },
)
_ConfigReportHtml = TypedDict(
    '_ConfigReportHtml',
    {
        'diff': Literal['unified', 'table'],
        'footer': bool,
        'separate': bool,
        'title': str,
    },
)
_ConfigReportMarkdown = TypedDict(
    '_ConfigReportMarkdown',
    {
        'details': bool,
        'footer': bool,
        'minimal': bool,
        'separate': bool,
    },
)
_ConfigReportStdout = TypedDict(
    '_ConfigReportStdout',
    {
        'enabled': bool,
        'color': bool | Literal['normal', 'bright'],
    },
)
_ConfigReportBrowser = TypedDict(
    '_ConfigReportBrowser',
    {
        'enabled': bool,
    },
)
_ConfigReportDiscord = TypedDict(
    '_ConfigReportDiscord',
    {
        'enabled': bool,
        'webhook_url': str,
        'embed': bool,
        'subject': str,
        'colored': bool,
        'max_message_length': int | None,
    },
)
_ConfigReportEmailSmtp = TypedDict(
    '_ConfigReportEmailSmtp',
    {
        'host': str,
        'user': str,
        'port': int,
        'starttls': bool,
        'auth': bool,
        'insecure_password': str,
        'utf-8': bool,
    },
)
_ConfigReportEmailSendmail = TypedDict(
    '_ConfigReportEmailSendmail',
    {
        'path': str | Path,
    },
)
_ConfigReportEmail = TypedDict(
    '_ConfigReportEmail',
    {
        'enabled': bool,
        'html': bool,
        'to': str,
        'from': str,
        'subject': str,
        'method': Literal['sendmail', 'smtp'],
        'smtp': _ConfigReportEmailSmtp,
        'sendmail': _ConfigReportEmailSendmail,
    },
)
_ConfigReportGithubIssue = TypedDict(
    '_ConfigReportGithubIssue',
    {
        'enabled': bool,
        'token': str,
        'owner': str,
        'repo': str,
        'title': str,
        'labels': list[str],
        'format_dt': str,
        'format_content': str,
        'assignees': list[str],
        'type': str,
        'milestone': str,
    },
)
_ConfigReportGotify = TypedDict(
    '_ConfigReportGotify',
    {
        'enabled': bool,
        'priority': int,
        'server_url': str,
        'title': str,
        'token': str,
    },
)
_ConfigReportIfttt = TypedDict(
    '_ConfigReportIfttt',
    {
        'enabled': bool,
        'key': str,
        'event': str,
    },
)
_ConfigReportMailgun = TypedDict(
    '_ConfigReportMailgun',
    {
        'enabled': bool,
        'region': str,
        'api_key': str,
        'domain': str,
        'from_mail': str,
        'from_name': str,
        'to': str,
        'subject': str,
    },
)
_ConfigReportMatrix = TypedDict(
    '_ConfigReportMatrix',
    {
        'enabled': bool,
        'homeserver': str,
        'access_token': str,
        'room_id': str,
    },
)
_ConfigReportProwl = TypedDict(
    '_ConfigReportProwl',
    {
        'enabled': bool,
        'api_key': str,
        'priority': int,
        'application': str,
        'subject': str,
    },
)
_ConfigReportPushbullet = TypedDict(
    '_ConfigReportPushbullet',
    {
        'enabled': bool,
        'api_key': str,
    },
)
_ConfigReportPushover = TypedDict(
    '_ConfigReportPushover',
    {
        'enabled': bool,
        'app': str,
        'device': str | None,
        'sound': str,
        'user': str,
        'priority': str,
    },
)
_ConfigReportRunCommand = TypedDict(
    '_ConfigReportRunCommand',
    {
        'enabled': bool,
        'command': str,
    },
)
_ConfigReportTelegram = TypedDict(
    '_ConfigReportTelegram',
    {
        'enabled': bool,
        'bot_token': str,
        'chat_id': str | int | list[str | int],
        'silent': bool,
    },
)
_ConfigReportWebhook = TypedDict(
    '_ConfigReportWebhook',
    {
        'enabled': bool,
        'markdown': bool,
        'webhook_url': str,
        'rich_text': bool | None,
        'max_message_length': int | None,
    },
)
_ConfigReportXmpp = TypedDict(
    '_ConfigReportXmpp',
    {
        'enabled': bool,
        'sender': str,
        'recipient': str,
        'insecure_password': str | None,
    },
)

_ConfigReport = TypedDict(
    '_ConfigReport',
    {
        'tz': str | None,
        'text': _ConfigReportText,
        'html': _ConfigReportHtml,
        'markdown': _ConfigReportMarkdown,
        'stdout': _ConfigReportStdout,
        'browser': _ConfigReportBrowser,
        'discord': _ConfigReportDiscord,
        'email': _ConfigReportEmail,
        'github_issue': _ConfigReportGithubIssue,
        'gotify': _ConfigReportGotify,
        'ifttt': _ConfigReportIfttt,
        'mailgun': _ConfigReportMailgun,
        'matrix': _ConfigReportMatrix,
        'prowl': _ConfigReportProwl,
        'pushbullet': _ConfigReportPushbullet,
        'pushover': _ConfigReportPushover,
        'run_command': _ConfigReportRunCommand,
        'telegram': _ConfigReportTelegram,
        'webhook': _ConfigReportWebhook,
        'xmpp': _ConfigReportXmpp,
    },
)
_ConfigJobDefaults = TypedDict(
    '_ConfigJobDefaults',
    {
        '_note': str,
        'all': dict[str, Any],
        'url': dict[str, Any],
        'browser': dict[str, Any],
        'command': dict[str, Any],
    },
    total=False,
)
_ConfigDifferDefaults = TypedDict(
    '_ConfigDifferDefaults',
    {
        '_note': str,
        'unified': dict[str, Any],
        'ai_google': dict[str, Any],
        'command': dict[str, Any],
        'deepdiff': dict[str, Any],
        'image': dict[str, Any],
        'table': dict[str, Any],
        'wdiff': dict[str, Any],
    },
    total=False,
)
_ConfigDatabase = TypedDict(
    '_ConfigDatabase',
    {
        'engine': Literal['sqlite3', 'redis', 'minidb', 'textfiles'] | str,  # noqa: PYI051
        'max_snapshots': int,
    },
)
_Config = TypedDict(
    '_Config',
    {
        'display': _ConfigDisplay,
        'report': _ConfigReport,
        'job_defaults': _ConfigJobDefaults,
        'differ_defaults': _ConfigDifferDefaults,
        'database': _ConfigDatabase,
        'footnote': str | None,
    },
)

DEFAULT_CONFIG: _Config = {
    'display': {  # select whether the report includes the categories below
        'new': True,
        'error': True,
        'unchanged': False,
        'empty-diff': False,
    },
    'report': {
        'tz': None,  # the timezone as an IANA time zone name, e.g. 'America/Los_Angeles', or null for machine's
        # the directives below are for the report content types (text, html or markdown)
        'text': {
            'details': True,  # whether the diff is sent
            'footer': True,
            'line_length': 75,
            'minimal': False,
            'separate': False,
        },
        'html': {
            'diff': 'unified',  # 'unified' or 'table'
            'footer': True,
            'separate': False,
            'title': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
        },
        'markdown': {
            'details': True,  # whether the diff is sent
            'footer': True,
            'minimal': False,
            'separate': False,
        },
        # the directives below control 'reporters', i.e. where a report is displayed and/or sent
        'stdout': {  # the console / command line display; uses text
            'enabled': True,
            'color': True,
        },
        'browser': {  # the system's default browser; uses html
            'enabled': False,
        },
        'discord': {
            'enabled': False,
            'webhook_url': '',
            'embed': True,
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
            'colored': True,
            'max_message_length': None,
        },
        'email': {  # email (except mailgun); uses text or both html and text if 'html' is set to true
            'enabled': False,
            'html': True,
            'from': '',
            'to': '',
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
            'method': 'smtp',  # either 'smtp' or 'sendmail'
            'smtp': {
                'host': 'localhost',
                'port': 587,
                'starttls': True,
                'auth': True,
                'user': '',
                'insecure_password': '',
                'utf-8': True,
            },
            'sendmail': {
                'path': 'sendmail',
            },
        },
        'github_issue': {
            'enabled': False,
            'token': '',
            'owner': '',
            'repo': '',
            'title': '',
            'labels': [],
            'format_dt': '',
            'format_content': '',
            'assignees': [],
            'type': '',
            'milestone': '',
        },
        'gotify': {  # uses markdown
            'enabled': False,
            'priority': 0,
            'server_url': '',
            'title': '',
            'token': '',
        },
        'ifttt': {  # uses text
            'enabled': False,
            'key': '',
            'event': '',
        },
        'mailgun': {  # uses text
            'enabled': False,
            'region': 'us',
            'api_key': '',
            'domain': '',
            'from_mail': '',
            'from_name': '',
            'to': '',
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
        },
        'matrix': {  # uses text
            'enabled': False,
            'homeserver': '',
            'access_token': '',
            'room_id': '',
        },
        'prowl': {  # uses text
            'enabled': False,
            'api_key': '',
            'priority': 0,
            'application': '',
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
        },
        'pushbullet': {  # uses text
            'enabled': False,
            'api_key': '',
        },
        'pushover': {  # uses text
            'enabled': False,
            'app': '',
            'user': '',
            'device': None,
            'sound': 'spacealarm',
            'priority': 'normal',
        },
        'run_command': {  # uses text
            'enabled': False,
            'command': '',
        },
        'telegram': {  # uses markdown (from version 3.7)
            'enabled': False,
            'bot_token': '',
            'chat_id': '',
            'silent': False,
        },
        'webhook': {
            'enabled': False,
            'webhook_url': '',
            'markdown': False,
            'rich_text': None,
            'max_message_length': None,
        },
        'xmpp': {  # uses text
            'enabled': False,
            'sender': '',
            'recipient': '',
            'insecure_password': '',
        },
    },
    'job_defaults': {
        '_note': 'Default directives that are applied to jobs.',
        'all': {'_note': 'These are used for all types of jobs, including those in hooks.py.'},
        'url': {'_note': "These are used for 'url' jobs without 'use_browser'."},
        'browser': {'_note': "These are used for 'url' jobs with 'use_browser: true'."},
        'command': {'_note': "These are used for 'command' jobs."},
    },
    'differ_defaults': {
        '_note': 'Default directives that are applied to individual differs.',
        'unified': {},
        'ai_google': {},
        'command': {},
        'deepdiff': {},
        'image': {},
        'table': {},
        'wdiff': {},
    },
    'database': {
        'engine': 'sqlite3',
        'max_snapshots': 4,
    },
    'footnote': None,
}
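
# Illustrative example (not in the original source): a user configuration file only
# needs the keys that differ from DEFAULT_CONFIG, because YamlConfigStorage.load()
# below deep-merges the defaults into it. The values shown (time zone, recipient)
# are hypothetical.
#
#   report:
#     tz: 'America/Los_Angeles'
#     email:
#       enabled: true
#       to: 'user@example.com'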


# Custom YAML constructor for !include
def yaml_include(loader: yaml.SafeLoader, node: yaml.Node) -> list[Any]:
    file_path = Path(loader.name).parent.joinpath(node.value)
    with file_path.open('r') as f:
        return list(yaml.safe_load_all(f))


# Add the custom constructor to the YAML loader
yaml.add_constructor('!include', yaml_include, Loader=yaml.SafeLoader)
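
# Illustrative example (not in the original source): with the constructor registered
# above, a YAML node can pull in all the documents of another file, resolved relative
# to the including file; 'more-jobs.yaml' is a hypothetical filename.
#
#   !include more-jobs.yaml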


@dataclass
class BaseStorage(ABC):  # noqa: B024 abstract base class, but it has no abstract methods or properties
    """Base class for storage."""


class BaseFileStorage(BaseStorage, ABC):
    """Base class for file storage."""

    def __init__(self, filename: str | Path) -> None:
        """:param filename: The filename or directory name of the storage."""
        if isinstance(filename, str):
            self.filename = Path(filename)
        else:
            self.filename = filename


class BaseTextualFileStorage(BaseFileStorage, ABC):
    """Base class for textual files."""

    def __init__(self, filename: str | Path) -> None:
        """:param filename: The filename or directory name of the storage."""
        super().__init__(filename)
        # if not isinstance(self, JobsBaseFileStorage):
        #     self.load()

    @abstractmethod
    def load(self, *args: Any) -> Any:  # noqa: ANN401 Dynamically typed expressions (typing.Any) are disallowed
        """Load from storage.

        :param args: Specified by the subclass.
        :return: Specified by the subclass.
        """

    @abstractmethod
    def save(self, *args: Any, **kwargs: Any) -> Any:  # noqa: ANN401 Dynamically typed expressions (typing.Any) are disallowed
        """Save to storage.

        :param args: Specified by the subclass.
        :param kwargs: Specified by the subclass.
        :return: Specified by the subclass.
        """

    @classmethod
    @abstractmethod
    def parse(cls, filename: Path) -> Any:  # noqa: ANN401 Dynamically typed expressions (typing.Any) are disallowed
        """Parse storage contents.

        :param filename: The filename.
        :return: Specified by the subclass.
        """

    def edit(self) -> int:
        """Edit file.

        :returns: 0 if edit is successful, 1 otherwise.
        """
        # Similar code to UrlwatchCommand.edit_hooks()
        logger.debug(f'Edit file {self.filename}')
        if isinstance(self.filename, list):
            if len(self.filename) > 1:
                raise ValueError(f'Only one jobs file can be specified for editing; found {len(self.filename)}.')
            filename = self.filename[0]
        else:
            filename = self.filename
        file_edit = filename.with_stem(filename.stem + '_edit')

        if filename.is_file():
            shutil.copy(filename, file_edit)
        # elif example_file is not None and Path(example_file).is_file():
        #     shutil.copy(example_file, file_edit, follow_symlinks=False)

        while True:
            try:
                edit_file(file_edit)
                # Check if we can still parse it
                if self.parse is not None:
                    self.parse(file_edit)
                break  # stop if no exception on parser
            except SystemExit:
                raise
            except Exception as e:  # noqa: BLE001 Do not catch blind exception: `Exception`
                print()
                print('Errors in updating file:')
                print('======')
                print(e)
                print('======')
                print()
                print(f'The file {filename} was NOT updated.')
                user_input = input('Do you want to retry the same edit? [Y/n] ')
                if not user_input or user_input.lower().startswith('y'):
                    continue
                file_edit.unlink()
                print('No changes have been saved.')
                return 1

        if filename.is_symlink():
            filename.write_text(file_edit.read_text())
        else:
            file_edit.replace(filename)
        file_edit.unlink(missing_ok=True)
        print('Saved edits in', filename)
        return 0


class JobsBaseFileStorage(BaseTextualFileStorage, ABC):
    """Class for jobs textual files storage."""

    filename: list[Path]

    def __init__(self, filename: list[Path]) -> None:
        """Class for jobs textual files storage.

        :param filename: The filenames of the jobs files.
        """
        super().__init__(filename)  # type: ignore[arg-type]
        self.filename = filename

    def load_secure(self) -> list[JobBase]:
        """Load the jobs from a text file checking that the file is secure (i.e. belongs to the current UID and only
        the owner can write to it - Linux only).

        :return: List of JobBase objects.
        """
        jobs: list[JobBase] = self.load()

        def is_shell_job(job: JobBase) -> bool:
            """Check if the job uses filter 'shellpipe' or an external differ, as they call
            subprocess.run(shell=True) (insecure).

            :returns: True if subprocess.run(shell=True) is invoked by job, False otherwise.
            """
            if isinstance(job, ShellJob):
                return True

            for filter_kind, _ in FilterBase.normalize_filter_list(job.filters, job.index_number):
                if filter_kind == 'shellpipe':
                    return True

            # The differ check is independent of the filters, so it runs even when the job has none
            if job.differ and job.differ.get('name') == 'command':
                return True

            return False

        shelljob_errors = []
        for file in self.filename:
            shelljob_errors.extend(file_ownership_checks(file))
        removed_jobs = [job for job in jobs if is_shell_job(job)]  # a list, as it is iterated over more than once
        if shelljob_errors and removed_jobs:
            print(
                f'ERROR: Removing the following jobs because'
                f' {" and ".join(shelljob_errors)}: {", ".join(str(job.index_number) for job in removed_jobs)}\n'
                f'(see {__docs_url__}en/stable/jobs.html#important-note-for-command-jobs).'
            )
            jobs = [job for job in jobs if job not in removed_jobs]

        logger.info(f'Loaded {len(jobs)} jobs from {", ".join(str(file) for file in self.filename)}.')
        return jobs


class BaseYamlFileStorage(BaseTextualFileStorage, ABC):
    """Base class for YAML textual files storage."""

    @classmethod
    def parse(cls, filename: Path) -> Any:  # noqa: ANN401 Dynamically typed expressions (typing.Any) are disallowed
        """Return the contents of a YAML file if it exists.

        :param filename: The filename Path.
        :return: Specified by the subclass.
        """
        if filename is not None and filename.is_file():
            with filename.open() as fp:
                return yaml.safe_load(fp)
        return None


class YamlConfigStorage(BaseYamlFileStorage):
    """Class for configuration file (is a YAML textual file)."""

    config: _Config = {}

    @staticmethod
    def dict_deep_difference(d1: _Config, d2: _Config, ignore_underline_keys: bool = False) -> _Config:
        """Recursively find elements in the first dict that are not in the second.

        :param d1: The first dict.
        :param d2: The second dict.
        :param ignore_underline_keys: If true, keys starting with _ are ignored (treated as remarks).
        :return: A dict with all the elements of the first dict that are not in the second.
        """

        def _sub_dict_deep_difference(d1_: _Config, d2_: _Config) -> _Config:
            """Recursive sub-function to find elements in the first dict that are not in the second.

            :param d1_: The first dict.
            :param d2_: The second dict.
            :return: A dict with elements of the first dict that are not in the second.
            """
            for key, value in d1_.copy().items():
                if ignore_underline_keys and key.startswith('_'):
                    d1_.pop(key, None)
                elif isinstance(value, dict) and isinstance(d2_.get(key), dict):
                    _sub_dict_deep_difference(value, d2_[key])
                    if not len(value):
                        d1_.pop(key)
                elif key in d2_:
                    d1_.pop(key)
            return d1_

        return _sub_dict_deep_difference(copy.deepcopy(d1), d2)
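
    # Illustrative example (not in the original source) of the difference semantics:
    #
    #   YamlConfigStorage.dict_deep_difference({'a': {'b': 1, 'c': 2}}, {'a': {'b': 1}})
    #   -> {'a': {'c': 2}}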

    @staticmethod
    def dict_deep_merge(source: _Config, destination: _Config) -> _Config:
        """Recursively deep merges source dict into destination dict.

        :param source: The first dict.
        :param destination: The second dict.
        :return: The deep merged dict.
        """
        # https://stackoverflow.com/a/20666342

        def _sub_dict_deep_merge(source_: _Config, destination_: _Config) -> _Config:
            """Recursive sub-function to merge source_ dict into destination_ dict.

            :param source_: The first dict.
            :param destination_: The second dict.
            :return: The merged dict.
            """
            for key, value in source_.items():
                if isinstance(value, dict):
                    # get node or create one
                    node = destination_.setdefault(key, {})
                    _sub_dict_deep_merge(value, node)
                else:
                    destination_[key] = value

            return destination_

        return _sub_dict_deep_merge(source, copy.deepcopy(destination))
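
    # Illustrative example (not in the original source): values in 'source' win, and
    # keys only in 'destination' are preserved:
    #
    #   YamlConfigStorage.dict_deep_merge({'a': {'b': 2}}, {'a': {'b': 1, 'c': 3}})
    #   -> {'a': {'b': 2, 'c': 3}}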

    def check_for_unrecognized_keys(self, config: _Config) -> None:
        """Check whether config has keys not in DEFAULT_CONFIG (bad keys, e.g. typos); if so, issue a RuntimeWarning.

        :param config: The configuration.
        """
        config_for_extras = copy.deepcopy(config)
        if 'job_defaults' in config_for_extras:
            # Create missing 'job_defaults' keys from DEFAULT_CONFIG
            for key in DEFAULT_CONFIG['job_defaults']:
                if 'job_defaults' not in config_for_extras:
                    config_for_extras['job_defaults'] = {}
                config_for_extras['job_defaults'][key] = None
            for key in DEFAULT_CONFIG['differ_defaults']:
                if 'differ_defaults' not in config_for_extras:
                    config_for_extras['differ_defaults'] = {}
                config_for_extras['differ_defaults'][key] = None
        if 'hooks' in sys.modules:
            # Remove extra keys in config used in hooks (they are not in DEFAULT_CONFIG)
            for _, obj in inspect.getmembers(
                sys.modules['hooks'], lambda x: inspect.isclass(x) and x.__module__ == 'hooks'
            ):
                if issubclass(obj, JobBase):
                    if obj.__kind__ not in DEFAULT_CONFIG['job_defaults']:
                        config_for_extras['job_defaults'].pop(obj.__kind__, None)
                elif issubclass(obj, ReporterBase) and obj.__kind__ not in DEFAULT_CONFIG['report']:
                    config_for_extras['report'].pop(obj.__kind__, None)
        if 'slack' in config_for_extras.get('report', {}):
            # Ignore legacy key
            config_for_extras['report'].pop('slack')
        extras: _Config = self.dict_deep_difference(config_for_extras, DEFAULT_CONFIG, ignore_underline_keys=True)
        if not extras.get('report'):
            extras.pop('report', None)
        if extras:
            warnings.warn(
                f'Found unrecognized directive(s) in the configuration file {self.filename}:\n'
                f'{yaml.safe_dump(extras)}Check for typos or the hooks.py file (if any); documentation is at '
                f'{__docs_url__}\n',
                RuntimeWarning,
                stacklevel=1,
            )

    @staticmethod
    def replace_none_keys(config: _Config) -> None:
        """Fixes None keys in loaded config that should be empty dicts instead."""
        if 'job_defaults' not in config:
            config['job_defaults'] = DEFAULT_CONFIG['job_defaults']
        else:
            if 'shell' in config['job_defaults']:
                if 'command' in config['job_defaults']:
                    raise KeyError(
                        "Found both 'shell' and 'command' job_defaults in config, a duplicate. Please remove the "
                        "'shell' entry."
                    )
                config['job_defaults']['command'] = config['job_defaults'].pop('shell')
            for key in ('all', 'url', 'browser', 'command'):
                if key not in config['job_defaults'] or config['job_defaults'][key] is None:
                    config['job_defaults'][key] = {}

    def load(self, *args: Any) -> None:
        """Load configuration file from self.filename into self.config adding missing keys from DEFAULT_CONFIG.

        :param args: None used.
        """
        logger.debug(f'Loading configuration from {self.filename}')
        config: _Config = self.parse(self.filename)

        if config:
            self.replace_none_keys(config)
            self.check_for_unrecognized_keys(config)

            # If config is missing keys in DEFAULT_CONFIG, log the missing keys and deep merge DEFAULT_CONFIG
            missing = self.dict_deep_difference(DEFAULT_CONFIG, config, ignore_underline_keys=True)
            if missing:
                logger.info(
                    f'The configuration file {self.filename} is missing directive(s); using default values for those. '
                    'Run with -vv for more details.'
                )
                logger.debug(
                    f'The following default values are being used:\n'
                    f'{yaml.safe_dump(missing)}'
                    f'See documentation at {__docs_url__}en/stable/configuration.html'
                )
                config = self.dict_deep_merge(config or {}, DEFAULT_CONFIG)

            # format headers
            for job_defaults_type in ('all', 'url', 'browser'):
                if 'headers' in config['job_defaults'][job_defaults_type]:
                    config['job_defaults'][job_defaults_type]['headers'] = Headers(
                        {k: str(v) for k, v in config['job_defaults'][job_defaults_type]['headers'].items()},
                        encoding='utf-8',
                    )
                if 'cookies' in config['job_defaults'][job_defaults_type]:
                    config['job_defaults'][job_defaults_type]['cookies'] = {
                        k: str(v) for k, v in config['job_defaults'][job_defaults_type]['cookies'].items()
                    }
            logger.info(f'Loaded configuration from {self.filename}')

        else:
            logger.warning(f'No directives found in the configuration file {self.filename}; using default directives.')
            config = DEFAULT_CONFIG

        self.config = config

    def save(self, *args: Any, **kwargs: Any) -> None:
        """Save self.config into self.filename using YAML.

        :param args: None used.
        :param kwargs: None used.
        """
        with self.filename.open('w') as fp:
            fp.write(
                f'# {__project_name__} configuration file. See {__docs_url__}en/stable/configuration.html\n'
                f'# Originally written on {datetime.now(timezone.utc).replace(microsecond=0).isoformat()}Z by version'
                f' {__version__}.\n'
                f'\n'
            )
            yaml.safe_dump(self.config, fp, allow_unicode=True, sort_keys=False)

    @classmethod
    def write_default_config(cls, filename: Path) -> None:
        """Write default configuration to file.

        :param filename: The filename.
        """
        config_storage = cls(filename)
        config_storage.config = DEFAULT_CONFIG
        config_storage.save()


class YamlJobsStorage(BaseYamlFileStorage, JobsBaseFileStorage):
    """Class for jobs file (is a YAML textual file)."""

    @classmethod
    def _parse(cls, fp: TextIO, filenames: list[Path]) -> list[JobBase]:
        """Parse the contents of a jobs YAML file.

        :param fp: The text stream to parse.
        :param filenames: The path(s) of the jobs file(s), used to build error messages.
        :return: A list of JobBase objects.
        :raise yaml.YAMLError: If a YAML error is found in the file.
        :raise ValueError: If a duplicate URL/command is found in the list.
        """

        def job_files_for_error() -> list[str]:
            """:return: A list of lines containing the names of the job files."""
            if len(filenames) > 1:
                jobs_files = ['in the concatenation of the jobs files:'] + [f'• {file},' for file in filenames]
            elif len(filenames) == 1:
                jobs_files = [f'in jobs file {filenames[0]}.']
            else:
                jobs_files = []
            return jobs_files

        jobs = []
        jobs_by_guid = defaultdict(list)
        try:
            for i, job_data in enumerate((job for job in yaml.safe_load_all(fp) if job)):
                if not isinstance(job_data, dict):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f'Found invalid job data (consisting of the {type(job_data).__name__} {job_data})',
                                *job_files_for_error(),
                            ]
                        )
                    )
                job_data['index_number'] = i + 1
                job = JobBase.unserialize(job_data, filenames)
                # TODO: Implement 100% validation and remove it from jobs.py
                # TODO: Try using pydantic to do this.
                if not isinstance(job.data, (NoneType, str, dict, list)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'data' key needs to contain a string, a dictionary or a list; found a"
                                f' {type(job.data).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.filters, (NoneType, list)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'filter' key needs to contain a list; found a {type(job.filters).__name__} ",
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.headers, (NoneType, dict, Headers)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'headers' key needs to contain a dictionary; found a "
                                f'{type(job.headers).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.cookies, (NoneType, dict)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'cookies' key needs to contain a dictionary; found a "
                                f'{type(job.cookies).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.switches, (NoneType, str, list)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'switches' key needs to contain a string or a list; found a "
                                f'{type(job.switches).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                # We add GUID here to speed things up and to allow hooks to programmatically change job.url and/or
                # job.user_visible_url
                jobs.append(job)
                jobs_by_guid[job.guid].append(job)
        except yaml.scanner.ScannerError as e:
            raise ValueError(
                '\n   '.join(
                    [
                        f'YAML parser {e.args[2].replace("here", "")} in line {e.args[3].line + 1}, column'
                        f' {e.args[3].column + 1}',
                        *job_files_for_error(),
                    ]
                )
            ) from None

        conflicting_jobs = [guid_jobs[0].get_location() for _, guid_jobs in jobs_by_guid.items() if len(guid_jobs) != 1]
        if conflicting_jobs:
            raise ValueError(
                '\n   '.join(
                    ['Each job must have a unique URL/command (for URLs, append #1, #2, etc. to make them unique):']
                    + [f'• {job}' for job in conflicting_jobs]
                    + ['']
                    + job_files_for_error()
                )
            ) from None

        return jobs

    @classmethod
    def parse(cls, filename: Path) -> list[JobBase]:
        """Parse the contents of a jobs YAML file and return a list of jobs.

        :param filename: The filename Path.
        :return: A list of JobBase objects.
        """
        if filename is not None and filename.is_file():
            with filename.open() as fp:
                return cls._parse(fp, [filename])
        return []

    def load(self, *args: Any) -> list[JobBase]:
        """Parse the contents of the jobs YAML file(s) and return a list of jobs.

        :return: A list of JobBase objects.
        """
        if len(self.filename) == 1:
            with self.filename[0].open() as f:
                return self._parse(f, self.filename)
        else:
            fp = io.StringIO('\n---\n'.join(f.read_text(encoding='utf-8-sig') for f in self.filename if f.is_file()))
            return self._parse(fp, self.filename)

    def save(self, jobs: Iterable[JobBase]) -> None:
        """Save jobs to the job YAML file.

        :param jobs: An iterable of JobBase objects to be written.
        """
        print(f'Saving updated list to {self.filename[0]}.')

        with self.filename[0].open('w') as fp:
            yaml.safe_dump_all([job.serialize() for job in jobs], fp, allow_unicode=True, sort_keys=False)
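
    # Illustrative example (not in the original source) of a jobs file accepted by
    # _parse(); per the uniqueness check above, identical URLs must be disambiguated,
    # e.g. with a '#1' / '#2' suffix. The URLs are hypothetical.
    #
    #   name: Example page
    #   url: https://example.net/page.html#1
    #   ---
    #   name: Example page, again
    #   url: https://example.net/page.html#2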


class SsdbStorage(BaseFileStorage, ABC):
    """Base class for snapshots storage."""

    @abstractmethod
    def close(self) -> None:
        pass

    @abstractmethod
    def get_guids(self) -> list[str]:
        pass

    @abstractmethod
    def load(self, guid: str) -> Snapshot:
        pass

    @abstractmethod
    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        pass

    @abstractmethod
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        pass

    @abstractmethod
    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
        pass

    @abstractmethod
    def delete(self, guid: str) -> None:
        pass

    @abstractmethod
    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
        """For the given 'guid', delete only the latest 'delete_entries' entries and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete.

        :returns: Number of records deleted.
        """

    @abstractmethod
    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :returns: Number of records deleted.
        """

    @abstractmethod
    def clean(self, guid: str, keep_entries: int = 1) -> int:
        """Removes the entries for guid except the latest n keep_entries.

        :param guid: The guid.
        :param keep_entries: The number of most recent entries to keep.

        :returns: Number of records deleted.
        """

    @abstractmethod
    def move(self, guid: str, new_guid: str) -> int:
        """Replace uuid in records matching the 'guid' with the 'new_guid' value.

        If there are existing records with 'new_guid', they will not be overwritten and the job histories will be
        merged.

        :returns: Number of records searched for replacement.
        """

    @abstractmethod
    def rollback(self, timestamp: float) -> int | None:
        """Rolls back the database to timestamp.

        :param timestamp: The timestamp.

        :returns: Number of records deleted.
        :raises: NotImplementedError for those classes where this method is not implemented.
        """

    def backup(self) -> Iterator[tuple[str, str | bytes, float, int, str, str, ErrorData]]:
        """Return the most recent entry for each 'guid'.

        :returns: A generator of tuples, each consisting of (guid, data, timestamp, tries, etag, mime_type,
           error_data).
        """
        for guid in self.get_guids():
            data, timestamp, tries, etag, mime_type, error_data = self.load(guid)
            yield guid, data, timestamp, tries, etag, mime_type, error_data

    def restore(self, entries: Iterable[tuple[str, str | bytes, float, int, str, str, ErrorData]]) -> None:
        """Save multiple entries into the database.

        :param entries: An iterable of tuples, where each consists of (guid, data, timestamp, tries, etag, mime_type,
           error_data).
        """
        for guid, data, timestamp, tries, etag, mime_type, error_data in entries:
            new_snapshot = Snapshot(
                data=data, timestamp=timestamp, tries=tries, etag=etag, mime_type=mime_type, error_data=error_data
            )
            self.save(guid=guid, snapshot=new_snapshot, temporary=False)
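
    # Illustrative round trip (not in the original source), e.g. when migrating between
    # engines; 'old_storage' and 'new_storage' are hypothetical SsdbStorage instances:
    #
    #   entries = list(old_storage.backup())
    #   new_storage.restore(entries)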

    def gc(self, known_guids: Iterable[str], keep_entries: int = 1) -> None:
        """Garbage collect the database: delete all guids not included in known_guids and keep only the last n
        snapshots for the others.

        :param known_guids: The guids to keep.
        :param keep_entries: Number of entries to keep after deletion for the guids to keep.
        """
        for guid in set(self.get_guids()) - set(known_guids):
            print(f'Deleting job {guid} (no longer being tracked).')
            self.delete(guid)
        self.clean_ssdb(known_guids, keep_entries)

    def clean_ssdb(self, known_guids: Iterable[str], keep_entries: int = 1) -> None:
        """Convenience function to clean the cache.

        If self.clean_all is present, runs clean_all(). Otherwise, runs clean() on all known_guids, one at a time.
        Prints the number of snapshots deleted.

        :param known_guids: An iterable of guids.
        :param keep_entries: Number of entries to keep after deletion.
        """
        if hasattr(self, 'clean_all'):
            count = self.clean_all(keep_entries)  # ty:ignore[call-non-callable]
            if count:
                print(f'Deleted {count} old snapshots.')
        else:
            for guid in known_guids:
                count = self.clean(guid, keep_entries)
                if count:
                    print(f'Deleted {count} old snapshots of {guid}.')

    @abstractmethod
    def flushdb(self) -> None:
        """Delete all entries of the database. Use with care, there is no undo!"""


class SsdbDirStorage(SsdbStorage):
    """Class for snapshots stored as individual textual files in a directory 'dirname'."""

    def __init__(self, dirname: str | Path) -> None:
        super().__init__(dirname)
        self.filename.mkdir(parents=True, exist_ok=True)  # using the attr filename because it is a Path (confusing!)
        logger.info(f'Using directory {self.filename} to store snapshot data as individual text files')

    def close(self) -> None:
        # Nothing to close
        return

    def _get_filename(self, guid: str) -> Path:
        return self.filename.joinpath(guid)  # filename is a dir (confusing!)

    def get_guids(self) -> list[str]:
        return [filename.name for filename in self.filename.iterdir()]

    def load(self, guid: str) -> Snapshot:
        filename = self._get_filename(guid)
        if not filename.is_file():
            return Snapshot('', 0, 0, '', '', {})

        try:
            data = filename.read_text()
        except UnicodeDecodeError:
            data = filename.read_text(errors='ignore')
            logger.warning(f'Found and ignored Unicode-related errors when retrieving saved snapshot {guid}')

        timestamp = filename.stat().st_mtime

        return Snapshot(data, timestamp, 0, '', '', {})

    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        if count is not None and count < 1:
            return {}
        snapshot = self.load(guid)
        return {snapshot.data: snapshot.timestamp} if snapshot.data and snapshot.timestamp else {}

    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        if count is not None and count < 1:
            return []
        snapshot = self.load(guid)
        return [snapshot] if snapshot.data and snapshot.timestamp else []

    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
        # ETag and mime_type are ignored
        filename = self._get_filename(guid)
        with filename.open('w+') as fp:
            fp.write(str(snapshot.data))
        os.utime(filename, times=(datetime.now().timestamp(), snapshot.timestamp))  # noqa: DTZ005

    def delete(self, guid: str) -> None:
        filename = self._get_filename(guid)
        filename.unlink(missing_ok=True)

    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
        """For the given 'guid', delete the latest entry and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete.

        :raises NotImplementedError: This function is not implemented for 'textfiles' databases.
        """
        raise NotImplementedError(
            "Deleting of latest snapshot not supported by 'textfiles' database engine since only one snapshot is "
            "saved. Delete all snapshots if that's what you are trying to do."
        )

    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :raises NotImplementedError: This function is not implemented for 'textfiles' databases.
        """
        raise NotImplementedError("Deleting all snapshots is not supported by the 'textfiles' database engine.")

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        if keep_entries != 1:
            raise NotImplementedError('Only keeping latest 1 entry is supported.')
        # We only store the latest version, no need to clean
        return 0

    def move(self, guid: str, new_guid: str) -> int:
        """Moves the data from guid to new_guid.

        :param guid: The guid.
        :param new_guid: The new guid.

        :returns: Number of records moved.
        """
        if guid == new_guid:
            return 0
        old_filepath = Path(self._get_filename(guid))
        new_filepath = Path(self._get_filename(new_guid))
        if old_filepath.exists():
            new_filepath.parent.mkdir(parents=True, exist_ok=True)
            old_filepath.rename(new_filepath)
        else:
            raise ValueError(f'Old snapshot file {old_filepath} does not exist')
        return 1

    def rollback(self, timestamp: float) -> None:
        raise NotImplementedError("'textfiles' databases cannot be rolled back as new snapshots overwrite old ones")

    def flushdb(self) -> None:
        for file in self.filename.iterdir():
            if file.is_file():
                file.unlink()


class SsdbSQLite3Storage(SsdbStorage):
    """Handles storage of snapshots as a SQLite database in the 'filename' file using Python's built-in sqlite3
    module and the msgpack package.

    A temporary database is created by __init__ and will be written to by the 'save()' function (unless
    temporary=False). This data will be written to the permanent one by the 'close()' function, which is called at
    the end of program execution.

    The database contains the 'webchanges' table with the following columns:

    * guid: unique hash of the "location", i.e. the URL/command; indexed
    * timestamp: the Unix timestamp of when the snapshot was taken; indexed
    * msgpack_data: a msgpack blob containing 'data', 'tries', 'etag' and 'mime_type' in a dict of keys 'd', 't',
      'e' and 'm'
    """
1305

1306
    def __init__(self, filename: Path, max_snapshots: int = 4) -> None:
        """:param filename: The full filename of the database file.
        :param max_snapshots: The maximum number of snapshots to retain in the database for each 'guid'.
        """
        # Opens the database file and, if new, creates a table and index.

        self.max_snapshots = max_snapshots

        logger.debug(f'Run-time SQLite library: {sqlite3.sqlite_version}')
        super().__init__(filename)

        self.filename.parent.mkdir(parents=True, exist_ok=True)

        # https://stackoverflow.com/questions/26629080
        self.lock = threading.RLock()

        self.db = sqlite3.connect(filename, check_same_thread=False)
        logger.info(f'Using sqlite3 {sqlite3.sqlite_version} database at {filename}')
        self.cur = self.db.cursor()
        self.cur.execute('PRAGMA temp_store = MEMORY;')
        tables = self._execute("SELECT name FROM sqlite_master WHERE type='table';").fetchone()

        def _initialize_table() -> None:
            logger.debug('Initializing sqlite3 database')
            self._execute('CREATE TABLE webchanges (uuid TEXT, timestamp REAL, msgpack_data BLOB)')
            self._execute('CREATE INDEX idx_uuid_time ON webchanges(uuid, timestamp)')
            self.db.commit()

        if tables == ('CacheEntry',):
            logger.info("Found legacy 'minidb' database to convert")

            # Found a minidb legacy database; close it, rename it for migration, and create a new sqlite3 one
            import importlib.util

            if importlib.util.find_spec('minidb') is None:
                print('You have an old snapshot database format that needs to be converted to a current one.')
                print(
                    f"Please install the Python package 'minidb' for this one-time conversion and rerun "
                    f'{__project_name__}.'
                )
                print('Use e.g. `pip install -U minidb`.')
                print()
                print("After the conversion, you can uninstall 'minidb' with e.g. `pip uninstall minidb`.")
                sys.exit(1)

            print('Performing one-time conversion from old snapshot database format.')
            self.db.close()
            minidb_filename = filename.with_stem(filename.stem + '_minidb')
            self.filename.replace(minidb_filename)
            self.db = sqlite3.connect(filename, check_same_thread=False)
            self.cur = self.db.cursor()
            _initialize_table()
            # Migrate the minidb legacy database renamed above
            self.migrate_from_minidb(minidb_filename)
        elif tables != ('webchanges',):
            _initialize_table()

        # Create a temporary database (an anonymous, connection-private one) for writing during execution
        # (fault tolerance)
        logger.debug('Creating temp sqlite3 database file in memory')
        self.temp_lock = threading.RLock()
        self.temp_db = sqlite3.connect('', check_same_thread=False)
        self.temp_cur = self.temp_db.cursor()
        self._temp_execute('CREATE TABLE webchanges (uuid TEXT, timestamp REAL, msgpack_data BLOB)')
        self.temp_db.commit()

    def _execute(self, sql: str, args: tuple | None = None) -> sqlite3.Cursor:
        """Execute SQL command on the main database."""
        if args is None:
            logger.debug(f"Executing (perm) '{sql}'")
            return self.cur.execute(sql)
        logger.debug(f"Executing (perm) '{sql}' with {args}")
        return self.cur.execute(sql, args)

    def _temp_execute(self, sql: str, args: tuple | None = None) -> sqlite3.Cursor:
        """Execute SQL command on the temp database."""
        if args is None:
            logger.debug(f"Executing (temp) '{sql}'")
            return self.temp_cur.execute(sql)
        logger.debug(f"Executing (temp) '{sql}' with {args[:2]}...")
        return self.temp_cur.execute(sql, args)

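    # Both helpers pass 'args' through as sqlite3 query parameters. A minimal sketch of the call pattern
    # (the guid value is hypothetical):
    #
    #   self._execute('SELECT COUNT(*) FROM webchanges WHERE uuid = ?', ('6e4f0b7a',)).fetchone()[0]
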
    def _copy_temp_to_permanent(self, delete: bool = False) -> None:
        """Copy the contents of the temporary database to the permanent one.

        :param delete: Also delete the contents of the temporary database (used for testing).
        """
        logger.debug('Saving new snapshots to permanent sqlite3 database')
        # with self.temp_lock:
        #     self.temp_db.commit()
        # with self.lock:
        #     self._execute('ATTACH DATABASE ? AS temp_db', (str(self.temp_filename),))
        #     self._execute('INSERT INTO webchanges SELECT * FROM temp_db.webchanges')
        #     logger.debug(f'Wrote {self.cur.rowcount} new snapshots to permanent sqlite3 database')
        #     self.db.commit()
        #     self._execute('DETACH DATABASE temp_db')
        with self.temp_lock:
            with self.lock:
                for row in self._temp_execute('SELECT * FROM webchanges').fetchall():
                    self._execute('INSERT INTO webchanges VALUES (?, ?, ?)', row)
                self.db.commit()
            if delete:
                self._temp_execute('DELETE FROM webchanges')

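    # Note on the commented-out alternative above: 'ATTACH DATABASE' needs a named database file, and it references
    # a 'self.temp_filename' attribute that does not appear to be defined by this class; the temporary database
    # here is anonymous, so the rows are copied with plain INSERTs instead.
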
    def close(self) -> None:
        """Writes the temporary database to the permanent one, purges old entries if required, and closes all database
        connections.
        """
        self._copy_temp_to_permanent()
        with self.temp_lock:
            self.temp_db.close()
            logger.debug('Cleaning up the permanent sqlite3 database and closing the connection')
        with self.lock:
            if self.max_snapshots:
                num_del = self.keep_latest(self.max_snapshots)
                logger.debug(
                    f'Keeping no more than {self.max_snapshots} snapshots per job: purged {num_del} older entries'
                )
            else:
                self.db.commit()
            self.db.close()
            logger.info(f'Closed main sqlite3 database file {self.filename}')
        del self.temp_cur
        del self.temp_db
        del self.temp_lock
        del self.cur
        del self.db
        del self.lock

    def get_guids(self) -> list[str]:
        """Lists the unique 'guid's contained in the database.

        :returns: A list of guids.
        """
        with self.lock:
            self.cur.row_factory = lambda cursor, row: row[0]
            guids = self._execute('SELECT DISTINCT uuid FROM webchanges').fetchall()
            self.cur.row_factory = None
        return guids

    def load(self, guid: str) -> Snapshot:
        """Return the most recent entry matching a 'guid'.

        :param guid: The guid.

        :returns: A Snapshot tuple (data, timestamp, tries, etag, mime_type, error_data)
            WHERE

            - data is the data;
            - timestamp is the timestamp;
            - tries is the number of tries;
            - etag is the ETag;
            - mime_type is the MIME type of the data;
            - error_data is the error information (if any).
        """
        with self.lock:
            row = self._execute(
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC LIMIT 1',
                (guid,),
            ).fetchone()
        if row:
            msgpack_data, timestamp = row
            r = msgpack.unpackb(msgpack_data)
            return Snapshot(r['d'], timestamp, r['t'], r['e'], r.get('m', ''), r.get('err', {}))

        return Snapshot('', 0, 0, '', '', {})

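    # A minimal usage sketch ('storage' and the guid are hypothetical; an empty Snapshot is returned when the guid
    # is unknown):
    #
    #   snapshot = storage.load('6e4f0b7a')
    #   if snapshot.timestamp:
    #       print(f'Last seen {snapshot.timestamp}: {len(snapshot.data)} bytes of {snapshot.mime_type}')
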
    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        """Return max 'count' (None = all) records of data and timestamp of **successful** runs for a 'guid'.

        :param guid: The guid.
        :param count: The maximum number of entries to return; if None return all.

        :returns: A dict (key: value)
            WHERE

            - key is the snapshot data;
            - value is the most recent timestamp for such snapshot.
        """
        if count is not None and count < 1:
            return {}

        with self.lock:
            rows = self._execute(
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC', (guid,)
            ).fetchall()
        history = {}
        if rows:
            for msgpack_data, timestamp in rows:
                r = msgpack.unpackb(msgpack_data)
                if not r['t'] and r['d'] not in history:
                    history[r['d']] = timestamp
                    if count is not None and len(history) >= count:
                        break
        return history

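    # Shape of the returned mapping (hypothetical values): newest first, with error runs (tries > 0) skipped:
    #
    #   {'<html>new</html>': 1738329840.0, '<html>old</html>': 1738243440.0}
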
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        """Return max 'count' (None = all) entries of all data (including from error runs) saved for a 'guid'.

        :param guid: The guid.
        :param count: The maximum number of entries to return; if None return all.

        :returns: A list of Snapshot tuples (data, timestamp, tries, etag, mime_type, error_data).
            WHERE the values are:

            - data: The data (str, could be empty);
            - timestamp: The timestamp (float);
            - tries: The number of tries (int);
            - etag: The ETag (str, could be empty);
            - mime_type: The MIME type of the data (str, could be empty);
            - error_data: The error information (could be empty).
        """
        if count is not None and count < 1:
            return []

        with self.lock:
            rows = self._execute(
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC', (guid,)
            ).fetchall()
        history: list[Snapshot] = []
        if rows:
            for msgpack_data, timestamp in rows:
                r = msgpack.unpackb(msgpack_data)
                history.append(Snapshot(r['d'], timestamp, r['t'], r['e'], r.get('m', ''), r.get('err', {})))
                if count is not None and len(history) >= count:
                    break
        return history

    def save(
        self,
        *args: Any,
        guid: str,
        snapshot: Snapshot,
        temporary: bool | None = True,
        **kwargs: Any,
    ) -> None:
        """Save the data from a job.

        By default, it is saved into the temporary database. Call close() to transfer the contents of the temporary
        database to the permanent one.

        Note: the logic is such that any attempt that ends in an exception will have tries >= 1, and its data is
        replaced with the data from the most recent successful attempt.

        :param guid: The guid.
        :param snapshot: The Snapshot tuple with the data, timestamp, number of tries, ETag (could be empty string),
            MIME type, and error data.
        :param temporary: If true, save to the temporary database (default).
        """
        c = {
            'd': snapshot.data,
            't': snapshot.tries,
            'e': snapshot.etag,
            'm': snapshot.mime_type,
            'err': snapshot.error_data,
        }
        msgpack_data = msgpack.packb(c)
        if temporary:
            with self.temp_lock:
                self._temp_execute('INSERT INTO webchanges VALUES (?, ?, ?)', (guid, snapshot.timestamp, msgpack_data))
                # we do not commit to temporary as it's being used as write-only (we commit at the end)
        else:
            with self.lock:
                self._execute('INSERT INTO webchanges VALUES (?, ?, ?)', (guid, snapshot.timestamp, msgpack_data))
                self.db.commit()

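    # A minimal end-to-end sketch (all values hypothetical; field names inferred from the attribute access above):
    #
    #   snapshot = Snapshot(
    #       data='<html>...</html>',
    #       timestamp=datetime.now(timezone.utc).timestamp(),
    #       tries=0,
    #       etag='',
    #       mime_type='text/html',
    #       error_data={},
    #   )
    #   storage.save(guid='6e4f0b7a', snapshot=snapshot)
    #   storage.close()  # copies the temporary database into the permanent one
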
    def delete(self, guid: str) -> None:
        """Delete all entries matching a 'guid'.

        :param guid: The guid.
        """
        with self.lock:
            self._execute('DELETE FROM webchanges WHERE uuid = ?', (guid,))
            self.db.commit()

    def delete_latest(
        self,
        guid: str,
        delete_entries: int = 1,
        temporary: bool | None = False,
        **kwargs: Any,
    ) -> int:
        """For the given 'guid', delete the latest 'delete_entries' number of entries and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete.
        :param temporary: If true, delete from the temporary database; if false (default), from the permanent one.

        :returns: Number of records deleted.
        """
        if temporary:
            with self.temp_lock:
                self._temp_execute(
                    'DELETE FROM webchanges '
                    'WHERE ROWID IN ( '
                    '    SELECT ROWID FROM webchanges '
                    '    WHERE uuid = ? '
                    '    ORDER BY timestamp DESC '
                    '    LIMIT ? '
                    ')',
                    (guid, delete_entries),
                )
                # changes() must be queried on the same connection that ran the DELETE (i.e. the temporary one)
                num_del: int = self._temp_execute('SELECT changes()').fetchone()[0]
        else:
            with self.lock:
                self._execute(
                    'DELETE FROM webchanges '
                    'WHERE ROWID IN ( '
                    '    SELECT ROWID FROM webchanges '
                    '    WHERE uuid = ? '
                    '    ORDER BY timestamp DESC '
                    '    LIMIT ? '
                    ')',
                    (guid, delete_entries),
                )
                num_del = self._execute('SELECT changes()').fetchone()[0]
                self.db.commit()
        return num_del

    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :returns: Number of records deleted.
        """
        with self.lock:
            self._execute('DELETE FROM webchanges')
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()

        return num_del

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        """For the given 'guid', keep only the latest 'keep_entries' number of entries and delete all other (older)
        ones. To delete older entries from all guids, use clean_all() instead.

        :param guid: The guid.
        :param keep_entries: Number of entries to keep after deletion.

        :returns: Number of records deleted.
        """
        with self.lock:
            # 'LIMIT -1 OFFSET ?' selects every row after the newest 'keep_entries' ones (-1 means no limit)
            self._execute(
                'DELETE FROM webchanges '
                'WHERE ROWID IN ( '
                '    SELECT ROWID FROM webchanges '
                '    WHERE uuid = ? '
                '    ORDER BY timestamp DESC '
                '    LIMIT -1 '
                '    OFFSET ? '
                ') ',
                (guid, keep_entries),
            )
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()
            self._execute('VACUUM')
        return num_del

    def move(self, guid: str, new_guid: str) -> int:
        """Replace uuid in records matching the 'guid' with the 'new_guid' value.

        If there are existing records with 'new_guid', they will not be overwritten and the job histories will be
        merged.

        :returns: Number of records searched for replacement.
        """
        total_searched = 0
        if guid != new_guid:
            with self.lock:
                # REPLACE() is applied to every row, so changes() reports the number of rows searched
                self._execute(
                    'UPDATE webchanges SET uuid = REPLACE(uuid, ?, ?)',
                    (guid, new_guid),
                )
                total_searched = self._execute('SELECT changes()').fetchone()[0]
                self.db.commit()
                self._execute('VACUUM')

        return total_searched

    def clean_all(self, keep_entries: int = 1) -> int:
        """Delete all older entries for each 'guid' (keep only 'keep_entries').

        :param keep_entries: Number of entries to keep after deletion.

        :returns: Number of records deleted.
        """
        with self.lock:
            if keep_entries == 1:
                self._execute(
                    'DELETE FROM webchanges '
                    'WHERE EXISTS ( '
                    '    SELECT 1 FROM webchanges '
                    '    w WHERE w.uuid = webchanges.uuid AND w.timestamp > webchanges.timestamp '
                    ')'
                )
            else:
                self._execute(
                    'DELETE FROM webchanges '
                    'WHERE ROWID IN ( '
                    '    WITH rank_added AS ('
                    '        SELECT '
                    '             ROWID,'
                    '             uuid,'
                    '             timestamp, '
                    '             ROW_NUMBER() OVER (PARTITION BY uuid ORDER BY timestamp DESC) AS rn'
                    '        FROM webchanges '
                    '    ) '
                    '    SELECT ROWID FROM rank_added '
                    '    WHERE rn > ?'
                    ')',
                    (keep_entries,),
                )
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()
            self._execute('VACUUM')
        return num_del

    def keep_latest(self, keep_entries: int = 1) -> int:
        """Delete all older entries, keeping only the latest 'keep_entries' per guid.

        :param keep_entries: Number of entries to keep after deletion.

        :returns: Number of records deleted.
        """
        with self.lock:
            self._execute(
                'WITH '
                'cte AS ( SELECT uuid, timestamp, ROW_NUMBER() OVER ( PARTITION BY uuid '
                '                                                     ORDER BY timestamp DESC ) rn '
                '         FROM webchanges ) '
                'DELETE '
                'FROM webchanges '
                'WHERE EXISTS ( SELECT 1 '
                '               FROM cte '
                '               WHERE webchanges.uuid = cte.uuid '
                '                 AND webchanges.timestamp = cte.timestamp '
                '                 AND cte.rn > ? );',
                (keep_entries,),
            )
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()
        return num_del

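    # Note: clean_all() and keep_latest() implement the same "keep the newest N snapshots per uuid" pruning;
    # clean_all() adds a correlated-EXISTS fast path for N=1 and runs VACUUM, while keep_latest() (used by close())
    # always ranks rows with a window-function CTE and skips the VACUUM.
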
    def rollback(self, timestamp: float, count: bool = False) -> int:
        """Roll back the database to the entries present at timestamp.

        :param timestamp: The timestamp.
        :param count: If set to true, only count the number that would be deleted without doing so.

        :returns: Number of records deleted (or to be deleted).
        """
        command = 'SELECT COUNT(*)' if count else 'DELETE'
        with self.lock:
            cur = self._execute(
                f'{command} '  # noqa: S608 Possible SQL injection
                'FROM webchanges '
                'WHERE EXISTS ( '
                '     SELECT 1 '
                '     FROM webchanges AS w '
                '     WHERE w.uuid = webchanges.uuid '
                '     AND webchanges.timestamp > ? '
                '     AND w.timestamp > ? '
                ')',
                (timestamp, timestamp),
            )
            if count:
                # read the count directly; changes() only reflects INSERT/UPDATE/DELETE statements
                num_del: int = cur.fetchone()[0]
            else:
                num_del = self._execute('SELECT changes()').fetchone()[0]
                self.db.commit()
        return num_del

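    # A minimal sketch ('storage' is hypothetical): preview, then perform, a rollback to the start of 2026 (UTC):
    #
    #   cutoff = datetime(2026, 1, 1, tzinfo=timezone.utc).timestamp()
    #   if storage.rollback(cutoff, count=True):
    #       storage.rollback(cutoff)
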
    def migrate_from_minidb(self, minidb_filename: str | Path) -> None:
        """Migrate the data of a legacy minidb database to the current database.

        :param minidb_filename: The filename of the legacy minidb database.
        """
        print("Found 'minidb' database and upgrading it to the new engine (note: only the last snapshot is retained).")
        logger.info(
            "Found legacy 'minidb' database and converting it to 'sqlite3' and new schema. Package 'minidb' needs to be"
            ' installed for the conversion.'
        )

        from webchanges.storage_minidb import SsdbMiniDBStorage

        legacy_db = SsdbMiniDBStorage(minidb_filename)
        self.restore(legacy_db.backup())
        legacy_db.close()
        print(f'Database upgrade finished; the following backup file can be safely deleted: {minidb_filename}.\n')
        print("The 'minidb' package can be removed (unless used by another program): $ pip uninstall minidb.")
        print('-' * 80)

    def flushdb(self) -> None:
        """Delete all entries of the database. Use with care, there is no undo!"""
        with self.lock:
            self._execute('DELETE FROM webchanges')
            self.db.commit()


class SsdbRedisStorage(SsdbStorage):
    """Class for storing snapshots using redis."""

    def __init__(self, filename: str | Path) -> None:
        super().__init__(filename)

        if isinstance(redis, str):
            raise ImportError(f"Python package 'redis' cannot be imported.\n{redis}")

        self.db = redis.from_url(str(filename))
        logger.info(f'Using {self.filename} for database')

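    # Storage layout (illustrative): each guid maps to a Redis list whose key is 'guid:' + guid and whose elements
    # are msgpack blobs; save() LPUSHes, so index 0 always holds the newest snapshot.
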
    @staticmethod
    def _make_key(guid: str) -> str:
        return 'guid:' + guid

    def close(self) -> None:
        self.db.connection_pool.disconnect()
        del self.db

    def get_guids(self) -> list[str]:
        return [guid[5:].decode() for guid in self.db.keys('guid:*')]

    def load(self, guid: str) -> Snapshot:
        key = self._make_key(guid)
        data = self.db.lindex(key, 0)

        if data:
            r = msgpack.unpackb(data)
            # the key read here must match the 'error_data' key written by save()
            return Snapshot(
                r['data'], r['timestamp'], r['tries'], r['etag'], r.get('mime_type', ''), r.get('error_data', {})
            )

        return Snapshot('', 0, 0, '', '', {})

    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        if count is not None and count < 1:
            return {}

        history = {}
        key = self._make_key(guid)
        for i in range(self.db.llen(key)):
            r = self.db.lindex(key, i)
            c = msgpack.unpackb(r)
            if (c['tries'] == 0 or c['tries'] is None) and c['data'] not in history:
                history[c['data']] = c['timestamp']
                if count is not None and len(history) >= count:
                    break
        return history

    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        if count is not None and count < 1:
            return []

        history: list[Snapshot] = []
        key = self._make_key(guid)
        for i in range(self.db.llen(key)):
            r = self.db.lindex(key, i)
            c = msgpack.unpackb(r)
            if c['tries'] == 0 or c['tries'] is None:
                history.append(
                    Snapshot(
                        c['data'],
                        c['timestamp'],
                        c['tries'],
                        c['etag'],
                        c.get('mime_type', ''),
                        c.get('error_data', {}),
                    )
                )
                if count is not None and len(history) >= count:
                    break
        return history

    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
        r = {
            'data': snapshot.data,
            'timestamp': snapshot.timestamp,
            'tries': snapshot.tries,
            'etag': snapshot.etag,
            'mime_type': snapshot.mime_type,
            'error_data': snapshot.error_data,
        }
        packed_data = msgpack.packb(r)
        if packed_data:
            self.db.lpush(self._make_key(guid), packed_data)

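    # Because LPUSH prepends, the list is ordered newest-first, which is why load() reads index 0 and the history
    # methods above iterate from index 0 upward to walk from newest to oldest.
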
    def delete(self, guid: str) -> None:
        self.db.delete(self._make_key(guid))

    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
        """For the given 'guid', delete the latest entry and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete (only 1 is supported by this Redis code).

        :returns: Number of records deleted.
        """
        if delete_entries != 1:
            raise NotImplementedError('Only deleting of the latest 1 entry is supported by this Redis code.')

        if self.db.lpop(self._make_key(guid)) is None:
            return 0

        return 1

    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :returns: Number of records deleted.
        """
        raise NotImplementedError('This method is not implemented for Redis.')

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        if keep_entries != 1:
            raise NotImplementedError('Only keeping latest 1 entry is supported.')

        key = self._make_key(guid)
        i = self.db.llen(key)
        if self.db.ltrim(key, 0, 0):
            return i - self.db.llen(key)

        return 0

    def move(self, guid: str, new_guid: str) -> int:
        if guid == new_guid:
            return 0
        key = self._make_key(guid)
        new_key = self._make_key(new_guid)
        # Note if a list with 'new_key' already exists, the data stored there
        # will be overwritten.
        self.db.rename(key, new_key)
        return self.db.llen(new_key)

    def rollback(self, timestamp: float) -> None:
        """Rolls back the database to timestamp.

        :raises: NotImplementedError: This function is not implemented for 'redis' database engine.
        """
        raise NotImplementedError("Rolling back the database is not supported by 'redis' database engine")

    def flushdb(self) -> None:
        """Delete all entries of the database. Use with care, there is no undo!"""
        self.db.flushdb()