
mborsetti / webchanges / build 21856489627 (push via github, by mborsetti)
10 Feb 2026 07:57AM UTC coverage: 73.228% (-0.09% from 73.318%)
Version 3.34.0rc0

1424 of 2298 branches covered (61.97%); branch coverage is included in the aggregate %.
4766 of 6155 relevant lines covered (77.43%)
11.07 hits per line

Source File

/webchanges/storage.py (79.74% covered)
1
"""Handles all storage: jobs files, config files, hooks file, and cache database engines."""
2

3
# The code below is subject to the license contained in the LICENSE.md file, which is part of the source code.
4

5
from __future__ import annotations
15✔
6

7
import copy
15✔
8
import inspect
15✔
9
import io
15✔
10
import logging
15✔
11
import os
15✔
12
import shutil
15✔
13
import sqlite3
15✔
14
import sys
15✔
15
import threading
15✔
16
import warnings
15✔
17
from abc import ABC, abstractmethod
15✔
18
from collections import defaultdict
15✔
19
from dataclasses import dataclass
15✔
20
from datetime import datetime, timezone
15✔
21
from pathlib import Path
15✔
22
from types import NoneType
15✔
23
from typing import Any, Iterable, Iterator, Literal, TextIO, TypedDict
15✔
24

25
import msgpack
15✔
26
import yaml
15✔
27
import yaml.scanner
15✔
28

29
from webchanges import __docs_url__, __project_name__, __version__
15✔
30
from webchanges.filters import FilterBase
15✔
31
from webchanges.handler import ErrorData, Snapshot
15✔
32
from webchanges.jobs import JobBase, ShellJob
15✔
33
from webchanges.reporters import ReporterBase
15✔
34
from webchanges.util import edit_file, file_ownership_checks
15✔
35

36
try:
15✔
37
    from httpx import Headers
15✔
38
except ImportError:  # pragma: no cover
39
    from webchanges._vendored.headers import Headers
40

41
try:
15✔
42
    import redis
15✔
43
except ImportError as e:  # pragma: no cover
44
    redis = str(e)  # type: ignore[assignment]
45

46
logger = logging.getLogger(__name__)
15✔
47

48
_ConfigDisplay = TypedDict(
15✔
49
    '_ConfigDisplay',
50
    {
51
        'new': bool,
52
        'error': bool,
53
        'unchanged': bool,
54
        'empty-diff': bool,
55
    },
56
)
57
_ConfigReportText = TypedDict(
15✔
58
    '_ConfigReportText',
59
    {
60
        'line_length': int,
61
        'details': bool,
62
        'footer': bool,
63
        'minimal': bool,
64
        'separate': bool,
65
    },
66
)
67
_ConfigReportHtml = TypedDict(
15✔
68
    '_ConfigReportHtml',
69
    {
70
        'diff': Literal['unified', 'table'],
71
        'footer': bool,
72
        'separate': bool,
73
        'title': str,
74
    },
75
)
76
_ConfigReportMarkdown = TypedDict(
15✔
77
    '_ConfigReportMarkdown',
78
    {
79
        'details': bool,
80
        'footer': bool,
81
        'minimal': bool,
82
        'separate': bool,
83
    },
84
)
85
_ConfigReportStdout = TypedDict(
15✔
86
    '_ConfigReportStdout',
87
    {
88
        'enabled': bool,
89
        'color': bool | Literal['normal', 'bright'],
90
    },
91
)
92
_ConfigReportBrowser = TypedDict(
15✔
93
    '_ConfigReportBrowser',
94
    {
95
        'enabled': bool,
96
    },
97
)
98
_ConfigReportDiscord = TypedDict(
15✔
99
    '_ConfigReportDiscord',
100
    {
101
        'enabled': bool,
102
        'webhook_url': str,
103
        'embed': bool,
104
        'subject': str,
105
        'colored': bool,
106
        'max_message_length': int | None,
107
    },
108
)
109
_ConfigReportEmailSmtp = TypedDict(
15✔
110
    '_ConfigReportEmailSmtp',
111
    {
112
        'host': str,
113
        'user': str,
114
        'port': int,
115
        'starttls': bool,
116
        'auth': bool,
117
        'insecure_password': str,
118
        'utf-8': bool,
119
    },
120
)
121
_ConfigReportEmailSendmail = TypedDict(
15✔
122
    '_ConfigReportEmailSendmail',
123
    {
124
        'path': str | Path,
125
    },
126
)
127
_ConfigReportEmail = TypedDict(
15✔
128
    '_ConfigReportEmail',
129
    {
130
        'enabled': bool,
131
        'html': bool,
132
        'to': str,
133
        'from': str,
134
        'subject': str,
135
        'method': Literal['sendmail', 'smtp'],
136
        'smtp': _ConfigReportEmailSmtp,
137
        'sendmail': _ConfigReportEmailSendmail,
138
    },
139
)
140
_ConfigReportGithubIssue = TypedDict(
15✔
141
    '_ConfigReportGithubIssue',
142
    {
143
        'enabled': bool,
144
        'token': str,
145
        'owner': str,
146
        'repo': str,
147
        'title': str,
148
        'labels': list[str],
149
        'format_dt': str,
150
        'format_content': str,
151
        'assignees': list[str],
152
        'type': str,
153
        'milestone': str,
154
    },
155
)
156
_ConfigReportGotify = TypedDict(
15✔
157
    '_ConfigReportGotify',
158
    {
159
        'enabled': bool,
160
        'priority': int,
161
        'server_url': str,
162
        'title': str,
163
        'token': str,
164
    },
165
)
166
_ConfigReportIfttt = TypedDict(
15✔
167
    '_ConfigReportIfttt',
168
    {
169
        'enabled': bool,
170
        'key': str,
171
        'event': str,
172
    },
173
)
174
_ConfigReportMailgun = TypedDict(
15✔
175
    '_ConfigReportMailgun',
176
    {
177
        'enabled': bool,
178
        'region': str,
179
        'api_key': str,
180
        'domain': str,
181
        'from_mail': str,
182
        'from_name': str,
183
        'to': str,
184
        'subject': str,
185
    },
186
)
187
_ConfigReportMatrix = TypedDict(
15✔
188
    '_ConfigReportMatrix',
189
    {
190
        'enabled': bool,
191
        'homeserver': str,
192
        'access_token': str,
193
        'room_id': str,
194
    },
195
)
196
_ConfigReportNtfyPriorities = TypedDict(
15✔
197
    '_ConfigReportNtfyPriorities',
198
    {
199
        'default': int | str,
200
        'new': int | str,
201
        'changed': int | str,
202
        'error': int | str,
203
    },
204
)
205
_ConfigReportNtfy = TypedDict(
15✔
206
    '_ConfigReportNtfy',
207
    {
208
        'enabled': bool,
209
        'topic_url': str,
210
        'authorization': str | None,
211
        'priorities': _ConfigReportNtfyPriorities,
212
    },
213
)
214
_ConfigReportProwl = TypedDict(
15✔
215
    '_ConfigReportProwl',
216
    {
217
        'enabled': bool,
218
        'api_key': str,
219
        'priority': int,
220
        'application': str,
221
        'subject': str,
222
    },
223
)
224
_ConfigReportPushbullet = TypedDict(
15✔
225
    '_ConfigReportPushbullet',
226
    {
227
        'enabled': bool,
228
        'api_key': str,
229
    },
230
)
231
_ConfigReportPushover = TypedDict(
15✔
232
    '_ConfigReportPushover',
233
    {
234
        'enabled': bool,
235
        'app': str,
236
        'device': str | None,
237
        'sound': str,
238
        'user': str,
239
        'priority': str,
240
    },
241
)
242
_ConfigReportRunCommand = TypedDict(
15✔
243
    '_ConfigReportRunCommand',
244
    {
245
        'enabled': bool,
246
        'command': str,
247
    },
248
)
249
_ConfigReportTelegram = TypedDict(
15✔
250
    '_ConfigReportTelegram',
251
    {
252
        'enabled': bool,
253
        'bot_token': str,
254
        'chat_id': str | int | list[str | int],
255
        'silent': bool,
256
    },
257
)
258
_ConfigReportWebhook = TypedDict(
15✔
259
    '_ConfigReportWebhook',
260
    {
261
        'enabled': bool,
262
        'markdown': bool,
263
        'webhook_url': str,
264
        'rich_text': bool | None,
265
        'max_message_length': int | None,
266
    },
267
)
268
_ConfigReportXmpp = TypedDict(
15✔
269
    '_ConfigReportXmpp',
270
    {
271
        'enabled': bool,
272
        'sender': str,
273
        'recipient': str,
274
        'insecure_password': str | None,
275
    },
276
)
277

278
_ConfigReport = TypedDict(
15✔
279
    '_ConfigReport',
280
    {
281
        'tz': str | None,
282
        'text': _ConfigReportText,
283
        'html': _ConfigReportHtml,
284
        'markdown': _ConfigReportMarkdown,
285
        'stdout': _ConfigReportStdout,
286
        'browser': _ConfigReportBrowser,
287
        'discord': _ConfigReportDiscord,
288
        'email': _ConfigReportEmail,
289
        'github_issue': _ConfigReportGithubIssue,
290
        'gotify': _ConfigReportGotify,
291
        'ifttt': _ConfigReportIfttt,
292
        'mailgun': _ConfigReportMailgun,
293
        'matrix': _ConfigReportMatrix,
294
        'ntfy': _ConfigReportNtfy,
295
        'prowl': _ConfigReportProwl,
296
        'pushbullet': _ConfigReportPushbullet,
297
        'pushover': _ConfigReportPushover,
298
        'run_command': _ConfigReportRunCommand,
299
        'telegram': _ConfigReportTelegram,
300
        'webhook': _ConfigReportWebhook,
301
        'xmpp': _ConfigReportXmpp,
302
    },
303
)
304
_ConfigJobDefaults = TypedDict(
15✔
305
    '_ConfigJobDefaults',
306
    {
307
        '_note': str,
308
        'all': dict[str, Any],
309
        'url': dict[str, Any],
310
        'browser': dict[str, Any],
311
        'command': dict[str, Any],
312
    },
313
    total=False,
314
)
315
_ConfigDifferDefaults = TypedDict(
15✔
316
    '_ConfigDifferDefaults',
317
    {
318
        '_note': str,
319
        'unified': dict[str, Any],
320
        'ai_google': dict[str, Any],
321
        'command': dict[str, Any],
322
        'deepdiff': dict[str, Any],
323
        'image': dict[str, Any],
324
        'table': dict[str, Any],
325
        'wdiff': dict[str, Any],
326
    },
327
    total=False,
328
)
329
_ConfigDatabase = TypedDict(
15✔
330
    '_ConfigDatabase',
331
    {
332
        'engine': Literal['sqlite3', 'redis', 'minidb', 'textfiles'] | str,  # noqa: PYI051
333
        'max_snapshots': int,
334
    },
335
)
336
_Config = TypedDict(
15✔
337
    '_Config',
338
    {
339
        'display': _ConfigDisplay,
340
        'report': _ConfigReport,
341
        'job_defaults': _ConfigJobDefaults,
342
        'differ_defaults': _ConfigDifferDefaults,
343
        'database': _ConfigDatabase,
344
        'footnote': str | None,
345
    },
346
)
347

348
DEFAULT_CONFIG: _Config = {
15✔
349
    'display': {  # select whether the report includes the categories below
350
        'new': True,
351
        'error': True,
352
        'unchanged': False,
353
        'empty-diff': False,
354
    },
355
    'report': {
356
        'tz': None,  # the timezone as an IANA time zone name, e.g. 'America/Los_Angeles', or null for the machine's
357
        # the directives below are for the report content types (text, html or markdown)
358
        'text': {
359
            'details': True,  # whether the diff is sent
360
            'footer': True,
361
            'line_length': 75,
362
            'minimal': False,
363
            'separate': False,
364
        },
365
        'html': {
366
            'diff': 'unified',  # 'unified' or 'table'
367
            'footer': True,
368
            'separate': False,
369
            'title': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
370
        },
371
        'markdown': {
372
            'details': True,  # whether the diff is sent
373
            'footer': True,
374
            'minimal': False,
375
            'separate': False,
376
        },
377
        # the directives below control 'reporters', i.e. where a report is displayed and/or sent
378
        'stdout': {  # the console / command line display; uses text
379
            'enabled': True,
380
            'color': True,
381
        },
382
        'browser': {  # the system's default browser; uses html
383
            'enabled': False,
384
        },
385
        'discord': {
386
            'enabled': False,
387
            'webhook_url': '',
388
            'embed': True,
389
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
390
            'colored': True,
391
            'max_message_length': None,
392
        },
393
        'email': {  # email (except mailgun); uses text or both html and text if 'html' is set to true
394
            'enabled': False,
395
            'html': True,
396
            'from': '',
397
            'to': '',
398
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
399
            'method': 'smtp',  # either 'smtp' or 'sendmail'
400
            'smtp': {
401
                'host': 'localhost',
402
                'port': 587,
403
                'starttls': True,
404
                'auth': True,
405
                'user': '',
406
                'insecure_password': '',
407
                'utf-8': True,
408
            },
409
            'sendmail': {
410
                'path': 'sendmail',
411
            },
412
        },
413
        'github_issue': {
414
            'enabled': False,
415
            'token': '',
416
            'owner': '',
417
            'repo': '',
418
            'title': '',
419
            'labels': [],
420
            'format_dt': '',
421
            'format_content': '',
422
            'assignees': [],
423
            'type': '',
424
            'milestone': '',
425
        },
426
        'gotify': {  # uses markdown
427
            'enabled': False,
428
            'priority': 0,
429
            'server_url': '',
430
            'title': '',
431
            'token': '',
432
        },
433
        'ifttt': {  # uses text
434
            'enabled': False,
435
            'key': '',
436
            'event': '',
437
        },
438
        'mailgun': {  # uses text
439
            'enabled': False,
440
            'region': 'us',
441
            'api_key': '',
442
            'domain': '',
443
            'from_mail': '',
444
            'from_name': '',
445
            'to': '',
446
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
447
        },
448
        'matrix': {  # uses text
449
            'enabled': False,
450
            'homeserver': '',
451
            'access_token': '',
452
            'room_id': '',
453
        },
454
        'ntfy': {  # uses text
455
            'enabled': False,
456
            'topic_url': '',
457
            'authorization': None,
458
            'priorities': {
459
                'default': 'default',
460
                'new': 'low',
461
                'changed': 'max',
462
                'error': 'high',
463
            },
464
        },
465
        'prowl': {  # uses text
466
            'enabled': False,
467
            'api_key': '',
468
            'priority': 0,
469
            'application': '',
470
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
471
        },
472
        'pushbullet': {  # uses text
473
            'enabled': False,
474
            'api_key': '',
475
        },
476
        'pushover': {  # uses text
477
            'enabled': False,
478
            'app': '',
479
            'user': '',
480
            'device': None,
481
            'sound': 'spacealarm',
482
            'priority': 'normal',
483
        },
484
        'run_command': {  # uses text
485
            'enabled': False,
486
            'command': '',
487
        },
488
        'telegram': {  # uses markdown (from version 3.7)
489
            'enabled': False,
490
            'bot_token': '',
491
            'chat_id': '',
492
            'silent': False,
493
        },
494
        'webhook': {
495
            'enabled': False,
496
            'webhook_url': '',
497
            'markdown': False,
498
            'rich_text': None,
499
            'max_message_length': None,
500
        },
501
        'xmpp': {  # uses text
502
            'enabled': False,
503
            'sender': '',
504
            'recipient': '',
505
            'insecure_password': '',
506
        },
507
    },
508
    'job_defaults': {
509
        '_note': 'Default directives that are applied to jobs.',
510
        'all': {'_note': 'These are used for all types of jobs, including those in hooks.py.'},
511
        'url': {'_note': "These are used for 'url' jobs without 'use_browser'."},
512
        'browser': {'_note': "These are used for 'url' jobs with 'use_browser: true'."},
513
        'command': {'_note': "These are used for 'command' jobs."},
514
    },
515
    'differ_defaults': {
516
        '_note': 'Default directives that are applied to individual differs.',
517
        'unified': {},
518
        'ai_google': {},
519
        'command': {},
520
        'deepdiff': {},
521
        'image': {},
522
        'table': {},
523
        'wdiff': {},
524
    },
525
    'database': {
526
        'engine': 'sqlite3',
527
        'max_snapshots': 4,
528
    },
529
    'footnote': None,
530
}
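
# Illustrative note (not part of the module): a user configuration file only needs the
# directives that differ from DEFAULT_CONFIG, since YamlConfigStorage.load() deep-merges the
# missing keys back in. For example, a configuration file containing just
#
#   report:
#     email:
#       enabled: true
#       to: 'me@example.com'  # hypothetical address
#
# is completed with every other default value at load time.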
531

532

533
# Custom YAML constructor for !include
534
def yaml_include(loader: yaml.SafeLoader, node: yaml.Node) -> list[Any]:
15✔
535
    file_path = Path(loader.name).parent.joinpath(node.value)
×
536
    with file_path.open('r') as f:
×
537
        return list(yaml.safe_load_all(f))
×
538

539

540
# Add the custom constructor to the YAML loader
541
yaml.add_constructor('!include', yaml_include, Loader=yaml.SafeLoader)
15✔
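
# Usage sketch (illustrative, not part of the module): with the constructor above registered
# on yaml.SafeLoader, a value tagged '!include' in any YAML parsed via yaml.safe_load() is
# replaced by the list of documents contained in the referenced file, resolved relative to
# the including file (loader.name). The file names below are hypothetical:
#
#   # jobs.yaml
#   jobs: !include extra-jobs.yaml
#
#   with open('jobs.yaml') as f:
#       data = yaml.safe_load(f)  # data['jobs'] == list of documents in extra-jobs.yaml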
542

543

544
@dataclass
15✔
545
class BaseStorage(ABC):  # noqa:  B024 abstract base class, but it has no abstract methods or properties
15✔
546
    """Base class for storage."""
547

548

549
class BaseFileStorage(BaseStorage, ABC):
15✔
550
    """Base class for file storage."""
551

552
    def __init__(self, filename: str | Path) -> None:
15✔
553
        """:param filename: The filename or directory name to storage."""
554
        if isinstance(filename, str):
15✔
555
            self.filename = Path(filename)
15✔
556
        else:
557
            self.filename = filename
15✔
558

559

560
class BaseTextualFileStorage(BaseFileStorage, ABC):
15✔
561
    """Base class for textual files."""
562

563
    def __init__(self, filename: str | Path) -> None:
15✔
564
        """:param filename: The filename or directory name to storage."""
565
        super().__init__(filename)
15✔
566
        # if not isinstance(self, JobsBaseFileStorage):
567
        #     self.load()
568

569
    @abstractmethod
15✔
570
    def load(self, *args: Any) -> Any:  # noqa: ANN401 Dynamically typed expressions (typing.Any) are disallowed
15✔
571
        """Load from storage.
572

573
        :param args: Specified by the subclass.
574
        :return: Specified by the subclass.
575
        """
576

577
    @abstractmethod
15✔
578
    def save(self, *args: Any, **kwargs: Any) -> Any:  # noqa: ANN401 Dynamically typed expressions (typing.Any) are disallowed
15✔
579
        """Save to storage.
580

581
        :param args: Specified by the subclass.
582
        :param kwargs: Specified by the subclass.
583
        :return: Specified by the subclass.
584
        """
585

586
    @classmethod
15✔
587
    @abstractmethod
15✔
588
    def parse(cls, filename: Path) -> Any:  # noqa: ANN401 Dynamically typed expressions (typing.Any) are disallowed
15✔
589
        """Parse storage contents.
590

591
        :param filename: The filename.
592
        :return: Specified by the subclass.
593
        """
594

595
    def edit(self) -> int:
15✔
596
        """Edit file.
597

598
        :returns: 0 if the edit is successful, 1 otherwise.
599
        """
600
        # Similar code to UrlwatchCommand.edit_hooks()
601
        logger.debug(f'Edit file {self.filename}')
15✔
602
        if isinstance(self.filename, list):
15✔
603
            if len(self.filename) > 1:
15!
604
                raise ValueError(f'Only one jobs file can be specified for editing; found {len(self.filename)}.')
×
605
            filename = self.filename[0]
15✔
606
        else:
607
            filename = self.filename
15✔
608
        file_edit = filename.with_stem(filename.stem + '_edit')
15✔
609

610
        if filename.is_file():
15!
611
            shutil.copy(filename, file_edit)
15✔
612
        # elif example_file is not None and Path(example_file).is_file():
613
        #     shutil.copy(example_file, file_edit, follow_symlinks=False)
614

615
        while True:
15✔
616
            try:
15✔
617
                edit_file(file_edit)
15✔
618
                # Check if we can still parse it
619
                if self.parse is not None:
15!
620
                    self.parse(file_edit)
15✔
621
                break  # stop if no exception on parser
15✔
622
            except SystemExit:
15✔
623
                raise
×
624
            except Exception as e:  # noqa: BLE001 Do not catch blind exception: `Exception`
15✔
625
                print()
15✔
626
                print('Errors in updating file:')
15✔
627
                print('======')
15✔
628
                print(e)
15✔
629
                print('======')
15✔
630
                print()
15✔
631
                print(f'The file {filename} was NOT updated.')
15✔
632
                user_input = input('Do you want to retry the same edit? [Y/n] ')
15✔
633
                if not user_input or user_input.lower().startswith('y'):
×
634
                    continue
×
635
                file_edit.unlink()
×
636
                print('No changes have been saved.')
×
637
                return 1
×
638

639
        if filename.is_symlink():
15!
640
            filename.write_text(file_edit.read_text())
×
641
        else:
642
            file_edit.replace(filename)
15✔
643
        file_edit.unlink(missing_ok=True)
15✔
644
        print('Saved edits in', filename)
15✔
645
        return 0
15✔
646

647

648
class JobsBaseFileStorage(BaseTextualFileStorage, ABC):
15✔
649
    """Class for jobs textual files storage."""
650

651
    filename: list[Path]
15✔
652

653
    def __init__(self, filename: list[Path]) -> None:
15✔
654
        """Class for jobs textual files storage.
655

656
        :param filename: The filenames of the jobs file.
657
        """
658
        super().__init__(filename)  # type: ignore[arg-type]
15✔
659
        self.filename = filename
15✔
660

661
    def load_secure(self) -> list[JobBase]:
15✔
662
        """Load the jobs from a text file checking that the file is secure (i.e. belongs to the current UID and only
663
        the owner can write to it - Linux only).
664

665
        :return: List of JobBase objects.
666
        """
667
        jobs: list[JobBase] = self.load()
15✔
668

669
        def is_shell_job(job: JobBase) -> bool:
15✔
670
            """Check if the job uses filter 'shellpipe' or an external differ, as they call
671
            subprocess.run(shell=True) (insecure).
672

673
            :returns: True if subprocess.run(shell=True) is invoked by job, False otherwise.
674
            """
675
            if isinstance(job, ShellJob):
10✔
676
                return True
10✔
677

678
            for filter_kind, _ in FilterBase.normalize_filter_list(job.filters, job.index_number):
10!
679
                if filter_kind == 'shellpipe':
×
680
                    return True
×
681

682
            if job.differ and job.differ.get('name') == 'command':
×
683
                return True
×
684

685
            return False
10✔
686

687
        shelljob_errors = []
15✔
688
        for file in self.filename:
15✔
689
            shelljob_errors.extend(file_ownership_checks(file))
15✔
690
        removed_jobs = [job for job in jobs if is_shell_job(job)]
15✔
691
        if shelljob_errors and removed_jobs:
15✔
692
            print(
10✔
693
                f'ERROR: Removing the following jobs because '
694
                f' {" and ".join(shelljob_errors)}: {" ,".join(str(job.index_number) for job in removed_jobs)}\n'
695
                f'(see {__docs_url__}en/stable/jobs.html#important-note-for-command-jobs).'
696
            )
697
            jobs = [job for job in jobs if job not in removed_jobs]
10✔
698

699
        logger.info(f'Loaded {len(jobs)} jobs from {", ".join(str(file) for file in self.filename)}.')
15✔
700
        return jobs
15✔
701

702

703
class BaseYamlFileStorage(BaseTextualFileStorage, ABC):
15✔
704
    """Base class for YAML textual files storage."""
705

706
    @classmethod
15✔
707
    def parse(cls, filename: Path) -> Any:  # noqa: ANN401 Dynamically typed expressions (typing.Any) are disallowed
15✔
708
        """Return contents of YAML file if it exists
709

710
        :param filename: The filename Path.
711
        :return: Specified by the subclass.
712
        """
713
        if filename is not None and filename.is_file():
15✔
714
            with filename.open() as fp:
15✔
715
                return yaml.safe_load(fp)
15✔
716
        return None
15✔
717

718

719
class YamlConfigStorage(BaseYamlFileStorage):
15✔
720
    """Class for configuration file (is a YAML textual file)."""
721

722
    config: _Config = {}
15✔
723

724
    @staticmethod
15✔
725
    def dict_deep_difference(d1: _Config, d2: _Config, ignore_underline_keys: bool = False) -> _Config:
15✔
726
        """Recursively find elements in the first dict that are not in the second.
727

728
        :param d1: The first dict.
729
        :param d2: The second dict.
730
        :param ignore_underline_keys: If true, keys starting with _ are ignored (treated as remarks)
731
        :return: A dict with all the elements on the first dict that are not in the second.
732
        """
733

734
        def _sub_dict_deep_difference(d1_: _Config, d2_: _Config) -> _Config:
15✔
735
            """Recursive sub-function to find elements in the first dict that are not in the second.
736

737
            :param d1_: The first dict.
738
            :param d2_: The second dict.
739
            :return: A dict with elements on the first dict that are not in the second.
740
            """
741
            for key, value in d1_.copy().items():
15✔
742
                if ignore_underline_keys and key.startswith('_'):
15✔
743
                    d1_.pop(key, None)
15✔
744
                elif isinstance(value, dict) and isinstance(d2_.get(key), dict):
15✔
745
                    _sub_dict_deep_difference(value, d2_[key])
15✔
746
                    if not len(value):
15✔
747
                        d1_.pop(key)
15✔
748
                elif key in d2_:
15✔
749
                    d1_.pop(key)
15✔
750
            return d1_
15✔
751

752
        return _sub_dict_deep_difference(copy.deepcopy(d1), d2)
15✔
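
    # Illustration (hypothetical values, not part of the module):
    #   dict_deep_difference({'a': 1, 'b': {'c': 2, 'd': 3}}, {'a': 1, 'b': {'c': 2}})
    #   returns {'b': {'d': 3}}, i.e. only the keys absent from the second dict survive.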
753

754
    @staticmethod
15✔
755
    def dict_deep_merge(source: _Config, destination: _Config) -> _Config:
15✔
756
        """Recursively deep merges source dict into destination dict.
757

758
        :param source: The first dict.
759
        :param destination: The second dict.
760
        :return: The deep merged dict.
761
        """
762
        # https://stackoverflow.com/a/20666342
763

764
        def _sub_dict_deep_merge(source_: _Config, destination_: _Config) -> _Config:
15✔
765
            """Recursive sub-function to merges source_ dict into destination_ dict.
766

767
            :param source_: The first dict.
768
            :param destination_: The second dict.
769
            :return: The merged dict.
770
            """
771
            for key, value in source_.items():
15✔
772
                if isinstance(value, dict):
15✔
773
                    # get node or create one
774
                    node = destination_.setdefault(key, {})
15✔
775
                    _sub_dict_deep_merge(value, node)
15✔
776
                else:
777
                    destination_[key] = value
15✔
778

779
            return destination_
15✔
780

781
        return _sub_dict_deep_merge(source, copy.deepcopy(destination))
15✔
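
    # Illustration (hypothetical values, not part of the module): scalars from source win,
    # nested dicts are merged, and destination is not modified in place:
    #   dict_deep_merge({'b': {'c': 9}}, {'a': 1, 'b': {'c': 2, 'd': 3}})
    #   returns {'a': 1, 'b': {'c': 9, 'd': 3}}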
782

783
    def check_for_unrecognized_keys(self, config: _Config) -> None:
15✔
784
        """Test if config has keys not in DEFAULT_CONFIG (bad keys, e.g. typos); if so, raise ValueError.
785

786
        :param config: The configuration.
787
        A RuntimeWarning is issued if the configuration has keys not in DEFAULT_CONFIG (bad keys, e.g. typos).
788
        """
789
        config_for_extras = copy.deepcopy(config)
15✔
790
        if 'job_defaults' in config_for_extras:
15!
791
            # Create missing 'job_defaults' keys from DEFAULT_CONFIG
792
            for key in DEFAULT_CONFIG['job_defaults']:
15✔
793
                if 'job_defaults' not in config_for_extras:
15!
794
                    config_for_extras['job_defaults'] = {}
×
795
                config_for_extras['job_defaults'][key] = None
15✔
796
            for key in DEFAULT_CONFIG['differ_defaults']:
15✔
797
                if 'differ_defaults' not in config_for_extras:
15!
798
                    config_for_extras['differ_defaults'] = {}
×
799
                config_for_extras['differ_defaults'][key] = None
15✔
800
        if 'hooks' in sys.modules:
15✔
801
            # Remove extra keys in config used in hooks (they are not in DEFAULT_CONFIG)
802
            for _, obj in inspect.getmembers(
15✔
803
                sys.modules['hooks'], lambda x: inspect.isclass(x) and x.__module__ == 'hooks'
804
            ):
805
                if issubclass(obj, JobBase):
15✔
806
                    if obj.__kind__ not in DEFAULT_CONFIG['job_defaults']:
15!
810
                        config_for_extras['job_defaults'].pop(obj.__kind__, None)
15✔
811
                elif issubclass(obj, ReporterBase) and obj.__kind__ not in DEFAULT_CONFIG['report']:
15✔
812
                    config_for_extras['report'].pop(obj.__kind__, None)
15✔
813
        if 'slack' in config_for_extras.get('report', {}):
15✔
814
            # Ignore legacy key
815
            config_for_extras['report'].pop('slack')
15✔
816
        extras: _Config = self.dict_deep_difference(config_for_extras, DEFAULT_CONFIG, ignore_underline_keys=True)
15✔
817
        if not extras.get('report'):
15✔
818
            extras.pop('report', None)
15✔
819
        if extras:
15✔
820
            warnings.warn(
15✔
821
                f'Found unrecognized directive(s) in the configuration file {self.filename}:\n'
822
                f'{yaml.safe_dump(extras)}Check for typos or the hooks.py file (if any); documentation is at '
823
                f'{__docs_url__}\n',
824
                RuntimeWarning,
825
                stacklevel=1,
826
            )
827

828
    @staticmethod
15✔
829
    def replace_none_keys(config: _Config) -> None:
15✔
830
        """Fixes None keys in loaded config that should be empty dicts instead."""
831
        if 'job_defaults' not in config:
15!
832
            config['job_defaults'] = DEFAULT_CONFIG['job_defaults']
×
833
        else:
834
            if 'shell' in config['job_defaults']:
15!
835
                if 'command' in config['job_defaults']:
×
836
                    raise KeyError(
×
837
                        "Found both 'shell' and 'command' job_defaults in config, a duplicate. Please remove 'shell' "
838
                        'ones.'
839
                    )
840
                config['job_defaults']['command'] = config['job_defaults'].pop('shell')
×
841
            for key in ('all', 'url', 'browser', 'command'):
15✔
842
                if key not in config['job_defaults'] or config['job_defaults'][key] is None:
15!
843
                    config['job_defaults'][key] = {}
×
844

845
    def load(self, *args: Any) -> None:
15✔
846
        """Load configuration file from self.filename into self.config adding missing keys from DEFAULT_CONFIG.
847

848
        :param args: None used.
849
        """
850
        logger.debug(f'Loading configuration from {self.filename}')
15✔
851
        config: _Config = self.parse(self.filename)
15✔
852

853
        if config:
15✔
854
            self.replace_none_keys(config)
15✔
855
            self.check_for_unrecognized_keys(config)
15✔
856

857
            # If config is missing keys in DEFAULT_CONFIG, log the missing keys and deep merge DEFAULT_CONFIG
858
            missing = self.dict_deep_difference(DEFAULT_CONFIG, config, ignore_underline_keys=True)
15✔
859
            if missing:
15!
860
                logger.info(
15✔
861
                    f'The configuration file {self.filename} is missing directive(s); using default values for those. '
862
                    'Run with -vv for more details.'
863
                )
864
                logger.debug(
15✔
865
                    f'The following default values are being used:\n'
866
                    f'{yaml.safe_dump(missing)}'
867
                    f'See documentation at {__docs_url__}en/stable/configuration.html'
868
                )
869
                config = self.dict_deep_merge(config or {}, DEFAULT_CONFIG)
15✔
870

871
            # normalize headers and cookies in job_defaults
872
            for job_defaults_type in ('all', 'url', 'browser'):
15✔
873
                if 'headers' in config['job_defaults'][job_defaults_type]:
15!
874
                    config['job_defaults'][job_defaults_type]['headers'] = Headers(
×
875
                        {k: str(v) for k, v in config['job_defaults'][job_defaults_type]['headers'].items()},
876
                        encoding='utf-8',
877
                    )
878
                if 'cookies' in config['job_defaults'][job_defaults_type]:
15!
879
                    config['job_defaults'][job_defaults_type]['cookies'] = {
×
880
                        k: str(v) for k, v in config['job_defaults'][job_defaults_type]['cookies'].items()
881
                    }
882
            logger.info(f'Loaded configuration from {self.filename}')
15✔
883

884
        else:
885
            logger.warning(f'No directives found in the configuration file {self.filename}; using default directives.')
15✔
886
            config = DEFAULT_CONFIG
15✔
887

888
        self.config = config
15✔
889

890
    def save(self, *args: Any, **kwargs: Any) -> None:
15✔
891
        """Save self.config into self.filename using YAML.
892

893
        :param args: None used.
894
        :param kwargs: None used.
895
        """
896
        with self.filename.open('w') as fp:
15✔
897
            fp.write(
15✔
898
                f'# {__project_name__} configuration file. See {__docs_url__}en/stable/configuration.html\n'
899
                f'# Originally written on {datetime.now(timezone.utc).replace(microsecond=0).isoformat()}Z by version'
900
                f' {__version__}.\n'
901
                f'\n'
902
            )
903
            yaml.safe_dump(self.config, fp, allow_unicode=True, sort_keys=False)
15✔
904

905
    @classmethod
15✔
906
    def write_default_config(cls, filename: Path) -> None:
15✔
907
        """Write default configuration to file.
908

909
        :param filename: The filename.
910
        """
911
        config_storage = cls(filename)
15✔
912
        config_storage.config = DEFAULT_CONFIG
15✔
913
        config_storage.save()
15✔
914

915

916
class YamlJobsStorage(BaseYamlFileStorage, JobsBaseFileStorage):
15✔
917
    """Class for jobs file (is a YAML textual file)."""
918

919
    @classmethod
15✔
920
    def _parse(cls, fp: TextIO, filenames: list[Path]) -> list[JobBase]:
15✔
921
        """Parse the contents of a jobs YAML file.
922

923
        :param fp: The text stream to parse.
924
        :return: A list of JobBase objects.
925
        :raise yaml.YAMLError: If a YAML error is found in the file.
926
        :raise ValueError: If a duplicate URL/command is found in the list.
927
        """
928

929
        def job_files_for_error() -> list[str]:
15✔
930
            """:return: A list of line containing the names of the job files."""
931
            if len(filenames) > 1:
15!
932
                jobs_files = ['in the concatenation of the jobs files:'] + [f'• {file},' for file in filenames]
×
933
            elif len(filenames) == 1:
15!
934
                jobs_files = [f'in jobs file {filenames[0]}.']
15✔
935
            else:
936
                jobs_files = []
×
937
            return jobs_files
15✔
938

939
        jobs = []
15✔
940
        jobs_by_guid = defaultdict(list)
15✔
941
        try:
15✔
942
            for i, job_data in enumerate((job for job in yaml.safe_load_all(fp) if job)):
15✔
943
                if not isinstance(job_data, dict):
15!
944
                    raise ValueError(
×
945
                        '\n   '.join(
946
                            (
947
                                f'Found invalid job data (consisting of the {type(job_data).__name__} {job_data})',
948
                                *job_files_for_error(),
949
                            )
950
                        )
951
                    )
952
                job_data['index_number'] = i + 1
15✔
953
                job = JobBase.unserialize(job_data, filenames)
15✔
954
                # TODO: Implement 100% validation and remove it from jobs.py
955
                # TODO: Try using pydantic to do this.
956
                if not isinstance(job.data, (NoneType, str, dict, list)):
15!
957
                    raise ValueError(
×
958
                        '\n   '.join(
959
                            (
960
                                f"The 'data' key needs to contain a string, a dictionary or a list; found a "
961
                                f'{type(job.data).__name__}',
962
                                f'in {job.get_indexed_location()}',
963
                                *job_files_for_error(),
964
                            )
965
                        )
966
                    )
967
                if not isinstance(job.filters, (NoneType, list)):
15!
968
                    if isinstance(job.filters, str):  # Backwards compatibility
×
969
                        warnings.warn(
×
970
                            '\n   '.join(
971
                                (
972
                                    f"The 'filters' key should contain a list; found a {type(job.filters).__name__}",
973
                                    f'in {job.get_indexed_location()}',
974
                                    *job_files_for_error(),
975
                                )
976
                            ),
977
                            RuntimeWarning,
978
                            stacklevel=1,
979
                        )
980
                        job.filters = [job.filters]
×
981
                    else:
982
                        raise ValueError(
×
983
                            '\n   '.join(
984
                                (
985
                                    f"The 'filters' key needs to contain a list; found a {type(job.filters).__name__} ",
986
                                    f'in {job.get_indexed_location()}',
987
                                    *job_files_for_error(),
988
                                )
989
                            )
990
                        )
991
                if not isinstance(job.headers, (NoneType, dict, Headers)):
15!
992
                    raise ValueError(
×
993
                        '\n   '.join(
994
                            (
995
                                f"The 'headers' key needs to contain a dictionary; found a "
996
                                f'{type(job.headers).__name__}',
997
                                f'in {job.get_indexed_location()}',
998
                                *job_files_for_error(),
999
                            )
1000
                        )
1001
                    )
1002
                if not isinstance(job.cookies, (NoneType, dict)):
15!
1003
                    raise ValueError(
×
1004
                        '\n   '.join(
1005
                            (
1006
                                f"The 'cookies' key needs to contain a dictionary; found a "
1007
                                f'{type(job.cookies).__name__}',
1008
                                f'in {job.get_indexed_location()}',
1009
                                *job_files_for_error(),
1010
                            )
1011
                        )
1012
                    )
1013
                if not isinstance(job.switches, (NoneType, str, list)):
15!
1014
                    raise ValueError(
×
1015
                        '\n   '.join(
1016
                            (
1017
                                f"The 'switches' key needs to contain a string or a list; found a "
1018
                                f'{type(job.switches).__name__}',
1019
                                f'in {job.get_indexed_location()}',
1020
                                *job_files_for_error(),
1021
                            )
1022
                        )
1023
                    )
1024
                # We add GUID here to speed things up and to allow hooks to programmatically change job.url and/or
1025
                # job.user_visible_url
1026
                jobs.append(job)
15✔
1027
                jobs_by_guid[job.guid].append(job)
15✔
1028
        except yaml.scanner.ScannerError as e:
×
1029
            raise ValueError(
×
1030
                '\n   '.join(
1031
                    (
1032
                        f'YAML parser {e.args[2].replace("here", "")} in line {e.args[3].line + 1}, column'
1033
                        f' {e.args[3].column + 1}',
1034
                        *job_files_for_error(),
1035
                    )
1036
                )
1037
            ) from None
1038

1039
        conflicting_jobs = [guid_jobs[0].get_location() for _, guid_jobs in jobs_by_guid.items() if len(guid_jobs) != 1]
15✔
1040
        if conflicting_jobs:
15✔
1041
            raise ValueError(
15✔
1042
                '\n   '.join(
1043
                    ['Each job must have a unique URL/command (for URLs, append #1, #2, etc. to make them unique):']
1044
                    + [f'• {job}' for job in conflicting_jobs]
1045
                    + ['']
1046
                    + job_files_for_error()
1047
                )
1048
            ) from None
1049

1050
        return jobs
15✔
1051

1052
    @classmethod
15✔
1053
    def parse(cls, filename: Path) -> list[JobBase]:
15✔
1054
        """Parse the contents of a jobs YAML file and return a list of jobs.
1055

1056
        :param filename: The filename Path.
1057
        :return: A list of JobBase objects.
1058
        """
1059
        if filename is not None and filename.is_file():
15!
1060
            with filename.open() as fp:
15✔
1061
                return cls._parse(fp, [filename])
15✔
1062
        return []
×
1063

1064
    def load(self, *args: Any) -> list[JobBase]:
15✔
1065
        """Parse the contents of the jobs YAML file(s) and return a list of jobs.
1066

1067
        :return: A list of JobBase objects.
1068
        """
1069
        if len(self.filename) == 1:
15!
1070
            with self.filename[0].open() as f:
15✔
1071
                return self._parse(f, self.filename)
15✔
1072
        else:
1073
            fp = io.StringIO('\n---\n'.join(f.read_text(encoding='utf-8-sig') for f in self.filename if f.is_file()))
×
1074
            return self._parse(fp, self.filename)
×
1075

1076
    def save(self, jobs: Iterable[JobBase]) -> None:
15✔
1077
        """Save jobs to the job YAML file.
1078

1079
        :param jobs: An iterable of JobBase objects to be written.
1080
        """
1081
        print(f'Saving updated list to {self.filename[0]}.')
15✔
1082

1083
        with self.filename[0].open('w') as fp:
15✔
1084
            yaml.safe_dump_all([job.serialize() for job in jobs], fp, allow_unicode=True, sort_keys=False)
15✔
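
    # Jobs-file sketch (hypothetical contents, not part of the module): each YAML document in
    # the jobs file becomes one JobBase via JobBase.unserialize(), and _parse() rejects
    # duplicate URLs/commands:
    #
    #   name: Example page
    #   url: https://example.com/
    #   ---
    #   command: date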
1085

1086

1087
class SsdbStorage(BaseFileStorage, ABC):
15✔
1088
    """Base class for snapshots storage."""
1089

1090
    @abstractmethod
15✔
1091
    def close(self) -> None:
15✔
1092
        pass
15✔
1093

1094
    @abstractmethod
15✔
1095
    def get_guids(self) -> list[str]:
15✔
1096
        pass
15✔
1097

1098
    @abstractmethod
15✔
1099
    def load(self, guid: str) -> Snapshot:
15✔
1100
        pass
15✔
1101

1102
    @abstractmethod
15✔
1103
    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
15✔
1104
        pass
15✔
1105

1106
    @abstractmethod
15✔
1107
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
15✔
1108
        pass
×
1109

1110
    @abstractmethod
15✔
1111
    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
15✔
1112
        pass
15✔
1113

1114
    @abstractmethod
15✔
1115
    def delete(self, guid: str) -> None:
15✔
1116
        pass
15✔
1117

1118
    @abstractmethod
15✔
1119
    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
15✔
1120
        """For the given 'guid', delete only the latest 'delete_entries' entries and keep all other (older) ones.
1121

1122
        :param guid: The guid.
1123
        :param delete_entries: The number of most recent entries to delete.
1124

1125
        :returns: Number of records deleted.
1126
        """
1127

1128
    @abstractmethod
15✔
1129
    def delete_all(self) -> int:
15✔
1130
        """Delete all entries; used for testing only.
1131

1132
        :returns: Number of records deleted.
1133
        """
1134

1135
    @abstractmethod
15✔
1136
    def clean(self, guid: str, keep_entries: int = 1) -> int:
15✔
1137
        """Removes the entries for guid except the latest n keep_entries.
1138

1139
        :param guid: The guid.
1140
        :param keep_entries: The number of most recent entries to keep.
1141

1142
        :returns: Number of records deleted.
1143
        """
1144

1145
    @abstractmethod
15✔
1146
    def move(self, guid: str, new_guid: str) -> int:
15✔
1147
        """Replace uuid in records matching the 'guid' with the 'new_guid' value.
1148

1149
        If there are existing records with 'new_guid', they will not be overwritten and the job histories will be
1150
        merged.
1151

1152
        :returns: Number of records searched for replacement.
1153
        """
1154

1155
    @abstractmethod
15✔
1156
    def rollback(self, timestamp: float) -> int | None:
15✔
1157
        """Rolls back the database to timestamp.
1158

1159
        :param timestamp: The timestamp.
1160

1161
        :returns: Number of records deleted.
1162
        :raises: NotImplementedError for those classes where this method is not implemented.
1163
        """
1164

1165
    def backup(self) -> Iterator[tuple[str, str | bytes, float, int, str, str, ErrorData]]:
15✔
1166
        """Return the most recent entry for each 'guid'.
1167

1168
        :returns: A generator of tuples, each consisting of (guid, data, timestamp, tries, etag, mime_type, error_data).
1169
        """
1170
        for guid in self.get_guids():
15✔
1171
            data, timestamp, tries, etag, mime_type, error_data = self.load(guid)
15✔
1172
            yield guid, data, timestamp, tries, etag, mime_type, error_data
15✔
1173

1174
    def restore(self, entries: Iterable[tuple[str, str | bytes, float, int, str, str, ErrorData]]) -> None:
15✔
1175
        """Save multiple entries into the database.
1176

1177
        :param entries: An iterable of tuples where each consists of (guid, data, timestamp, tries, etag, mime_type, error_data).
1178
        """
1179
        for guid, data, timestamp, tries, etag, mime_type, error_data in entries:
15✔
1180
            new_snapshot = Snapshot(
15✔
1181
                data=data, timestamp=timestamp, tries=tries, etag=etag, mime_type=mime_type, error_data=error_data
1182
            )
1183
            self.save(guid=guid, snapshot=new_snapshot, temporary=False)
15✔
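
    # Round-trip sketch (hypothetical 'old_db' and 'new_db' SsdbStorage instances):
    #   new_db.restore(old_db.backup())  # copies the latest snapshot of every guid across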
1184

1185
    def gc(self, known_guids: Iterable[str], keep_entries: int = 1) -> None:
15✔
1186
        """Garbage collect the database: delete all guids not included in known_guids and keep only last n snapshot for
1187
        the others.
1188

1189
        :param known_guids: The guids to keep.
1190
        :param keep_entries: Number of entries to keep after deletion for the guids to keep.
1191
        """
1192
        for guid in set(self.get_guids()) - set(known_guids):
15✔
1193
            print(f'Deleting job {guid} (no longer being tracked).')
15✔
1194
            self.delete(guid)
15✔
1195
        self.clean_ssdb(known_guids, keep_entries)
15✔
1196

1197
    def clean_ssdb(self, known_guids: Iterable[str], keep_entries: int = 1) -> None:
15✔
1198
        """Convenience function to clean the cache.
1199

1200
        If self.clean_all is present, runs clean_all(). Otherwise, runs clean() on all known_guids, one at a time.
1201
        Prints the number of snapshots deleted.
1202

1203
        :param known_guids: An iterable of guids
1204
        :param keep_entries: Number of entries to keep after deletion.
1205
        """
1206
        if hasattr(self, 'clean_all'):
15✔
1207
            count = self.clean_all(keep_entries)  # ty:ignore[call-non-callable]
15✔
1208
            if count:
15✔
1209
                print(f'Deleted {count} old snapshots.')
15✔
1210
        else:
1211
            for guid in known_guids:
15✔
1212
                count = self.clean(guid, keep_entries)
15✔
1213
                if count:
15✔
1214
                    print(f'Deleted {count} old snapshots of {guid}.')
15✔
1215

1216
    @abstractmethod
15✔
1217
    def flushdb(self) -> None:
15✔
1218
        """Delete all entries of the database.  Use with care, there is no undo!"""
1219

1220

1221
class SsdbDirStorage(SsdbStorage):
15✔
1222
    """Class for snapshots stored as individual textual files in a directory 'dirname'."""
1223

1224
    def __init__(self, dirname: str | Path) -> None:
15✔
1225
        super().__init__(dirname)
15✔
1226
        self.filename.mkdir(parents=True, exist_ok=True)  # using the attr filename because it is a Path (confusing!)
15✔
1227
        logger.info(f'Using directory {self.filename} to store snapshot data as individual text files')
15✔
1228

1229
    def close(self) -> None:
15✔
1230
        # Nothing to close
1231
        return
15✔
1232

1233
    def _get_filename(self, guid: str) -> Path:
15✔
1234
        return self.filename.joinpath(guid)  # filename is a dir (confusing!)
15✔
1235

1236
    def get_guids(self) -> list[str]:
15✔
1237
        return [filename.name for filename in self.filename.iterdir()]
15✔
1238

1239
    def load(self, guid: str) -> Snapshot:
15✔
1240
        filename = self._get_filename(guid)
15✔
1241
        if not filename.is_file():
15✔
1242
            return Snapshot('', 0, 0, '', '', {})
15✔
1243

1244
        try:
15✔
1245
            data = filename.read_text()
15✔
1246
        except UnicodeDecodeError:
×
1247
            data = filename.read_text(errors='ignore')
×
1248
            logger.warning(f'Found and ignored Unicode-related errors when retrieving saved snapshot {guid}')
×
1249

1250
        timestamp = filename.stat().st_mtime
15✔
1251

1252
        return Snapshot(data, timestamp, 0, '', '', {})
15✔
1253

1254
    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
15✔
1255
        if count is not None and count < 1:
15✔
1256
            return {}
15✔
1257
        snapshot = self.load(guid)
15✔
1258
        return {snapshot.data: snapshot.timestamp} if snapshot.data and snapshot.timestamp else {}
15✔
1259

1260
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
15✔
1261
        if count is not None and count < 1:
15✔
1262
            return []
15✔
1263
        snapshot = self.load(guid)
15✔
1264
        return [snapshot] if snapshot.data and snapshot.timestamp else []
15✔
1265

1266
    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
15✔
1267
        # ETag and mime_type are ignored
1268
        filename = self._get_filename(guid)
15✔
1269
        with filename.open('w+') as fp:
15✔
1270
            fp.write(str(snapshot.data))
15✔
1271
        os.utime(filename, times=(datetime.now().timestamp(), snapshot.timestamp))  # noqa: DTZ005
15✔
1272

1273
    def delete(self, guid: str) -> None:
15✔
1274
        filename = self._get_filename(guid)
15✔
1275
        filename.unlink(missing_ok=True)
15✔
1276

1277
    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
15✔
1278
        """For the given 'guid', delete the latest entry and keep all other (older) ones.
1279

1280
        :param guid: The guid.
1281
        :param delete_entries: The number of most recent entries to delete.
1282

1283
        :raises NotImplementedError: This function is not implemented for 'textfiles' databases.
1284
        """
1285
        raise NotImplementedError(
1286
            "Deleting of latest snapshot not supported by 'textfiles' database engine since only one snapshot is "
1287
            "saved. Delete all snapshots if that's what you are trying to do."
1288
        )
1289

1290
    def delete_all(self) -> int:
15✔
1291
        """Delete all entries; used for testing only.
1292

1293
        :raises NotImplementedError: This function is not implemented for 'textfiles' databases.
1294
        """
1295
        raise NotImplementedError(
1296
            "Deleting of latest snapshot not supported by 'textfiles' database engine since only one snapshot is "
1297
            "saved. Delete all snapshots if that's what you are trying to do."
1298
        )
1299

1300
    def clean(self, guid: str, keep_entries: int = 1) -> int:
15✔
1301
        if keep_entries != 1:
15✔
1302
            raise NotImplementedError('Only keeping latest 1 entry is supported.')
1303
        # We only store the latest version, no need to clean
1304
        return 0
15✔
1305

1306
    def move(self, guid: str, new_guid: str) -> int:
15✔
1307
        """Moves the data from guid to new_guid.
1308

1309
        :param guid: The guid.
1310
        :param new_guid: The new guid.
1311

1312
        :returns: Number of records moved.
1313
        """
1314
        if guid == new_guid:
15!
1315
            return 0
×
1316
        old_filepath = Path(self._get_filename(guid))
15✔
1317
        new_filepath = Path(self._get_filename(new_guid))
15✔
1318
        if old_filepath.exists():
15!
1319
            new_filepath.parent.mkdir(parents=True, exist_ok=True)
15✔
1320
            old_filepath.rename(new_filepath)
15✔
1321
        else:
1322
            raise ValueError(f'Old snapshot file {old_filepath} does not exist')
×
1323
        return 1
15✔
1324

1325
    def rollback(self, timestamp: float) -> None:
15✔
1326
        raise NotImplementedError("'textfiles' databases cannot be rolled back as new snapshots overwrite old ones")
1327

1328
    def flushdb(self) -> None:
15✔
1329
        for file in self.filename.iterdir():
15✔
1330
            if file.is_file():
15!
1331
                file.unlink()
15✔
1332

1333

1334
class SsdbSQLite3Storage(SsdbStorage):
15✔
1335
    """Handles storage of the snapshot as a SQLite database in the 'filename' file using Python's built-in sqlite3
1336
    module and the msgpack package.
1337

1338
    A temporary database is created by __init__ and is written to by the 'save()' function (unless temporary=False).
1339
    This data will be written to the permanent one by the 'close()' function, which is called at the end of program
1340
    execution.
1341

1342
    The database contains the 'webchanges' table with the following columns:
1343

1344
    * guid: unique hash of the "location", i.e. the URL/command; indexed
1345
    * timestamp: the Unix timestamp of when the snapshot was taken; indexed
1346
    * msgpack_data: a msgpack blob containing 'data', 'tries', 'etag' and 'mime_type' in a dict of keys 'd', 't',
1347
      'e' and 'm'
1348
    """
1349

1350
    def __init__(self, filename: Path, max_snapshots: int = 4) -> None:
15✔
1351
        """:param filename: The full filename of the database file
1352
        :param max_snapshots: The maximum number of snapshots to retain in the database for each 'guid'
1353
        """
1354
        # Opens the database file and, if new, creates a table and index.
1355

1356
        self.max_snapshots = max_snapshots
15✔
1357

1358
        logger.debug(f'Run-time SQLite library: {sqlite3.sqlite_version}')
15✔
1359
        super().__init__(filename)
15✔
1360

1361
        self.filename.parent.mkdir(parents=True, exist_ok=True)
15✔
1362

1363
        # https://stackoverflow.com/questions/26629080
1364
        self.lock = threading.RLock()
15✔
1365

1366
        self.db = sqlite3.connect(filename, check_same_thread=False)
15✔
1367
        logger.info(f'Using sqlite3 {sqlite3.sqlite_version} database at {filename}')
15✔
1368
        self.cur = self.db.cursor()
15✔
1369
        self.cur.execute('PRAGMA temp_store = MEMORY;')
15✔
1370
        tables = self._execute("SELECT name FROM sqlite_master WHERE type='table';").fetchone()
15✔
1371

1372
        def _initialize_table() -> None:
15✔
1373
            logger.debug('Initializing sqlite3 database')
15✔
1374
            self._execute('CREATE TABLE webchanges (uuid TEXT, timestamp REAL, msgpack_data BLOB)')
15✔
1375
            self._execute('CREATE INDEX idx_uuid_time ON webchanges(uuid, timestamp)')
15✔
1376
            self.db.commit()
15✔
1377

1378
        if tables == ('CacheEntry',):
15✔
1379
            logger.info("Found legacy 'minidb' database to convert")
15✔
1380

1381
            # Found a minidb legacy database; close it, rename it for migration and create new sqlite3 one
1382
            import importlib.util
15✔
1383

1384
            if importlib.util.find_spec('minidb') is None:
15!
1385
                print('You have an old snapshot database format that needs to be converted to a current one.')
×
1386
                print(
×
1387
                    f"Please install the Python package 'minidb' for this one-time conversion and rerun "
1388
                    f'{__project_name__}.'
1389
                )
1390
                print('Use e.g. `pip install -U minidb`.')
×
1391
                print()
×
1392
                print("After the conversion, you can uninstall 'minidb' with e.g. `pip uninstall minidb`.")
×
1393
                sys.exit(1)
×
1394

1395
            print('Performing one-time conversion from old snapshot database format.')
15✔
1396
            self.db.close()
15✔
1397
            minidb_filename = filename.with_stem(filename.stem + '_minidb')
15✔
1398
            self.filename.replace(minidb_filename)
15✔
1399
            self.db = sqlite3.connect(filename, check_same_thread=False)
15✔
1400
            self.cur = self.db.cursor()
15✔
1401
            _initialize_table()
15✔
1402
            # Migrate the minidb legacy database renamed above
1403
            self.migrate_from_minidb(minidb_filename)
15✔
1404
        elif tables != ('webchanges',):
15!
1405
            _initialize_table()
15✔
1406

1407
        # Create temporary database in memory for writing during execution (fault tolerance)
1408
        logger.debug('Creating temp sqlite3 database file in memory')
15✔
1409
        self.temp_lock = threading.RLock()
15✔
1410
        self.temp_db = sqlite3.connect('', check_same_thread=False)
15✔
1411
        self.temp_cur = self.temp_db.cursor()
15✔
1412
        self._temp_execute('CREATE TABLE webchanges (uuid TEXT, timestamp REAL, msgpack_data BLOB)')
15✔
1413
        self.temp_db.commit()
15✔
1414

1415
    def _execute(self, sql: str, args: tuple | None = None) -> sqlite3.Cursor:
15✔
1416
        """Execute SQL command on main database"""
1417
        if args is None:
15✔
1418
            logger.debug(f"Executing (perm) '{sql}'")
15✔
1419
            return self.cur.execute(sql)
15✔
1420
        logger.debug(f"Executing (perm) '{sql}' with {args}")
15✔
1421
        return self.cur.execute(sql, args)
15✔
1422

1423
    def _temp_execute(self, sql: str, args: tuple | None = None) -> sqlite3.Cursor:
15✔
1424
        """Execute SQL command on temp database."""
1425
        if args is None:
15✔
1426
            logger.debug(f"Executing (temp) '{sql}'")
15✔
1427
            return self.temp_cur.execute(sql)
15✔
1428
        logger.debug(f"Executing (temp) '{sql}' with {args[:2]}...")
15✔
1429
        return self.temp_cur.execute(sql, args)
15✔
1430

1431
    def _copy_temp_to_permanent(self, delete: bool = False) -> None:
15✔
1432
        """Copy contents of temporary database to permanent one.
1433

1434
        :param delete: Also delete the contents of the temporary database (used for testing).
1435
        """
1436
        logger.debug('Saving new snapshots to permanent sqlite3 database')
15✔
1437
        # with self.temp_lock:
1438
        #     self.temp_db.commit()
1439
        # with self.lock:
1440
        #     self._execute('ATTACH DATABASE ? AS temp_db', (str(self.temp_filename),))
1441
        #     self._execute('INSERT INTO webchanges SELECT * FROM temp_db.webchanges')
1442
        #     logger.debug(f'Wrote {self.cur.rowcount} new snapshots to permanent sqlite3 database')
1443
        #     self.db.commit()
1444
        #     self._execute('DETACH DATABASE temp_db')
1445
        with self.temp_lock:
15✔
1446
            with self.lock:
15✔
1447
                for row in self._temp_execute('SELECT * FROM webchanges').fetchall():
15✔
1448
                    self._execute('INSERT INTO webchanges VALUES (?, ?, ?)', row)
15✔
1449
                self.db.commit()
15✔
1450
            if delete:
15✔
1451
                self._temp_execute('DELETE FROM webchanges')
15✔
1452

1453
    def close(self) -> None:
15✔
1454
        """Writes the temporary database to the permanent one, purges old entries if required, and closes all database
1455
        connections.
1456
        """
1457
        self._copy_temp_to_permanent()
15✔
1458
        with self.temp_lock:
15✔
1459
            self.temp_db.close()
15✔
1460
            logger.debug('Cleaning up the permanent sqlite3 database and closing the connection')
15✔
1461
        with self.lock:
15✔
1462
            if self.max_snapshots:
15✔
1463
                num_del = self.keep_latest(self.max_snapshots)
15✔
1464
                logger.debug(
15✔
1465
                    f'Keeping no more than {self.max_snapshots} snapshots per job: purged {num_del} older entries'
1466
                )
1467
            else:
1468
                self.db.commit()
15✔
1469
            self.db.close()
15✔
1470
            logger.info(f'Closed main sqlite3 database file {self.filename}')
15✔
1471
        del self.temp_cur
15✔
1472
        del self.temp_db
15✔
1473
        del self.temp_lock
15✔
1474
        del self.cur
15✔
1475
        del self.db
15✔
1476
        del self.lock
15✔
1477

1478
    def get_guids(self) -> list[str]:
15✔
1479
        """Lists the unique 'guid's contained in the database.
1480

1481
        :returns: A list of guids.
1482
        """
1483
        with self.lock:
15✔
1484
            self.cur.row_factory = lambda cursor, row: row[0]
15✔
1485
            guids = self._execute('SELECT DISTINCT uuid FROM webchanges').fetchall()
15✔
1486
            self.cur.row_factory = None
15✔
1487
        return guids
15✔
1488

1489
    def load(self, guid: str) -> Snapshot:
15✔
1490
        """Return the most recent entry matching a 'guid'.
1491

1492
        :param guid: The guid.
1493

1494
        :returns: A Snapshot tuple (data, timestamp, tries, etag, mime_type, error_data)
            WHERE

            - data is the data;
            - timestamp is the timestamp;
            - tries is the number of tries;
            - etag is the ETag;
            - mime_type is the MIME type of the data;
            - error_data is the error data, if any.
1501
        """
1502
        with self.lock:
15✔
1503
            row = self._execute(
15✔
1504
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC LIMIT 1',
1505
                (guid,),
1506
            ).fetchone()
1507
        if row:
15✔
1508
            msgpack_data, timestamp = row
15✔
1509
            r = msgpack.unpackb(msgpack_data)
15✔
1510
            return Snapshot(r['d'], timestamp, r['t'], r['e'], r.get('m', ''), r.get('err', {}))
15✔
1511

1512
        return Snapshot('', 0, 0, '', '', {})
15✔
1513

1514
    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
15✔
1515
        """Return max 'count' (None = all) records of data and timestamp of **successful** runs for a 'guid'.
1516

1517
        :param guid: The guid.
1518
        :param count: The maximum number of entries to return; if None return all.
1519

1520
        :returns: A dict (key: value)
1521
            WHERE
1522

1523
            - key is the snapshot data;
1524
            - value is the most recent timestamp for such snapshot.
1525
        """
1526
        if count is not None and count < 1:
15✔
1527
            return {}
15✔
1528

1529
        with self.lock:
15✔
1530
            rows = self._execute(
15✔
1531
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC', (guid,)
1532
            ).fetchall()
1533
        history = {}
15✔
1534
        if rows:
15✔
1535
            for msgpack_data, timestamp in rows:
15✔
1536
                r = msgpack.unpackb(msgpack_data)
15✔
1537
                if not r['t'] and r['d'] not in history:
15!
1538
                    history[r['d']] = timestamp
15✔
1539
                    if count is not None and len(history) >= count:
15!
1540
                        break
×
1541
        return history
15✔
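    # Hypothetical illustration of the mapping returned by get_history_data() above: snapshot data keyed to the most
    # recent timestamp at which that exact data was captured, successful runs only, e.g.
    #
    #     {'<html>new</html>': 1700000100.0, '<html>old</html>': 1700000000.0}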
1542

1543
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
15✔
1544
        """Return max 'count' (None = all) entries of all data (including from error runs) saved for a 'guid'.
1545

1546
        :param guid: The guid.
1547
        :param count: The maximum number of entries to return; if None return all.
1548

1549
        :returns: A list of Snapshot tuples (data, timestamp, tries, etag, mime_type, error_data)
            WHERE the values are:

            - data: The data (str, could be empty);
            - timestamp: The timestamp (float);
            - tries: The number of tries (int);
            - etag: The ETag (str, could be empty);
            - mime_type: The MIME type of the data (str, could be empty);
            - error_data: The error data (dict, could be empty).
1556
        """
1557
        if count is not None and count < 1:
15✔
1558
            return []
15✔
1559

1560
        with self.lock:
15✔
1561
            rows = self._execute(
15✔
1562
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC', (guid,)
1563
            ).fetchall()
1564
        history: list[Snapshot] = []
15✔
1565
        if rows:
15✔
1566
            for msgpack_data, timestamp in rows:
15✔
1567
                r = msgpack.unpackb(msgpack_data)
15✔
1568
                history.append(Snapshot(r['d'], timestamp, r['t'], r['e'], r.get('m', ''), r.get('err', {})))
15✔
1569
                if count is not None and len(history) >= count:
15✔
1570
                    break
6✔
1571
        return history
15✔
1572

1573
    def save(
15✔
1574
        self,
1575
        *args: Any,
1576
        guid: str,
1577
        snapshot: Snapshot,
1578
        temporary: bool | None = True,
1579
        **kwargs: Any,
1580
    ) -> None:
1581
        """Save the data from a job.
1582

1583
        By default, it is saved into the temporary database. Call close() to transfer the contents of the temporary
1584
        database to the permanent one.
1585

1586
        Note: attempts that end in an exception are saved with tries >= 1, and their data is replaced by the data
        from the most recent successful attempt.

        :param guid: The guid.
        :param snapshot: The Snapshot to save (data, timestamp, tries, etag, mime_type, error_data).
        :param temporary: If True (the default), save to the temporary database; if False, save directly to the
            permanent one.
1595
        """
1596
        c = {
15✔
1597
            'd': snapshot.data,
1598
            't': snapshot.tries,
1599
            'e': snapshot.etag,
1600
            'm': snapshot.mime_type,
1601
            'err': snapshot.error_data,
1602
        }
1603
        msgpack_data = msgpack.packb(c)
15✔
1604
        if temporary:
15✔
1605
            with self.temp_lock:
15✔
1606
                self._temp_execute('INSERT INTO webchanges VALUES (?, ?, ?)', (guid, snapshot.timestamp, msgpack_data))
15✔
1607
                # we do not commit to temporary as it's being used as write-only (we commit at the end)
1608
        else:
1609
            with self.lock:
15✔
1610
                self._execute('INSERT INTO webchanges VALUES (?, ?, ?)', (guid, snapshot.timestamp, msgpack_data))
15✔
1611
                self.db.commit()
15✔
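    # A hedged usage sketch (filename and values are made up): write one snapshot directly to the permanent database
    # with temporary=False and read it back; Snapshot's field order (data, timestamp, tries, etag, mime_type,
    # error_data) follows load() above.
    #
    #     from pathlib import Path
    #     from webchanges.handler import Snapshot
    #
    #     storage = SsdbSQLite3Storage(Path('snapshots.db'))
    #     snap = Snapshot('<html>example</html>', 1700000000.0, 0, 'W/"abc"', 'text/html', {})
    #     storage.save(guid='some-guid', snapshot=snap, temporary=False)
    #     assert storage.load('some-guid').data == '<html>example</html>'
    #     storage.close()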
1612

1613
    def delete(self, guid: str) -> None:
15✔
1614
        """Delete all entries matching a 'guid'.
1615

1616
        :param guid: The guid.
1617
        """
1618
        with self.lock:
15✔
1619
            self._execute('DELETE FROM webchanges WHERE uuid = ?', (guid,))
15✔
1620
            self.db.commit()
15✔
1621

1622
    def delete_latest(
15✔
1623
        self,
1624
        guid: str,
1625
        delete_entries: int = 1,
1626
        temporary: bool | None = False,
1627
        **kwargs: Any,
1628
    ) -> int:
1629
        """For the given 'guid', delete the latest 'delete_entries' number of entries and keep all other (older) ones.
1630

1631
        :param guid: The guid.
1632
        :param delete_entries: The number of most recent entries to delete.
1633
        :param temporary: If False (the default), delete from the permanent database; if True, from the temporary one.
1634

1635
        :returns: Number of records deleted.
1636
        """
1637
        if temporary:
15✔
1638
            with self.temp_lock:
15✔
1639
                self._temp_execute(
15✔
1640
                    'DELETE FROM webchanges '
1641
                    'WHERE ROWID IN ( '
1642
                    '    SELECT ROWID FROM webchanges '
1643
                    '    WHERE uuid = ? '
1644
                    '    ORDER BY timestamp DESC '
1645
                    '    LIMIT ? '
1646
                    ')',
1647
                    (guid, delete_entries),
1648
                )
1649
                num_del: int = self._temp_execute('SELECT changes()').fetchone()[0]  # count on the temp connection
15✔
1650
        else:
1651
            with self.lock:
15✔
1652
                self._execute(
15✔
1653
                    'DELETE FROM webchanges '
1654
                    'WHERE ROWID IN ( '
1655
                    '    SELECT ROWID FROM webchanges '
1656
                    '    WHERE uuid = ? '
1657
                    '    ORDER BY timestamp DESC '
1658
                    '    LIMIT ? '
1659
                    ')',
1660
                    (guid, delete_entries),
1661
                )
1662
                num_del = self._execute('SELECT changes()').fetchone()[0]
15✔
1663
                self.db.commit()
15✔
1664
        return num_del
15✔
1665

1666
    def delete_all(self) -> int:
15✔
1667
        """Delete all entries; used for testing only.
1668

1669
        :returns: Number of records deleted.
1670
        """
1671
        with self.lock:
15✔
1672
            self._execute('DELETE FROM webchanges')
15✔
1673
            self.db.commit()
15✔
1674
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
15✔
1675
            self.db.commit()
15✔
1676

1677
        return num_del
15✔
1678

1679
    def clean(self, guid: str, keep_entries: int = 1) -> int:
15✔
1680
        """For the given 'guid', keep only the latest 'keep_entries' number of entries and delete all other (older)
1681
        ones. To delete older entries from all guids, use clean_all() instead.
1682

1683
        :param guid: The guid.
1684
        :param keep_entries: Number of entries to keep after deletion.
1685

1686
        :returns: Number of records deleted.
1687
        """
1688
        with self.lock:
15✔
1689
            self._execute(
15✔
1690
                'DELETE FROM webchanges '
1691
                'WHERE ROWID IN ( '
1692
                '    SELECT ROWID FROM webchanges '
1693
                '    WHERE uuid = ? '
1694
                '    ORDER BY timestamp DESC '
1695
                '    LIMIT -1 '
1696
                '    OFFSET ? '
1697
                ') ',
1698
                (guid, keep_entries),
1699
            )
1700
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
15✔
1701
            self.db.commit()
15✔
1702
            self._execute('VACUUM')
15✔
1703
        return num_del
15✔
1704

1705
    def move(self, guid: str, new_guid: str) -> int:
15✔
1706
        """Replace uuid in records matching the 'guid' with the 'new_guid' value.
1707

1708
        If there are existing records with 'new_guid', they will not be overwritten and the job histories will be
1709
        merged.
1710

1711
        :param guid: The current guid.
        :param new_guid: The new guid.

        :returns: Number of records searched for replacement.
1712
        """
1713
        total_searched = 0
15✔
1714
        if guid != new_guid:
15!
1715
            with self.lock:
15✔
1716
                self._execute(
15✔
1717
                    'UPDATE webchanges SET uuid = REPLACE(uuid, ?, ?)',
1718
                    (guid, new_guid),
1719
                )
1720
                total_searched = self._execute('SELECT changes()').fetchone()[0]
15✔
1721
                self.db.commit()
15✔
1722
                self._execute('VACUUM')
15✔
1723

1724
        return total_searched
15✔
1725

1726
    def clean_all(self, keep_entries: int = 1) -> int:
15✔
1727
        """Delete all older entries for each 'guid' (keep only keep_entries).
1728

1729
        :returns: Number of records deleted.
1730
        """
1731
        with self.lock:
15✔
1732
            if keep_entries == 1:
15✔
1733
                self._execute(
15✔
1734
                    'DELETE FROM webchanges '
1735
                    'WHERE EXISTS ( '
1736
                    '    SELECT 1 FROM webchanges '
1737
                    '    w WHERE w.uuid = webchanges.uuid AND w.timestamp > webchanges.timestamp '
1738
                    ')'
1739
                )
1740
            else:
1741
                self._execute(
15✔
1742
                    'DELETE FROM webchanges '
1743
                    'WHERE ROWID IN ( '
1744
                    '    WITH rank_added AS ('
1745
                    '        SELECT '
1746
                    '             ROWID,'
1747
                    '             uuid,'
1748
                    '             timestamp, '
1749
                    '             ROW_NUMBER() OVER (PARTITION BY uuid ORDER BY timestamp DESC) AS rn'
1750
                    '        FROM webchanges '
1751
                    '    ) '
1752
                    '    SELECT ROWID FROM rank_added '
1753
                    '    WHERE rn > ?'
1754
                    ')',
1755
                    (keep_entries,),
1756
                )
1757
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
15✔
1758
            self.db.commit()
15✔
1759
            self._execute('VACUUM')
15✔
1760
        return num_del
15✔
1761

1762
    def keep_latest(self, keep_entries: int = 1) -> int:
15✔
1763
        """Delete all older entries keeping only the 'keep_num' per guid.
1764

1765
        :param keep_entries: Number of entries to keep after deletion.
1766

1767
        :returns: Number of records deleted.
1768
        """
1769
        with self.lock:
15✔
1770
            self._execute(
15✔
1771
                'WITH '
1772
                'cte AS ( SELECT uuid, timestamp, ROW_NUMBER() OVER ( PARTITION BY uuid '
1773
                '                                                     ORDER BY timestamp DESC ) rn '
1774
                '         FROM webchanges ) '
1775
                'DELETE '
1776
                'FROM webchanges '
1777
                'WHERE EXISTS ( SELECT 1 '
1778
                '               FROM cte '
1779
                '               WHERE webchanges.uuid = cte.uuid '
1780
                '                 AND webchanges.timestamp = cte.timestamp '
1781
                '                 AND cte.rn > ? );',
1782
                (keep_entries,),
1783
            )
1784
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
15✔
1785
            self.db.commit()
15✔
1786
        return num_del
15✔
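    # A toy sketch (not part of this module) of the retention logic used by keep_latest() above: rank rows per uuid
    # by timestamp descending and delete everything ranked past the cutoff (requires SQLite >= 3.25 for window
    # functions).
    #
    #     import sqlite3
    #
    #     con = sqlite3.connect(':memory:')
    #     con.execute('CREATE TABLE webchanges (uuid TEXT, timestamp REAL, msgpack_data BLOB)')
    #     con.executemany(
    #         'INSERT INTO webchanges VALUES (?, ?, ?)',
    #         [('a', 1.0, b''), ('a', 2.0, b''), ('a', 3.0, b''), ('b', 1.0, b'')],
    #     )
    #     con.execute(
    #         'WITH cte AS ( SELECT uuid, timestamp, ROW_NUMBER() OVER ( PARTITION BY uuid '
    #         '                                                          ORDER BY timestamp DESC ) rn '
    #         '              FROM webchanges ) '
    #         'DELETE FROM webchanges '
    #         'WHERE EXISTS ( SELECT 1 FROM cte '
    #         '               WHERE webchanges.uuid = cte.uuid '
    #         '                 AND webchanges.timestamp = cte.timestamp '
    #         '                 AND cte.rn > ? )',
    #         (2,),
    #     )
    #     assert con.execute('SELECT COUNT(*) FROM webchanges').fetchone()[0] == 3  # 2 kept for 'a', 1 for 'b'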
1787

1788
    def rollback(self, timestamp: float, count: bool = False) -> int:
15✔
1789
        """Rollback database to the entries present at timestamp.
1790

1791
        :param timestamp: The timestamp.
1792
        :param count: If set to true, only count the number that would be deleted without doing so.
1793

1794
        :returns: Number of records deleted (or to be deleted).
1795
        """
1796
        command = 'SELECT COUNT(*)' if count else 'DELETE'
15✔
1797
        with self.lock:
15✔
1798
            self._execute(
15✔
1799
                f'{command} '  # noqa: S608 Possible SQL injection
1800
                'FROM webchanges '
1801
                'WHERE EXISTS ( '
1802
                '     SELECT 1 '
1803
                '     FROM webchanges AS w '
1804
                '     WHERE w.uuid = webchanges.uuid '
1805
                '     AND webchanges.timestamp > ? '
1806
                '     AND w.timestamp > ? '
1807
                ')',
1808
                (timestamp, timestamp),
1809
            )
1810
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
15✔
1811
            self.db.commit()
15✔
1812
        return num_del
15✔
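    # A small, hypothetical sketch of choosing a rollback point: rollback() takes a Unix timestamp, which can be
    # built with the datetime module imported at the top of this file ('storage' is an SsdbSQLite3Storage instance
    # as in the sketch after save() above).
    #
    #     from datetime import datetime, timezone
    #
    #     cutoff = datetime(2026, 2, 1, tzinfo=timezone.utc).timestamp()
    #     would_delete = storage.rollback(cutoff, count=True)  # dry run: only counts entries newer than the cutoff
    #     storage.rollback(cutoff)  # actually deletes them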
1813

1814
    def migrate_from_minidb(self, minidb_filename: str | Path) -> None:
15✔
1815
        """Migrate the data of a legacy minidb database to the current database.
1816

1817
        :param minidb_filename: The filename of the legacy minidb database.
1818
        """
1819
        print("Found 'minidb' database and upgrading it to the new engine (note: only the last snapshot is retained).")
15✔
1820
        logger.info(
15✔
1821
            "Found legacy 'minidb' database and converting it to 'sqlite3' and new schema. Package 'minidb' needs to be"
1822
            ' installed for the conversion.'
1823
        )
1824

1825
        from webchanges.storage_minidb import SsdbMiniDBStorage
15✔
1826

1827
        legacy_db = SsdbMiniDBStorage(minidb_filename)
15✔
1828
        self.restore(legacy_db.backup())
15✔
1829
        legacy_db.close()
15✔
1830
        print(f'Database upgrade finished; the following backup file can be safely deleted: {minidb_filename}.\n')
15✔
1831
        print("The 'minidb' package can be removed (unless used by another program): $ pip uninstall minidb.")
15✔
1832
        print('-' * 80)
15✔
1833

1834
    def flushdb(self) -> None:
15✔
1835
        """Delete all entries of the database.  Use with care, there is no undo!"""
1836
        with self.lock:
15✔
1837
            self._execute('DELETE FROM webchanges')
15✔
1838
            self.db.commit()
15✔
1839

1840

1841
class SsdbRedisStorage(SsdbStorage):
15✔
1842
    """Class for storing snapshots using redis."""
1843

1844
    def __init__(self, filename: str | Path) -> None:
15✔
1845
        super().__init__(filename)
×
1846

1847
        if isinstance(redis, str):
×
1848
            raise ImportError(f"Python package 'redis' cannot be imported.\n{redis}")
×
1849

1850
        self.db = redis.from_url(str(filename))
×
1851
        logger.info(f'Using {self.filename} for database')
×
1852

1853
    @staticmethod
15✔
1854
    def _make_key(guid: str) -> str:
15✔
1855
        return 'guid:' + guid
×
1856

1857
    def close(self) -> None:
15✔
1858
        self.db.connection_pool.disconnect()
×
1859
        del self.db
×
1860

1861
    def get_guids(self) -> list[str]:
15✔
1862
        return [guid[5:].decode() for guid in self.db.keys('guid:*')]
×
1863

1864
    def load(self, guid: str) -> Snapshot:
15✔
1865
        key = self._make_key(guid)
×
1866
        data = self.db.lindex(key, 0)
×
1867

1868
        if data:
×
1869
            r = msgpack.unpackb(data)
×
1870
            return Snapshot(
×
1871
                r['data'], r['timestamp'], r['tries'], r['etag'], r.get('mime_type', ''), r.get('error_data', {})
1872
            )
1873

1874
        return Snapshot('', 0, 0, '', '', {})
×
1875

1876
    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
15✔
1877
        if count is not None and count < 1:
×
1878
            return {}
×
1879

1880
        history = {}
×
1881
        key = self._make_key(guid)
×
1882
        for i in range(self.db.llen(key)):
×
1883
            r = self.db.lindex(key, i)
×
1884
            c = msgpack.unpackb(r)
×
1885
            if (c['tries'] == 0 or c['tries'] is None) and c['data'] not in history:
×
1886
                history[c['data']] = c['timestamp']
×
1887
                if count is not None and len(history) >= count:
×
1888
                    break
×
1889
        return history
×
1890

1891
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
15✔
1892
        if count is not None and count < 1:
×
1893
            return []
×
1894

1895
        history: list[Snapshot] = []
×
1896
        key = self._make_key(guid)
×
1897
        for i in range(self.db.llen(key)):
×
1898
            r = self.db.lindex(key, i)
×
1899
            c = msgpack.unpackb(r)
×
1900
            if c['tries'] == 0 or c['tries'] is None:
×
1901
                history.append(
×
1902
                    Snapshot(
1903
                        c['data'],
1904
                        c['timestamp'],
1905
                        c['tries'],
1906
                        c['etag'],
1907
                        c.get('mime_type', ''),
1908
                        c.get('error_data', {}),
1909
                    )
1910
                )
1911
                if count is not None and len(history) >= count:
×
1912
                    break
×
1913
        return history
×
1914

1915
    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
15✔
1916
        r = {
×
1917
            'data': snapshot.data,
1918
            'timestamp': snapshot.timestamp,
1919
            'tries': snapshot.tries,
1920
            'etag': snapshot.etag,
1921
            'mime_type': snapshot.mime_type,
1922
            'error_data': snapshot.error_data,
1923
        }
1924
        packed_data = msgpack.packb(r)
×
1925
        if packed_data:
×
1926
            self.db.lpush(self._make_key(guid), packed_data)
×
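    # A hedged sketch (not part of this module) of the layout used by this class: one Redis list per job, keyed
    # 'guid:<guid>', newest snapshot first (lpush), each element a msgpack blob with the keys written by save()
    # above (the Redis URL is an example).
    #
    #     import msgpack
    #     import redis
    #
    #     db = redis.from_url('redis://localhost:6379/0')
    #     blob = msgpack.packb({'data': '<html>example</html>', 'timestamp': 1700000000.0, 'tries': 0,
    #                           'etag': '', 'mime_type': 'text/html', 'error_data': {}})
    #     db.lpush('guid:some-guid', blob)
    #     newest = msgpack.unpackb(db.lindex('guid:some-guid', 0))
    #     assert newest['mime_type'] == 'text/html'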
1927

1928
    def delete(self, guid: str) -> None:
15✔
1929
        self.db.delete(self._make_key(guid))
×
1930

1931
    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
15✔
1932
        """For the given 'guid', delete the latest 'delete_entries' entry and keep all other (older) ones.
1933

1934
        :param guid: The guid.
1935
        :param delete_entries: The number of most recent entries to delete (only 1 is supported by this Redis code).
1936

1937
        :returns: Number of records deleted.
1938
        """
1939
        if delete_entries != 1:
×
1940
            raise NotImplementedError('Only deleting of the latest 1 entry is supported by this Redis code.')
1941

1942
        if self.db.lpop(self._make_key(guid)) is None:
×
1943
            return 0
×
1944

1945
        return 1
×
1946

1947
    def delete_all(self) -> int:
15✔
1948
        """Delete all entries; used for testing only.
1949

1950
        :returns: Number of records deleted.
1951
        """
1952
        raise NotImplementedError('This method is not implemented for Redis.')
1953

1954
    def clean(self, guid: str, keep_entries: int = 1) -> int:
15✔
1955
        if keep_entries != 1:
×
1956
            raise NotImplementedError('Only keeping latest 1 entry is supported.')
1957

1958
        key = self._make_key(guid)
×
1959
        i = self.db.llen(key)
×
1960
        if self.db.ltrim(key, 0, 0):
×
1961
            return i - self.db.llen(key)
×
1962

1963
        return 0
×
1964

1965
    def move(self, guid: str, new_guid: str) -> int:
15✔
1966
        if guid == new_guid:
×
1967
            return 0
×
1968
        key = self._make_key(guid)
×
1969
        new_key = self._make_key(new_guid)
×
1970
        # Note if a list with 'new_key' already exists, the data stored there
1971
        # will be overwritten.
1972
        self.db.rename(key, new_key)
×
1973
        return self.db.llen(new_key)
×
1974

1975
    def rollback(self, timestamp: float) -> None:
15✔
1976
        """Rolls back the database to timestamp.
1977

1978
        :raises: NotImplementedError: This function is not implemented for 'redis' database engine.
1979
        """
1980
        raise NotImplementedError("Rolling back the database is not supported by 'redis' database engine")
1981

1982
    def flushdb(self) -> None:
15✔
1983
        """Delete all entries of the database.  Use with care, there is no undo!"""
1984
        self.db.flushdb()
×