
mborsetti / webchanges / build 17752455918 (push via github, Version 3.31.2)

16 Sep 2025 02:22AM UTC coverage: 71.558% (+0.2%) from 71.376%

1387 of 2314 branches covered (59.94%); branch coverage is included in the aggregate %.
1 of 1 new or added line in 1 file covered (100.0%).
58 existing lines in 2 files are now uncovered.
4626 of 6089 relevant lines covered (75.97%); 5.88 hits per line.

Source File: /webchanges/storage.py (87.1% covered)

"""Handles all storage: jobs files, config files, hooks file, and cache database engines."""

# The code below is subject to the license contained in the LICENSE file, which is part of the source code.

from __future__ import annotations

import copy
import inspect
import io
import logging
import os
import shutil
import sqlite3
import sys
import threading
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from types import NoneType
from typing import Any, Iterable, Iterator, Literal, TextIO, TypedDict

import msgpack
import yaml
import yaml.scanner

from webchanges import __docs_url__, __project_name__, __version__
from webchanges.filters import FilterBase
from webchanges.handler import ErrorData, Snapshot
from webchanges.jobs import JobBase, ShellJob
from webchanges.reporters import ReporterBase
from webchanges.util import edit_file, file_ownership_checks

try:
    from httpx import Headers
except ImportError:  # pragma: no cover
    from webchanges._vendored.headers import Headers  # type: ignore[assignment]

try:
    import redis
except ImportError as e:  # pragma: no cover
    redis = str(e)  # type: ignore[assignment]

logger = logging.getLogger(__name__)

_ConfigDisplay = TypedDict(
    '_ConfigDisplay',
    {
        'new': bool,
        'error': bool,
        'unchanged': bool,
        'empty-diff': bool,
    },
)
_ConfigReportText = TypedDict(
    '_ConfigReportText',
    {
        'line_length': int,
        'details': bool,
        'footer': bool,
        'minimal': bool,
        'separate': bool,
    },
)
_ConfigReportHtml = TypedDict(
    '_ConfigReportHtml',
    {
        'diff': Literal['unified', 'table'],
        'footer': bool,
        'separate': bool,
        'title': str,
    },
)
_ConfigReportMarkdown = TypedDict(
    '_ConfigReportMarkdown',
    {
        'details': bool,
        'footer': bool,
        'minimal': bool,
        'separate': bool,
    },
)
_ConfigReportStdout = TypedDict(
    '_ConfigReportStdout',
    {
        'enabled': bool,
        'color': bool,
    },
)
_ConfigReportBrowser = TypedDict(
    '_ConfigReportBrowser',
    {
        'enabled': bool,
    },
)
_ConfigReportDiscord = TypedDict(
    '_ConfigReportDiscord',
    {
        'enabled': bool,
        'webhook_url': str,
        'embed': bool,
        'subject': str,
        'colored': bool,
        'max_message_length': int | None,
    },
)
_ConfigReportEmailSmtp = TypedDict(
    '_ConfigReportEmailSmtp',
    {
        'host': str,
        'user': str,
        'port': int,
        'starttls': bool,
        'auth': bool,
        'insecure_password': str,
        'utf-8': bool,
    },
)
_ConfigReportEmailSendmail = TypedDict(
    '_ConfigReportEmailSendmail',
    {
        'path': str | Path,
    },
)
_ConfigReportEmail = TypedDict(
    '_ConfigReportEmail',
    {
        'enabled': bool,
        'html': bool,
        'to': str,
        'from': str,
        'subject': str,
        'method': Literal['sendmail', 'smtp'],
        'smtp': _ConfigReportEmailSmtp,
        'sendmail': _ConfigReportEmailSendmail,
    },
)
_ConfigReportGithubIssue = TypedDict(
    '_ConfigReportGithubIssue',
    {
        'enabled': bool,
        'token': str,
        'owner': str,
        'repo': str,
        'title': str,
        'labels': list[str],
        'format_dt': str,
        'format_content': str,
        'assignees': list[str],
        'type': str,
        'milestone': str,
    },
)
_ConfigReportGotify = TypedDict(
    '_ConfigReportGotify',
    {
        'enabled': bool,
        'priority': int,
        'server_url': str,
        'title': str,
        'token': str,
    },
)
_ConfigReportIfttt = TypedDict(
    '_ConfigReportIfttt',
    {
        'enabled': bool,
        'key': str,
        'event': str,
    },
)
_ConfigReportMailgun = TypedDict(
    '_ConfigReportMailgun',
    {
        'enabled': bool,
        'region': str,
        'api_key': str,
        'domain': str,
        'from_mail': str,
        'from_name': str,
        'to': str,
        'subject': str,
    },
)
_ConfigReportMatrix = TypedDict(
    '_ConfigReportMatrix',
    {
        'enabled': bool,
        'homeserver': str,
        'access_token': str,
        'room_id': str,
    },
)
_ConfigReportProwl = TypedDict(
    '_ConfigReportProwl',
    {
        'enabled': bool,
        'api_key': str,
        'priority': int,
        'application': str,
        'subject': str,
    },
)
_ConfigReportPushbullet = TypedDict(
    '_ConfigReportPushbullet',
    {
        'enabled': bool,
        'api_key': str,
    },
)
_ConfigReportPushover = TypedDict(
    '_ConfigReportPushover',
    {
        'enabled': bool,
        'app': str,
        'device': str | None,
        'sound': str,
        'user': str,
        'priority': str,
    },
)
_ConfigReportRunCommand = TypedDict(
    '_ConfigReportRunCommand',
    {
        'enabled': bool,
        'command': str,
    },
)
_ConfigReportTelegram = TypedDict(
    '_ConfigReportTelegram',
    {
        'enabled': bool,
        'bot_token': str,
        'chat_id': str | int | list[str | int],
        'silent': bool,
    },
)
_ConfigReportWebhook = TypedDict(
    '_ConfigReportWebhook',
    {
        'enabled': bool,
        'markdown': bool,
        'webhook_url': str,
        'rich_text': bool | None,
        'max_message_length': int | None,
    },
)
_ConfigReportXmpp = TypedDict(
    '_ConfigReportXmpp',
    {
        'enabled': bool,
        'sender': str,
        'recipient': str,
        'insecure_password': str | None,
    },
)

_ConfigReport = TypedDict(
    '_ConfigReport',
    {
        'tz': str | None,
        'text': _ConfigReportText,
        'html': _ConfigReportHtml,
        'markdown': _ConfigReportMarkdown,
        'stdout': _ConfigReportStdout,
        'browser': _ConfigReportBrowser,
        'discord': _ConfigReportDiscord,
        'email': _ConfigReportEmail,
        'github_issue': _ConfigReportGithubIssue,
        'gotify': _ConfigReportGotify,
        'ifttt': _ConfigReportIfttt,
        'mailgun': _ConfigReportMailgun,
        'matrix': _ConfigReportMatrix,
        'prowl': _ConfigReportProwl,
        'pushbullet': _ConfigReportPushbullet,
        'pushover': _ConfigReportPushover,
        'run_command': _ConfigReportRunCommand,
        'telegram': _ConfigReportTelegram,
        'webhook': _ConfigReportWebhook,
        'xmpp': _ConfigReportXmpp,
    },
)
_ConfigJobDefaults = TypedDict(
    '_ConfigJobDefaults',
    {
        '_note': str,
        'all': dict[str, Any],
        'url': dict[str, Any],
        'browser': dict[str, Any],
        'command': dict[str, Any],
    },
    total=False,
)
_ConfigDifferDefaults = TypedDict(
    '_ConfigDifferDefaults',
    {
        '_note': str,
        'unified': dict[str, Any],
        'ai_google': dict[str, Any],
        'command': dict[str, Any],
        'deepdiff': dict[str, Any],
        'image': dict[str, Any],
        'table': dict[str, Any],
        'wdiff': dict[str, Any],
    },
    total=False,
)
_ConfigDatabase = TypedDict(
    '_ConfigDatabase',
    {
        'engine': Literal['sqlite3', 'redis', 'minidb', 'textfiles'] | str,
        'max_snapshots': int,
    },
)
_Config = TypedDict(
    '_Config',
    {
        'display': _ConfigDisplay,
        'report': _ConfigReport,
        'job_defaults': _ConfigJobDefaults,
        'differ_defaults': _ConfigDifferDefaults,
        'database': _ConfigDatabase,
        'footnote': str | None,
    },
)

DEFAULT_CONFIG: _Config = {
    'display': {  # select whether the report includes the categories below
        'new': True,
        'error': True,
        'unchanged': False,
        'empty-diff': False,
    },
    'report': {
        'tz': None,  # the timezone as an IANA time zone name, e.g. 'America/Los_Angeles', or null for machine's
        # the directives below are for the report content types (text, html or markdown)
        'text': {
            'details': True,  # whether the diff is sent
            'footer': True,
            'line_length': 75,
            'minimal': False,
            'separate': False,
        },
        'html': {
            'diff': 'unified',  # 'unified' or 'table'
            'footer': True,
            'separate': False,
            'title': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
        },
        'markdown': {
            'details': True,  # whether the diff is sent
            'footer': True,
            'minimal': False,
            'separate': False,
        },
        # the directives below control 'reporters', i.e. where a report is displayed and/or sent
        'stdout': {  # the console / command line display; uses text
            'enabled': True,
            'color': True,
        },
        'browser': {  # the system's default browser; uses html
            'enabled': False,
        },
        'discord': {
            'enabled': False,
            'webhook_url': '',
            'embed': True,
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
            'colored': True,
            'max_message_length': None,
        },
        'email': {  # email (except mailgun); uses text or both html and text if 'html' is set to true
            'enabled': False,
            'html': True,
            'from': '',
            'to': '',
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
            'method': 'smtp',  # either 'smtp' or 'sendmail'
            'smtp': {
                'host': 'localhost',
                'port': 587,
                'starttls': True,
                'auth': True,
                'user': '',
                'insecure_password': '',
                'utf-8': True,
            },
            'sendmail': {
                'path': 'sendmail',
            },
        },
        'github_issue': {
            'enabled': False,
            'token': '',
            'owner': '',
            'repo': '',
            'title': '',
            'labels': [],
            'format_dt': '',
            'format_content': '',
            'assignees': [],
            'type': '',
            'milestone': '',
        },
        'gotify': {  # uses markdown
            'enabled': False,
            'priority': 0,
            'server_url': '',
            'title': '',
            'token': '',
        },
        'ifttt': {  # uses text
            'enabled': False,
            'key': '',
            'event': '',
        },
        'mailgun': {  # uses text
            'enabled': False,
            'region': 'us',
            'api_key': '',
            'domain': '',
            'from_mail': '',
            'from_name': '',
            'to': '',
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
        },
        'matrix': {  # uses text
            'enabled': False,
            'homeserver': '',
            'access_token': '',
            'room_id': '',
        },
        'prowl': {  # uses text
            'enabled': False,
            'api_key': '',
            'priority': 0,
            'application': '',
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
        },
        'pushbullet': {  # uses text
            'enabled': False,
            'api_key': '',
        },
        'pushover': {  # uses text
            'enabled': False,
            'app': '',
            'user': '',
            'device': None,
            'sound': 'spacealarm',
            'priority': 'normal',
        },
        'run_command': {  # uses text
            'enabled': False,
            'command': '',
        },
        'telegram': {  # uses markdown (from version 3.7)
            'enabled': False,
            'bot_token': '',
            'chat_id': '',
            'silent': False,
        },
        'webhook': {
            'enabled': False,
            'webhook_url': '',
            'markdown': False,
            'rich_text': None,
            'max_message_length': None,
        },
        'xmpp': {  # uses text
            'enabled': False,
            'sender': '',
            'recipient': '',
            'insecure_password': '',
        },
    },
    'job_defaults': {
        '_note': 'Default directives that are applied to jobs.',
        'all': {'_note': 'These are used for all types of jobs, including those in hooks.py.'},
        'url': {'_note': "These are used for 'url' jobs without 'use_browser'."},
        'browser': {'_note': "These are used for 'url' jobs with 'use_browser: true'."},
        'command': {'_note': "These are used for 'command' jobs."},
    },
    'differ_defaults': {
        '_note': 'Default directives that are applied to individual differs.',
        'unified': {},
        'ai_google': {},
        'command': {},
        'deepdiff': {},
        'image': {},
        'table': {},
        'wdiff': {},
    },
    'database': {
        'engine': 'sqlite3',
        'max_snapshots': 4,
    },
    'footnote': None,
}

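# Illustrative sketch (not part of the module): a user's configuration file only needs
# the directives that deviate from DEFAULT_CONFIG above, since YamlConfigStorage.load()
# below deep-merges the defaults back in. A minimal file could be just:
#
#     report:
#       text:
#         line_length: 99
#       email:
#         enabled: true
#         to: me@example.com
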
# Custom YAML constructor for !include
def yaml_include(loader: yaml.SafeLoader, node: yaml.Node) -> list[Any]:
    file_path = Path(loader.name).parent.joinpath(node.value)
    with file_path.open('r') as f:
        return list(yaml.safe_load_all(f))


# Add the custom constructor to the YAML loader
yaml.add_constructor('!include', yaml_include, Loader=yaml.SafeLoader)

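# Illustrative sketch (not part of the module): with the '!include' constructor
# registered above, a YAML file loaded through yaml.safe_load() can pull in a sibling
# file (the filename 'common-filters.yaml' is hypothetical):
#
#     filter: !include common-filters.yaml
#
# yaml_include() resolves node.value relative to Path(loader.name).parent, so the path
# is interpreted relative to the file being parsed, and it returns a list built with
# yaml.safe_load_all(), so multi-document include files work too.
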
@dataclass
class BaseStorage(ABC):  # noqa:  B024 abstract base class, but it has no abstract methods or properties
    """Base class for storage."""


class BaseFileStorage(BaseStorage, ABC):
    """Base class for file storage."""

    def __init__(self, filename: str | Path) -> None:
        """

        :param filename: The filename or directory name of the storage.
        """
        if isinstance(filename, str):
            self.filename = Path(filename)
        else:
            self.filename = filename


class BaseTextualFileStorage(BaseFileStorage, ABC):
    """Base class for textual files."""

    def __init__(self, filename: str | Path) -> None:
        """

        :param filename: The filename or directory name of the storage.
        """
        super().__init__(filename)
        # if not isinstance(self, JobsBaseFileStorage):
        #     self.load()

    @abstractmethod
    def load(self, *args: Any) -> Any:
        """Load from storage.

        :param args: Specified by the subclass.
        :return: Specified by the subclass.
        """
        pass

    @abstractmethod
    def save(self, *args: Any, **kwargs: Any) -> Any:
        """Save to storage.

        :param args: Specified by the subclass.
        :param kwargs: Specified by the subclass.
        :return: Specified by the subclass.
        """
        pass

    @classmethod
    @abstractmethod
    def parse(cls, filename: Path) -> Any:
        """Parse storage contents.

        :param filename: The filename.
        :return: Specified by the subclass.
        """
        pass

    def edit(self) -> int:
        """Edit file.

        :returns: 0 if edit is successful, 1 otherwise.
        """
        # Similar code to UrlwatchCommand.edit_hooks()
        logger.debug(f'Edit file {self.filename}')
        if isinstance(self.filename, list):
            if len(self.filename) > 1:
                raise ValueError(f'Only one jobs file can be specified for editing; found {len(self.filename)}.')
            filename = self.filename[0]
        else:
            filename = self.filename
        file_edit = filename.with_stem(filename.stem + '_edit')

        if filename.is_file():
            shutil.copy(filename, file_edit)
        # elif example_file is not None and Path(example_file).is_file():
        #     shutil.copy(example_file, file_edit, follow_symlinks=False)

        while True:
            try:
                edit_file(file_edit)
                # Check if we can still parse it
                if self.parse is not None:
                    self.parse(file_edit)
                break  # stop if no exception on parser
            except SystemExit:
                raise
            except Exception as e:
                print()
                print('Errors in updating file:')
                print('======')
                print(e)
                print('======')
                print('')
                print(f'The file {filename} was NOT updated.')
                user_input = input('Do you want to retry the same edit? [Y/n] ')
                if not user_input or user_input.lower().startswith('y'):
                    continue
                file_edit.unlink()
                print('No changes have been saved.')
                return 1

        if filename.is_symlink():
            filename.write_text(file_edit.read_text())
        else:
            file_edit.replace(filename)
        file_edit.unlink(missing_ok=True)
        print('Saved edits in', filename)
        return 0

class JobsBaseFileStorage(BaseTextualFileStorage, ABC):
    """Class for jobs textual files storage."""

    filename: list[Path]  # type: ignore[assignment]

    def __init__(self, filename: list[Path]) -> None:
        """

        :param filename: The filenames of the jobs files.
        """
        super().__init__(filename)  # type: ignore[arg-type]
        self.filename = filename

    def load_secure(self) -> list[JobBase]:
        """Load the jobs from a text file checking that the file is secure (i.e. belongs to the current UID and only
        the owner can write to it - Linux only).

        :return: List of JobBase objects.
        """
        jobs: list[JobBase] = self.load()

        def is_shell_job(job: JobBase) -> bool:
            """Check if the job uses the filter 'shellpipe' or an external differ, as they call
            subprocess.run(shell=True) (insecure).

            :returns: True if subprocess.run(shell=True) is invoked by the job, False otherwise.
            """
            if isinstance(job, ShellJob):
                return True

            for filter_kind, _ in FilterBase.normalize_filter_list(job.filters, job.index_number):
                if filter_kind == 'shellpipe':
                    return True

                if job.differ and job.differ.get('name') == 'command':
                    return True

            return False

        shelljob_errors = []
        for file in self.filename:
            shelljob_errors.extend(file_ownership_checks(file))
        # Build a list (not a generator) so it can be iterated more than once below
        removed_jobs = [job for job in jobs if is_shell_job(job)]
        if shelljob_errors and removed_jobs:
            print(
                f'ERROR: Removing the following jobs because'
                f' {" and ".join(shelljob_errors)}: {", ".join(str(job.index_number) for job in removed_jobs)}\n'
                f'(see {__docs_url__}en/stable/jobs.html#important-note-for-command-jobs).'
            )
            jobs = [job for job in jobs if job not in removed_jobs]

        logger.info(f'Loaded {len(jobs)} jobs from {", ".join(str(file) for file in self.filename)}.')
        return jobs

class BaseYamlFileStorage(BaseTextualFileStorage, ABC):
    """Base class for YAML textual files storage."""

    @classmethod
    def parse(cls, filename: Path) -> Any:
        """Return the contents of the YAML file if it exists.

        :param filename: The filename Path.
        :return: Specified by the subclass.
        """
        if filename is not None and filename.is_file():
            with filename.open() as fp:
                return yaml.safe_load(fp)


class YamlConfigStorage(BaseYamlFileStorage):
    """Class for the configuration file (a YAML textual file)."""

    config: _Config = {}  # type: ignore[typeddict-item]

    @staticmethod
    def dict_deep_difference(d1: _Config, d2: _Config, ignore_underline_keys: bool = False) -> _Config:
        """Recursively find elements in the first dict that are not in the second.

        :param d1: The first dict.
        :param d2: The second dict.
        :param ignore_underline_keys: If true, keys starting with '_' are ignored (treated as remarks).
        :return: A dict with all the elements of the first dict that are not in the second.
        """

        def _sub_dict_deep_difference(d1_: _Config, d2_: _Config) -> _Config:
            """Recursive sub-function to find elements in the first dict that are not in the second.

            :param d1_: The first dict.
            :param d2_: The second dict.
            :return: A dict with elements of the first dict that are not in the second.
            """
            for key, value in d1_.copy().items():
                if ignore_underline_keys and key.startswith('_'):
                    d1_.pop(key, None)  # type: ignore[misc]
                elif isinstance(value, dict) and isinstance(d2_.get(key), dict):
                    _sub_dict_deep_difference(value, d2_[key])  # type: ignore[arg-type,literal-required]
                    if not len(value):
                        d1_.pop(key)  # type: ignore[misc]
                else:
                    if key in d2_:
                        d1_.pop(key)  # type: ignore[misc]
            return d1_

        return _sub_dict_deep_difference(copy.deepcopy(d1), d2)

    @staticmethod
    def dict_deep_merge(source: _Config, destination: _Config) -> _Config:
        """Recursively deep merges the source dict into the destination dict.

        :param source: The first dict.
        :param destination: The second dict.
        :return: The deep merged dict.
        """

        # https://stackoverflow.com/a/20666342

        def _sub_dict_deep_merge(source_: _Config, destination_: _Config) -> _Config:
            """Recursive sub-function to merge the source_ dict into the destination_ dict.

            :param source_: The first dict.
            :param destination_: The second dict.
            :return: The merged dict.
            """
            for key, value in source_.items():
                if isinstance(value, dict):
                    # get node or create one
                    node = destination_.setdefault(key, {})  # type: ignore[misc]
                    _sub_dict_deep_merge(value, node)  # type: ignore[arg-type]
                else:
                    destination_[key] = value  # type: ignore[literal-required]

            return destination_

        return _sub_dict_deep_merge(source, copy.deepcopy(destination))

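    # Illustrative sketch (not part of the class): dict_deep_merge() overlays 'source'
    # onto a deep copy of 'destination' (the defaults), while dict_deep_difference()
    # reports what the first dict has that the second lacks. With a hypothetical user
    # config:
    #
    #     user = {'report': {'text': {'line_length': 99}}}
    #     merged = YamlConfigStorage.dict_deep_merge(user, DEFAULT_CONFIG)
    #     assert merged['report']['text']['line_length'] == 99  # user override wins
    #     assert merged['report']['text']['footer'] is True  # default retained
    #     assert YamlConfigStorage.dict_deep_difference(user, DEFAULT_CONFIG) == {}
    #
    # The last assertion holds because every key in 'user' also exists in
    # DEFAULT_CONFIG; a typo such as 'line_lenght' would survive the difference and be
    # flagged by check_for_unrecognized_keys() below.
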
    def check_for_unrecognized_keys(self, config: _Config) -> None:
        """Check if config has keys not in DEFAULT_CONFIG (bad keys, e.g. typos); if so, issue a RuntimeWarning.

        :param config: The configuration.
        """
        config_for_extras = copy.deepcopy(config)
        if 'job_defaults' in config_for_extras:
            # Create missing 'job_defaults' keys from DEFAULT_CONFIG
            for key in DEFAULT_CONFIG['job_defaults']:
                if 'job_defaults' not in config_for_extras:
                    config_for_extras['job_defaults'] = {}
                config_for_extras['job_defaults'][key] = None  # type: ignore[literal-required]
            for key in DEFAULT_CONFIG['differ_defaults']:
                if 'differ_defaults' not in config_for_extras:
                    config_for_extras['differ_defaults'] = {}
                config_for_extras['differ_defaults'][key] = None  # type: ignore[literal-required]
        if 'hooks' in sys.modules:
            # Remove extra keys in config used in hooks (they are not in DEFAULT_CONFIG)
            for _, obj in inspect.getmembers(
                sys.modules['hooks'], lambda x: inspect.isclass(x) and x.__module__ == 'hooks'
            ):
                if issubclass(obj, JobBase):
                    if obj.__kind__ not in DEFAULT_CONFIG['job_defaults'].keys():
                        config_for_extras['job_defaults'].pop(obj.__kind__, None)  # type: ignore[misc]
                elif issubclass(obj, ReporterBase):
                    if obj.__kind__ not in DEFAULT_CONFIG['report'].keys():
                        config_for_extras['report'].pop(obj.__kind__, None)  # type: ignore[misc]
        if 'slack' in config_for_extras.get('report', {}):
            # Ignore legacy key
            config_for_extras['report'].pop('slack')  # type: ignore[typeddict-item]
        extras: _Config = self.dict_deep_difference(config_for_extras, DEFAULT_CONFIG, ignore_underline_keys=True)
        if not extras.get('report'):
            extras.pop('report', None)  # type: ignore[misc]
        if extras:
            warnings.warn(
                f'Found unrecognized directive(s) in the configuration file {self.filename}:\n'
                f'{yaml.safe_dump(extras)}Check for typos or the hooks.py file (if any); documentation is at '
                f'{__docs_url__}\n',
                RuntimeWarning,
                stacklevel=1,
            )

    @staticmethod
    def replace_none_keys(config: _Config) -> None:
        """Fixes None keys in the loaded config that should be empty dicts instead."""
        if 'job_defaults' not in config:
            config['job_defaults'] = DEFAULT_CONFIG['job_defaults']
        else:
            if 'shell' in config['job_defaults']:
                if 'command' in config['job_defaults']:
                    raise KeyError(
                        "Found both 'shell' and 'command' job_defaults in config, a duplicate. Please remove the "
                        "'shell' ones."
                    )
                else:
                    config['job_defaults']['command'] = config['job_defaults'].pop(
                        'shell'  # type: ignore[typeddict-item]
                    )
            for key in {'all', 'url', 'browser', 'command'}:
                if key not in config['job_defaults']:
                    config['job_defaults'][key] = {}  # type: ignore[literal-required]
                elif config['job_defaults'][key] is None:  # type: ignore[literal-required]
                    config['job_defaults'][key] = {}  # type: ignore[literal-required]

    def load(self, *args: Any) -> None:
        """Load the configuration file from self.filename into self.config, adding missing keys from DEFAULT_CONFIG.

        :param args: Not used.
        """
        logger.debug(f'Loading configuration from {self.filename}')
        config: _Config = self.parse(self.filename)

        if config:
            self.replace_none_keys(config)
            self.check_for_unrecognized_keys(config)

            # If config is missing keys in DEFAULT_CONFIG, log the missing keys and deep merge DEFAULT_CONFIG
            missing = self.dict_deep_difference(DEFAULT_CONFIG, config, ignore_underline_keys=True)
            if missing:
                logger.info(
                    f'The configuration file {self.filename} is missing directive(s); using default values for '
                    'those. Run with -vv for more details.'
                )
                logger.debug(
                    f'The following default values are being used:\n'
                    f'{yaml.safe_dump(missing)}'
                    f'See documentation at {__docs_url__}en/stable/configuration.html'
                )
                config = self.dict_deep_merge(config or {}, DEFAULT_CONFIG)

            # format headers
            for job_defaults_type in {'all', 'url', 'browser'}:
                if 'headers' in config['job_defaults'][job_defaults_type]:  # type: ignore[literal-required]
                    config['job_defaults'][job_defaults_type]['headers'] = Headers(  # type: ignore[literal-required]
                        {
                            k: str(v)
                            for k, v in config['job_defaults'][job_defaults_type][  # type: ignore[literal-required]
                                'headers'
                            ].items()
                        },
                        encoding='utf-8',
                    )
                if 'cookies' in config['job_defaults'][job_defaults_type]:  # type: ignore[literal-required]
                    config['job_defaults'][job_defaults_type]['cookies'] = {  # type: ignore[literal-required]
                        k: str(v)
                        for k, v in config['job_defaults'][job_defaults_type][  # type: ignore[literal-required]
                            'cookies'
                        ].items()
                    }
            logger.info(f'Loaded configuration from {self.filename}')

        else:
            logger.warning(f'No directives found in the configuration file {self.filename}; using default directives.')
            config = DEFAULT_CONFIG

        self.config = config

    def save(self, *args: Any, **kwargs: Any) -> None:
        """Save self.config into self.filename using YAML.

        :param args: Not used.
        :param kwargs: Not used.
        """
        with self.filename.open('w') as fp:
            fp.write(
                f'# {__project_name__} configuration file. See {__docs_url__}en/stable/configuration.html\n'
                f'# Originally written on {datetime.now(timezone.utc).replace(microsecond=0).isoformat()}Z by version'
                f' {__version__}.\n'
                f'\n'
            )
            yaml.safe_dump(self.config, fp, allow_unicode=True, sort_keys=False)

    @classmethod
    def write_default_config(cls, filename: Path) -> None:
        """Write the default configuration to file.

        :param filename: The filename.
        """
        config_storage = cls(filename)
        config_storage.config = DEFAULT_CONFIG
        config_storage.save()

class YamlJobsStorage(BaseYamlFileStorage, JobsBaseFileStorage):
    """Class for the jobs file (a YAML textual file)."""

    @classmethod
    def _parse(cls, fp: TextIO, filenames: list[Path]) -> list[JobBase]:
        """Parse the contents of a jobs YAML file.

        :param fp: The text stream to parse.
        :return: A list of JobBase objects.
        :raise yaml.YAMLError: If a YAML error is found in the file.
        :raise ValueError: If a duplicate URL/command is found in the list.
        """

        def job_files_for_error() -> list[str]:
            """
            :return: A list of lines containing the names of the job files.
            """
            if len(filenames) > 1:
                jobs_files = ['in the concatenation of the jobs files:'] + [f'• {file},' for file in filenames]
            elif len(filenames) == 1:
                jobs_files = [f'in jobs file {filenames[0]}.']
            else:
                jobs_files = []
            return jobs_files

        jobs = []
        jobs_by_guid = defaultdict(list)
        try:
            for i, job_data in enumerate((job for job in yaml.safe_load_all(fp) if job)):
                if not isinstance(job_data, dict):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f'Found invalid job data (consisting of the {type(job_data).__name__} {job_data})',
                                *job_files_for_error(),
                            ]
                        )
                    )
                job_data['index_number'] = i + 1
                job = JobBase.unserialize(job_data, filenames)
                # TODO Implement 100% validation and remove it from jobs.py
                # TODO Try using pydantic to do this.
                if not isinstance(job.data, (NoneType, str, dict, list)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'data' key needs to contain a string, a dictionary or a list; found a"
                                f' {type(job.data).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.filters, (NoneType, list)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'filter' key needs to contain a list; found a {type(job.filters).__name__} ",
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.headers, (NoneType, dict, Headers)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'headers' key needs to contain a dictionary; found a "
                                f'{type(job.headers).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.cookies, (NoneType, dict)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'cookies' key needs to contain a dictionary; found a "
                                f'{type(job.cookies).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.switches, (NoneType, str, list)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'switches' key needs to contain a string or a list; found a "
                                f'{type(job.switches).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                # We add the GUID here to speed things up and to allow hooks to programmatically change job.url
                # and/or job.user_visible_url
                job.guid = job.get_guid()
                jobs.append(job)
                jobs_by_guid[job.guid].append(job)
        except yaml.scanner.ScannerError as e:
            raise ValueError(
                '\n   '.join(
                    [
                        f'YAML parser {e.args[2].replace("here", "")} in line {e.args[3].line + 1}, column'
                        f' {e.args[3].column + 1}',
                        *job_files_for_error(),
                    ]
                )
            ) from None

        conflicting_jobs = []
        for _, guid_jobs in jobs_by_guid.items():
            if len(guid_jobs) != 1:
                conflicting_jobs.append(guid_jobs[0].get_location())

        if conflicting_jobs:
            raise ValueError(
                '\n   '.join(
                    ['Each job must have a unique URL/command (for URLs, append #1, #2, etc. to make them unique):']
                    + [f'• {job}' for job in conflicting_jobs]
                    + ['']
                    + job_files_for_error()
                )
            ) from None

        return jobs

    @classmethod
    def parse(cls, filename: Path) -> list[JobBase]:
        """Parse the contents of a jobs YAML file and return a list of jobs.

        :param filename: The filename Path.
        :return: A list of JobBase objects.
        """
        if filename is not None and filename.is_file():
            with filename.open() as fp:
                return cls._parse(fp, [filename])
        return []

    def load(self, *args: Any) -> list[JobBase]:
        """Parse the contents of the jobs YAML file(s) and return a list of jobs.

        :return: A list of JobBase objects.
        """
        if len(self.filename) == 1:
            with self.filename[0].open() as f:
                return self._parse(f, self.filename)
        else:
            fp = io.StringIO('\n---\n'.join(f.read_text(encoding='utf-8-sig') for f in self.filename if f.is_file()))
            return self._parse(fp, self.filename)

    def save(self, jobs: Iterable[JobBase]) -> None:
        """Save jobs to the jobs YAML file.

        :param jobs: An iterable of JobBase objects to be written.
        """
        print(f'Saving updated list to {self.filename[0]}.')

        with self.filename[0].open('w') as fp:
            yaml.safe_dump_all([job.serialize() for job in jobs], fp, allow_unicode=True, sort_keys=False)

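# Illustrative sketch (not part of the module): _parse() above rejects jobs that share
# a GUID, which is derived from the job's URL/command. As its error message suggests,
# the same URL can be watched by two jobs if a #-fragment makes each location unique
# (hypothetical multi-document jobs file):
#
#     url: https://example.com/#1
#     ---
#     url: https://example.com/#2
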
class SsdbStorage(BaseFileStorage, ABC):
    """Base class for snapshots storage."""

    @abstractmethod
    def close(self) -> None:
        pass

    @abstractmethod
    def get_guids(self) -> list[str]:
        pass

    @abstractmethod
    def load(self, guid: str) -> Snapshot:
        pass

    @abstractmethod
    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        pass

    @abstractmethod
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        pass

    @abstractmethod
    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
        pass

    @abstractmethod
    def delete(self, guid: str) -> None:
        pass

    @abstractmethod
    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
        """For the given 'guid', delete only the latest 'delete_entries' entries and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete.

        :returns: Number of records deleted.
        """
        pass

    @abstractmethod
    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :returns: Number of records deleted.
        """
        pass

    @abstractmethod
    def clean(self, guid: str, keep_entries: int = 1) -> int:
        pass

    @abstractmethod
    def move(self, guid: str, new_guid: str) -> int:
        pass

    @abstractmethod
    def rollback(self, timestamp: float) -> int | None:
        pass

    def backup(self) -> Iterator[tuple[str, str | bytes, float, int, str, str, ErrorData]]:
        """Return the most recent entry for each 'guid'.

        :returns: A generator of tuples, each consisting of (guid, data, timestamp, tries, etag, mime_type,
           error_data).
        """
        for guid in self.get_guids():
            data, timestamp, tries, etag, mime_type, error_data = self.load(guid)
            yield guid, data, timestamp, tries, etag, mime_type, error_data

    def restore(self, entries: Iterable[tuple[str, str | bytes, float, int, str, str, ErrorData]]) -> None:
        """Save multiple entries into the database.

        :param entries: An iterable of tuples, each consisting of (guid, data, timestamp, tries, etag, mime_type,
           error_data).
        """
        for guid, data, timestamp, tries, etag, mime_type, error_data in entries:
            new_snapshot = Snapshot(
                data=data, timestamp=timestamp, tries=tries, etag=etag, mime_type=mime_type, error_data=error_data
            )
            self.save(guid=guid, snapshot=new_snapshot, temporary=False)

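    # Illustrative sketch (not part of the class): backup() and restore() are
    # complementary, so the latest snapshot of every job can be moved between any two
    # storage engines, e.g. when switching engines (filenames are hypothetical):
    #
    #     old_db = SsdbSQLite3Storage(Path('snapshots.db'))
    #     new_db = SsdbDirStorage(Path('snapshots_dir'))
    #     new_db.restore(old_db.backup())
    #     old_db.close()
    #     new_db.close()
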
    def gc(self, known_guids: Iterable[str], keep_entries: int = 1) -> None:
        """Garbage collect the database: delete all guids not included in known_guids and keep only the last
        'keep_entries' snapshots for the remaining ones.

        :param known_guids: The guids to keep.
        :param keep_entries: Number of entries to keep after deletion for the guids to keep.
        """
        for guid in set(self.get_guids()) - set(known_guids):
            print(f'Deleting job {guid} (no longer being tracked).')
            self.delete(guid)
        self.clean_ssdb(known_guids, keep_entries)

    def clean_ssdb(self, known_guids: Iterable[str], keep_entries: int = 1) -> None:
        """Convenience function to clean the cache.

        If self.clean_all is present, runs clean_all(). Otherwise, runs clean() on all known_guids, one at a time.
        Prints the number of snapshots deleted.

        :param known_guids: An iterable of guids.
        :param keep_entries: Number of entries to keep after deletion.
        """
        if hasattr(self, 'clean_all'):
            count = self.clean_all(keep_entries)
            if count:
                print(f'Deleted {count} old snapshots.')
        else:
            for guid in known_guids:
                count = self.clean(guid, keep_entries)
                if count:
                    print(f'Deleted {count} old snapshots of {guid}.')

    @abstractmethod
    def flushdb(self) -> None:
        """Delete all entries of the database. Use with care, there is no undo!"""
        pass

class SsdbDirStorage(SsdbStorage):
    """Class for snapshots stored as individual textual files in a directory 'dirname'."""

    def __init__(self, dirname: str | Path) -> None:
        super().__init__(dirname)
        self.filename.mkdir(parents=True, exist_ok=True)  # using the attr filename because it is a Path (confusing!)
        logger.info(f'Using directory {self.filename} to store snapshot data as individual text files')

    def close(self) -> None:
        # Nothing to close
        return

    def _get_filename(self, guid: str) -> Path:
        return self.filename.joinpath(guid)  # filename is a dir (confusing!)

    def get_guids(self) -> list[str]:
        return [filename.name for filename in self.filename.iterdir()]

    def load(self, guid: str) -> Snapshot:
        filename = self._get_filename(guid)
        if not filename.is_file():
            return Snapshot('', 0, 0, '', '', {})

        try:
            data = filename.read_text()
        except UnicodeDecodeError:
            data = filename.read_text(errors='ignore')
            logger.warning(f'Found and ignored Unicode-related errors when retrieving saved snapshot {guid}')

        timestamp = filename.stat().st_mtime

        return Snapshot(data, timestamp, 0, '', '', {})

    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        if count is not None and count < 1:
            return {}
        else:
            snapshot = self.load(guid)
            return {snapshot.data: snapshot.timestamp} if snapshot.data and snapshot.timestamp else {}

    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        if count is not None and count < 1:
            return []
        else:
            snapshot = self.load(guid)
            return [snapshot] if snapshot.data and snapshot.timestamp else []

    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
        # ETag and mime_type are ignored
        filename = self._get_filename(guid)
        with filename.open('w+') as fp:
            fp.write(str(snapshot.data))
        os.utime(filename, times=(datetime.now().timestamp(), snapshot.timestamp))  # noqa: DTZ005

    def delete(self, guid: str) -> None:
        filename = self._get_filename(guid)
        filename.unlink(missing_ok=True)
        return

    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
        """For the given 'guid', delete the latest entry and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete.

        :raises NotImplementedError: This function is not implemented for 'textfiles' databases.
        """
        raise NotImplementedError(
            "Deleting of latest snapshot not supported by 'textfiles' database engine since only one snapshot is "
            "saved. Delete all snapshots if that's what you are trying to do."
        )

    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :raises NotImplementedError: This function is not implemented for 'textfiles' databases.
        """
        raise NotImplementedError("Deleting all snapshots is not supported by the 'textfiles' database engine.")

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        if keep_entries != 1:
            raise NotImplementedError('Only keeping the latest 1 entry is supported.')
        # We only store the latest version, no need to clean
        return 0

    def move(self, guid: str, new_guid: str) -> int:
        if guid == new_guid:
            return 0
        os.rename(self._get_filename(guid), self._get_filename(new_guid))
        return 1

    def rollback(self, timestamp: float) -> None:
        raise NotImplementedError("'textfiles' databases cannot be rolled back as new snapshots overwrite old ones")

    def flushdb(self) -> None:
        for file in self.filename.iterdir():
            if file.is_file():
                file.unlink()

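# Illustrative sketch (not part of the module): as described in the class docstring
# below, each row of the 'webchanges' table stores everything except guid and
# timestamp as a msgpack blob whose dict uses the keys 'd', 't', 'e' and 'm'.
# Round-tripping such a blob (with made-up values):
#
#     blob = msgpack.packb({'d': 'page content', 't': 1, 'e': '"an-etag"', 'm': 'text/plain'})
#     assert msgpack.unpackb(blob)['d'] == 'page content'
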
1295
class SsdbSQLite3Storage(SsdbStorage):
8✔
1296
    """
1297
    Handles storage of the snapshot as a SQLite database in the 'filename' file using Python's built-in sqlite3 module
1298
    and the msgpack package.
1299

1300
    A temporary database is created by __init__ and will be written by the 'save()' function (unless temporary=False).
1301
    This data will be written to the permanent one by the 'close()' function, which is called at the end of program
1302
    execution.
1303

1304
    The database contains the 'webchanges' table with the following columns:
1305

1306
    * guid: unique hash of the "location", i.e. the URL/command; indexed
1307
    * timestamp: the Unix timestamp of when then the snapshot was taken; indexed
1308
    * msgpack_data: a msgpack blob containing 'data', 'tries', 'etag' and 'mime_type' in a dict of keys 'd', 't',
1309
      'e' and 'm'
1310
    """
1311

1312
    def __init__(self, filename: Path, max_snapshots: int = 4) -> None:
        """
        :param filename: The full filename of the database file
        :param max_snapshots: The maximum number of snapshots to retain in the database for each 'guid'
        """
        # Opens the database file and, if new, creates a table and index.

        self.max_snapshots = max_snapshots

        logger.debug(f'Run-time SQLite library: {sqlite3.sqlite_version}')
        super().__init__(filename)

        self.filename.parent.mkdir(parents=True, exist_ok=True)

        # https://stackoverflow.com/questions/26629080
        self.lock = threading.RLock()

        self.db = sqlite3.connect(filename, check_same_thread=False)
        logger.info(f'Using sqlite3 {sqlite3.sqlite_version} database at {filename}')
        self.cur = self.db.cursor()
        self.cur.execute('PRAGMA temp_store = MEMORY;')
        tables = self._execute("SELECT name FROM sqlite_master WHERE type='table';").fetchone()

        def _initialize_table() -> None:
            logger.debug('Initializing sqlite3 database')
            self._execute('CREATE TABLE webchanges (uuid TEXT, timestamp REAL, msgpack_data BLOB)')
            self._execute('CREATE INDEX idx_uuid_time ON webchanges(uuid, timestamp)')
            self.db.commit()

        if tables == ('CacheEntry',):
            logger.info("Found legacy 'minidb' database to convert")

            # Found a minidb legacy database; close it, rename it for migration, and create a new sqlite3 one
            import importlib.util

            if importlib.util.find_spec('minidb') is None:
                print('You have an old snapshot database format that needs to be converted to a current one.')
                print(
                    f"Please install the Python package 'minidb' for this one-time conversion and rerun "
                    f'{__project_name__}.'
                )
                print('Use e.g. `pip install -U minidb`.')
                print()
                print("After the conversion, you can uninstall 'minidb' with e.g. `pip uninstall minidb`.")
                sys.exit(1)

            print('Performing one-time conversion from old snapshot database format.')
            self.db.close()
            minidb_filename = filename.with_stem(filename.stem + '_minidb')
            self.filename.replace(minidb_filename)
            self.db = sqlite3.connect(filename, check_same_thread=False)
            self.cur = self.db.cursor()
            _initialize_table()
            # Migrate the minidb legacy database renamed above
            self.migrate_from_minidb(minidb_filename)
        elif tables != ('webchanges',):
            _initialize_table()

        # Create a temporary database in memory for writing during execution (fault tolerance)
        logger.debug('Creating temp sqlite3 database file in memory')
        self.temp_lock = threading.RLock()
        self.temp_db = sqlite3.connect('', check_same_thread=False)
        self.temp_cur = self.temp_db.cursor()
        self._temp_execute('CREATE TABLE webchanges (uuid TEXT, timestamp REAL, msgpack_data BLOB)')
        self.temp_db.commit()

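    # Usage sketch (illustrative): save() below writes only to the in-memory temp
    # database; snapshots reach the permanent file when close() copies them over
    # and prunes each guid to max_snapshots:
    #   storage.save(guid=guid, snapshot=snapshot)  # held in the temp database
    #   storage.close()                             # flushed to disk and pruned
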
    def _execute(self, sql: str, args: tuple | None = None) -> sqlite3.Cursor:
        """Execute SQL command on main database."""
        if args is None:
            logger.debug(f"Executing (perm) '{sql}'")
            return self.cur.execute(sql)
        else:
            logger.debug(f"Executing (perm) '{sql}' with {args}")
            return self.cur.execute(sql, args)

    def _temp_execute(self, sql: str, args: tuple | None = None) -> sqlite3.Cursor:
        """Execute SQL command on temp database."""
        if args is None:
            logger.debug(f"Executing (temp) '{sql}'")
            return self.temp_cur.execute(sql)
        else:
            logger.debug(f"Executing (temp) '{sql}' with {args[:2]}...")
            return self.temp_cur.execute(sql, args)

    def _copy_temp_to_permanent(self, delete: bool = False) -> None:
        """Copy contents of the temporary database to the permanent one.

        :param delete: Also delete the contents of the temporary database (used for testing).
        """
        logger.debug('Saving new snapshots to permanent sqlite3 database')
        # with self.temp_lock:
        #     self.temp_db.commit()
        # with self.lock:
        #     self._execute('ATTACH DATABASE ? AS temp_db', (str(self.temp_filename),))
        #     self._execute('INSERT INTO webchanges SELECT * FROM temp_db.webchanges')
        #     logger.debug(f'Wrote {self.cur.rowcount} new snapshots to permanent sqlite3 database')
        #     self.db.commit()
        #     self._execute('DETACH DATABASE temp_db')
        with self.temp_lock:
            with self.lock:
                for row in self._temp_execute('SELECT * FROM webchanges').fetchall():
                    self._execute('INSERT INTO webchanges VALUES (?, ?, ?)', row)
                self.db.commit()
            if delete:
                self._temp_execute('DELETE FROM webchanges')

    def close(self) -> None:
        """Writes the temporary database to the permanent one, purges old entries if required, and closes all database
        connections."""
        self._copy_temp_to_permanent()
        with self.temp_lock:
            self.temp_db.close()
            logger.debug('Cleaning up the permanent sqlite3 database and closing the connection')
        with self.lock:
            if self.max_snapshots:
                num_del = self.keep_latest(self.max_snapshots)
                logger.debug(
                    f'Keeping no more than {self.max_snapshots} snapshots per job: purged {num_del} older entries'
                )
            else:
                self.db.commit()
            self.db.close()
            logger.info(f'Closed main sqlite3 database file {self.filename}')
        del self.temp_cur
        del self.temp_db
        del self.temp_lock
        del self.cur
        del self.db
        del self.lock

    def get_guids(self) -> list[str]:
        """Lists the unique 'guid's contained in the database.

        :returns: A list of guids.
        """
        with self.lock:
            self.cur.row_factory = lambda cursor, row: row[0]
            guids = self._execute('SELECT DISTINCT uuid FROM webchanges').fetchall()
            self.cur.row_factory = None
        return guids

    def load(self, guid: str) -> Snapshot:
        """Return the most recent entry matching a 'guid'.

        :param guid: The guid.

        :returns: A Snapshot tuple (data, timestamp, tries, etag, mime_type, error_data)
            WHERE

            - data is the data;
            - timestamp is the timestamp;
            - tries is the number of tries;
            - etag is the ETag;
            - mime_type is the MIME type;
            - error_data is the error data (an empty dict if none).
        """
        with self.lock:
            row = self._execute(
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC LIMIT 1',
                (guid,),
            ).fetchone()
        if row:
            msgpack_data, timestamp = row
            r = msgpack.unpackb(msgpack_data)
            return Snapshot(r['d'], timestamp, r['t'], r['e'], r.get('m', ''), r.get('err', {}))

        return Snapshot('', 0, 0, '', '', {})

    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        """Return max 'count' (None = all) records of data and timestamp of **successful** runs for a 'guid'.

        :param guid: The guid.
        :param count: The maximum number of entries to return; if None, return all.

        :returns: A dict (key: value)
            WHERE

            - key is the snapshot data;
            - value is the most recent timestamp for such snapshot.
        """
        if count is not None and count < 1:
            return {}

        with self.lock:
            rows = self._execute(
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC', (guid,)
            ).fetchall()
        history = {}
        if rows:
            for msgpack_data, timestamp in rows:
                r = msgpack.unpackb(msgpack_data)
                if not r['t']:  # tries == 0 = successful run; use get_history_snapshots() to include error entries
                    if r['d'] not in history:
                        history[r['d']] = timestamp
                        if count is not None and len(history) >= count:
                            break
        return history

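    # Illustrative return value of get_history_data() (made-up data):
    #   {'<html>new</html>': 1700000200.0, '<html>old</html>': 1700000100.0}
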
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        """Return max 'count' (None = all) entries of all data (including from error runs) saved for a 'guid'.

        :param guid: The guid.
        :param count: The maximum number of entries to return; if None, return all.

        :returns: A list of Snapshot tuples (data, timestamp, tries, etag, mime_type, error_data)
            WHERE the values are:

            - data: The data (str, could be empty);
            - timestamp: The timestamp (float);
            - tries: The number of tries (int);
            - etag: The ETag (str, could be empty);
            - mime_type: The MIME type (str, could be empty);
            - error_data: The error data (dict, could be empty).
        """
        if count is not None and count < 1:
            return []

        with self.lock:
            rows = self._execute(
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC', (guid,)
            ).fetchall()
        history: list[Snapshot] = []
        if rows:
            for msgpack_data, timestamp in rows:
                r = msgpack.unpackb(msgpack_data)
                history.append(Snapshot(r['d'], timestamp, r['t'], r['e'], r.get('m', ''), r.get('err', {})))
                if count is not None and len(history) >= count:
                    break
        return history

    def save(
        self,
        *args: Any,
        guid: str,
        snapshot: Snapshot,
        temporary: bool | None = True,
        **kwargs: Any,
    ) -> None:
        """Save the data from a job.

        By default, it is saved into the temporary database. Call close() to transfer the contents of the temporary
        database to the permanent one.

        Note: the logic is such that any attempts that end in an exception will have tries >= 1, and we replace the
        data with the one from the most recent successful attempt.

        :param guid: The guid.
        :param snapshot: The Snapshot tuple (data, timestamp, tries, etag, mime_type, error_data).
        :param temporary: If true, save to the temporary database (default).
        """

        c = {
            'd': snapshot.data,
            't': snapshot.tries,
            'e': snapshot.etag,
            'm': snapshot.mime_type,
            'err': snapshot.error_data,
        }
        msgpack_data = msgpack.packb(c)
        if temporary:
            with self.temp_lock:
                self._temp_execute('INSERT INTO webchanges VALUES (?, ?, ?)', (guid, snapshot.timestamp, msgpack_data))
                # we do not commit to temporary as it's being used as write-only (we commit at the end)
        else:
            with self.lock:
                self._execute('INSERT INTO webchanges VALUES (?, ?, ?)', (guid, snapshot.timestamp, msgpack_data))
                self.db.commit()

    def delete(self, guid: str) -> None:
        """Delete all entries matching a 'guid'.

        :param guid: The guid.
        """
        with self.lock:
            self._execute('DELETE FROM webchanges WHERE uuid = ?', (guid,))
            self.db.commit()

    def delete_latest(
        self,
        guid: str,
        delete_entries: int = 1,
        temporary: bool | None = False,
        **kwargs: Any,
    ) -> int:
        """For the given 'guid', delete the latest 'delete_entries' number of entries and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete.
        :param temporary: If False, delete from the permanent database (default).

        :returns: Number of records deleted.
        """
        if temporary:
            with self.temp_lock:
                self._temp_execute(
                    'DELETE FROM webchanges '
                    'WHERE ROWID IN ( '
                    '    SELECT ROWID FROM webchanges '
                    '    WHERE uuid = ? '
                    '    ORDER BY timestamp DESC '
                    '    LIMIT ? '
                    ')',
                    (guid, delete_entries),
                )
                num_del: int = self._temp_execute('SELECT changes()').fetchone()[0]  # changes() on the temp connection
        else:
            with self.lock:
                self._execute(
                    'DELETE FROM webchanges '
                    'WHERE ROWID IN ( '
                    '    SELECT ROWID FROM webchanges '
                    '    WHERE uuid = ? '
                    '    ORDER BY timestamp DESC '
                    '    LIMIT ? '
                    ')',
                    (guid, delete_entries),
                )
                num_del = self._execute('SELECT changes()').fetchone()[0]
                self.db.commit()
        return num_del

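    # Design note (assumed rationale): stock SQLite builds only allow ORDER BY/LIMIT
    # on a bare DELETE when compiled with SQLITE_ENABLE_UPDATE_DELETE_LIMIT, so the
    # 'WHERE ROWID IN (SELECT ... ORDER BY ... LIMIT ...)' indirection used by
    # delete_latest() above and clean() below works everywhere.
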
    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :returns: Number of records deleted.
        """
        with self.lock:
            self._execute('DELETE FROM webchanges')
            self.db.commit()
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()

        return num_del

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        """For the given 'guid', keep only the latest 'keep_entries' number of entries and delete all other (older)
        ones. To delete older entries from all guids, use clean_all() instead.

        :param guid: The guid.
        :param keep_entries: Number of entries to keep after deletion.

        :returns: Number of records deleted.
        """
        with self.lock:
            self._execute(
                'DELETE FROM webchanges '
                'WHERE ROWID IN ( '
                '    SELECT ROWID FROM webchanges '
                '    WHERE uuid = ? '
                '    ORDER BY timestamp DESC '
                '    LIMIT -1 '
                '    OFFSET ? '
                ') ',
                (guid, keep_entries),
            )
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()
            self._execute('VACUUM')
        return num_del

    def move(self, guid: str, new_guid: str) -> int:
        """Replace uuid in records matching the 'guid' with the 'new_guid' value.

        If there are existing records with 'new_guid', they will not be overwritten and the job histories will be
        merged.

        :returns: Number of records searched for replacement.
        """
        total_searched = 0
        if guid != new_guid:
            with self.lock:
                self._execute(
                    'UPDATE webchanges SET uuid = REPLACE(uuid, ?, ?)',
                    (guid, new_guid),
                )
                total_searched = self._execute('SELECT changes()').fetchone()[0]
                self.db.commit()
                self._execute('VACUUM')

        return total_searched

    def clean_all(self, keep_entries: int = 1) -> int:
        """Delete all older entries for each 'guid' (keep only keep_entries).

        :returns: Number of records deleted.
        """
        with self.lock:
            if keep_entries == 1:
                self._execute(
                    'DELETE FROM webchanges '
                    'WHERE EXISTS ( '
                    '    SELECT 1 FROM webchanges '
                    '    w WHERE w.uuid = webchanges.uuid AND w.timestamp > webchanges.timestamp '
                    ')'
                )
            else:
                self._execute(
                    'DELETE FROM webchanges '
                    'WHERE ROWID IN ( '
                    '    WITH rank_added AS ('
                    '        SELECT '
                    '             ROWID,'
                    '             uuid,'
                    '             timestamp, '
                    '             ROW_NUMBER() OVER (PARTITION BY uuid ORDER BY timestamp DESC) AS rn'
                    '        FROM webchanges '
                    '    ) '
                    '    SELECT ROWID FROM rank_added '
                    '    WHERE rn > ?'
                    ')',
                    (keep_entries,),
                )
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()
            self._execute('VACUUM')
        return num_del

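    # Design note: with keep_entries == 1, a correlated EXISTS suffices (delete
    # any row that has a newer sibling with the same uuid); for larger values, a
    # ROW_NUMBER() window ranks rows per uuid so those ranked past the cutoff
    # can be dropped. keep_latest() below takes the same window-function approach.
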
    def keep_latest(self, keep_entries: int = 1) -> int:
        """Delete all older entries, keeping only the latest 'keep_entries' per guid.

        :param keep_entries: Number of entries to keep after deletion.

        :returns: Number of records deleted.
        """
        with self.lock:
            self._execute(
                'WITH '
                'cte AS ( SELECT uuid, timestamp, ROW_NUMBER() OVER ( PARTITION BY uuid '
                '                                                     ORDER BY timestamp DESC ) rn '
                '         FROM webchanges ) '
                'DELETE '
                'FROM webchanges '
                'WHERE EXISTS ( SELECT 1 '
                '               FROM cte '
                '               WHERE webchanges.uuid = cte.uuid '
                '                 AND webchanges.timestamp = cte.timestamp '
                '                 AND cte.rn > ? );',
                (keep_entries,),
            )
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()
        return num_del

    def rollback(self, timestamp: float, count: bool = False) -> int:
        """Rollback the database to the entries present at 'timestamp'.

        :param timestamp: The timestamp.
        :param count: If true, only count the number of entries that would be deleted, without deleting them.

        :returns: Number of records deleted (or to be deleted).
        """
        command = 'SELECT COUNT(*)' if count else 'DELETE'
        with self.lock:
            cur = self._execute(
                f'{command} '  # noqa: S608 Possible SQL injection
                'FROM webchanges '
                'WHERE EXISTS ( '
                '     SELECT 1 '
                '     FROM webchanges AS w '
                '     WHERE w.uuid = webchanges.uuid '
                '     AND webchanges.timestamp > ? '
                '     AND w.timestamp > ? '
                ')',
                (timestamp, timestamp),
            )
            if count:
                num_del: int = cur.fetchone()[0]  # the result of SELECT COUNT(*)
            else:
                num_del = self._execute('SELECT changes()').fetchone()[0]
                self.db.commit()
        return num_del

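    # Illustrative call (made-up date): count how many snapshots a rollback to
    # 2024-01-01 UTC would delete, without deleting anything:
    #   storage.rollback(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp(), count=True)
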
    def migrate_from_minidb(self, minidb_filename: str | Path) -> None:
        """Migrate the data of a legacy minidb database to the current database.

        :param minidb_filename: The filename of the legacy minidb database.
        """

        print("Found 'minidb' database and upgrading it to the new engine (note: only the last snapshot is retained).")
        logger.info(
            "Found legacy 'minidb' database and converting it to 'sqlite3' and new schema. Package 'minidb' needs to be"
            ' installed for the conversion.'
        )

        from webchanges.storage_minidb import SsdbMiniDBStorage

        legacy_db = SsdbMiniDBStorage(minidb_filename)
        self.restore(legacy_db.backup())
        legacy_db.close()
        print(f'Database upgrade finished; the following backup file can be safely deleted: {minidb_filename}.\n')
        print("The 'minidb' package can be removed (unless used by another program): $ pip uninstall minidb.")
        print('-' * 80)

    def flushdb(self) -> None:
        """Delete all entries of the database. Use with care, there is no undo!"""
        with self.lock:
            self._execute('DELETE FROM webchanges')
            self.db.commit()


class SsdbRedisStorage(SsdbStorage):
    """Class for storing snapshots using redis."""

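    # Data layout: each guid maps to a Redis list of msgpack blobs, newest first
    # (save() LPUSHes, load() reads index 0, and clean() LTRIMs the list to [0, 0]).
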
    def __init__(self, filename: str | Path) -> None:
        super().__init__(filename)

        if isinstance(redis, str):
            raise ImportError(f"Python package 'redis' cannot be imported.\n{redis}")

        self.db = redis.from_url(str(filename))
        logger.info(f'Using {self.filename} for database')

    @staticmethod
    def _make_key(guid: str) -> str:
        return 'guid:' + guid

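    # Illustrative (made-up guid): _make_key('0123abcd') -> 'guid:0123abcd'
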
    def close(self) -> None:
        self.db.connection_pool.disconnect()
        del self.db

    def get_guids(self) -> list[str]:
        guids = []
        for guid in self.db.keys('guid:*'):
            guids.append(guid[5:].decode())
        return guids

    def load(self, guid: str) -> Snapshot:
        key = self._make_key(guid)
        data = self.db.lindex(key, 0)

        if data:
            r = msgpack.unpackb(data)
            return Snapshot(
                r['data'], r['timestamp'], r['tries'], r['etag'], r.get('mime_type', ''), r.get('error_data', {})
            )

        return Snapshot('', 0, 0, '', '', {})

    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        if count is not None and count < 1:
            return {}

        history = {}
        key = self._make_key(guid)
        for i in range(0, self.db.llen(key)):
            r = self.db.lindex(key, i)
            c = msgpack.unpackb(r)
            if c['tries'] == 0 or c['tries'] is None:
                if c['data'] not in history:
                    history[c['data']] = c['timestamp']
                    if count is not None and len(history) >= count:
                        break
        return history

    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        if count is not None and count < 1:
            return []

        history: list[Snapshot] = []
        key = self._make_key(guid)
        for i in range(0, self.db.llen(key)):
            r = self.db.lindex(key, i)
            c = msgpack.unpackb(r)
            if c['tries'] == 0 or c['tries'] is None:
                history.append(
                    Snapshot(
                        c['data'],
                        c['timestamp'],
                        c['tries'],
                        c['etag'],
                        c.get('mime_type', ''),
                        c.get('error_data', {}),
                    )
                )
                if count is not None and len(history) >= count:
                    break
        return history

    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
        r = {
            'data': snapshot.data,
            'timestamp': snapshot.timestamp,
            'tries': snapshot.tries,
            'etag': snapshot.etag,
            'mime_type': snapshot.mime_type,
            'error_data': snapshot.error_data,
        }
        packed_data = msgpack.packb(r)
        if packed_data:
            self.db.lpush(self._make_key(guid), packed_data)

    def delete(self, guid: str) -> None:
        self.db.delete(self._make_key(guid))

    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
        """For the given 'guid', delete the latest entry and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete (only 1 is supported by this Redis code).

        :returns: Number of records deleted.
        """
        if delete_entries != 1:
            raise NotImplementedError('Only deleting of the latest 1 entry is supported by this Redis code.')

        if self.db.lpop(self._make_key(guid)) is None:
            return 0

        return 1

    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :returns: Number of records deleted.
        """
        raise NotImplementedError('This method is not implemented for Redis.')

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        if keep_entries != 1:
            raise NotImplementedError('Only keeping latest 1 entry is supported.')

        key = self._make_key(guid)
        i = self.db.llen(key)
        if self.db.ltrim(key, 0, 0):
            return i - self.db.llen(key)

        return 0

    def move(self, guid: str, new_guid: str) -> int:
        if guid == new_guid:
            return 0
        key = self._make_key(guid)
        new_key = self._make_key(new_guid)
        # Note: if a list with 'new_key' already exists, the data stored there will be overwritten.
        self.db.rename(key, new_key)  # type: ignore[no-untyped-call]
        return self.db.llen(new_key)

    def rollback(self, timestamp: float) -> None:
        raise NotImplementedError("Rolling back the database is not supported by the 'redis' database engine")

    def flushdb(self) -> None:
        """Delete all entries of the database. Use with care, there is no undo!"""
        self.db.flushdb()