mborsetti / webchanges, build 17710149774 (push via github by mborsetti, "Version 3.31.1.post2")

14 Sep 2025 10:49AM UTC. Coverage: 71.376% (-3.1% from 74.434%).
Branches: 1383 of 2314 covered (59.77%); branch coverage is included in the aggregate %.
Lines: 4614 of 6088 relevant lines covered (75.79%), with 5.87 hits per line.

Source file: /webchanges/storage.py (87.11% covered)
"""Handles all storage: jobs files, config files, hooks file, and cache database engines."""

# The code below is subject to the license contained in the LICENSE file, which is part of the source code.

from __future__ import annotations

import copy
import inspect
import io
import logging
import os
import shutil
import sqlite3
import sys
import threading
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Iterator, Literal, TextIO, TypedDict

import msgpack
import yaml
import yaml.scanner

from webchanges import __docs_url__, __project_name__, __version__
from webchanges.filters import FilterBase
from webchanges.handler import ErrorData, Snapshot
from webchanges.jobs import JobBase, ShellJob
from webchanges.reporters import ReporterBase
from webchanges.util import edit_file, file_ownership_checks

try:
    from httpx import Headers
except ImportError:  # pragma: no cover
    from webchanges._vendored.headers import Headers  # type: ignore[assignment]

try:
    from types import NoneType
except ImportError:  # pragma: no cover # Python 3.9
    NoneType = type(None)  # type: ignore[misc]

try:
    import redis
except ImportError as e:  # pragma: no cover
    redis = str(e)  # type: ignore[assignment]

logger = logging.getLogger(__name__)

_ConfigDisplay = TypedDict(
    '_ConfigDisplay',
    {
        'new': bool,
        'error': bool,
        'unchanged': bool,
        'empty-diff': bool,
    },
)
_ConfigReportText = TypedDict(
    '_ConfigReportText',
    {
        'line_length': int,
        'details': bool,
        'footer': bool,
        'minimal': bool,
        'separate': bool,
    },
)
_ConfigReportHtml = TypedDict(
    '_ConfigReportHtml',
    {
        'diff': Literal['unified', 'table'],
        'footer': bool,
        'separate': bool,
        'title': str,
    },
)
_ConfigReportMarkdown = TypedDict(
    '_ConfigReportMarkdown',
    {
        'details': bool,
        'footer': bool,
        'minimal': bool,
        'separate': bool,
    },
)
_ConfigReportStdout = TypedDict(
    '_ConfigReportStdout',
    {
        'enabled': bool,
        'color': bool,
    },
)
_ConfigReportBrowser = TypedDict(
    '_ConfigReportBrowser',
    {
        'enabled': bool,
    },
)
_ConfigReportDiscord = TypedDict(
    '_ConfigReportDiscord',
    {
        'enabled': bool,
        'webhook_url': str,
        'embed': bool,
        'subject': str,
        'colored': bool,
        'max_message_length': int | None,
    },
)
_ConfigReportEmailSmtp = TypedDict(
    '_ConfigReportEmailSmtp',
    {
        'host': str,
        'user': str,
        'port': int,
        'starttls': bool,
        'auth': bool,
        'insecure_password': str,
        'utf-8': bool,
    },
)
_ConfigReportEmailSendmail = TypedDict(
    '_ConfigReportEmailSendmail',
    {
        'path': str | Path,
    },
)
_ConfigReportEmail = TypedDict(
    '_ConfigReportEmail',
    {
        'enabled': bool,
        'html': bool,
        'to': str,
        'from': str,
        'subject': str,
        'method': Literal['sendmail', 'smtp'],
        'smtp': _ConfigReportEmailSmtp,
        'sendmail': _ConfigReportEmailSendmail,
    },
)
_ConfigReportGithubIssue = TypedDict(
    '_ConfigReportGithubIssue',
    {
        'enabled': bool,
        'token': str,
        'owner': str,
        'repo': str,
        'title': str,
        'labels': list[str],
        'format_dt': str,
        'format_content': str,
        'assignees': list[str],
        'type': str,
        'milestone': str,
    },
)
_ConfigReportGotify = TypedDict(
    '_ConfigReportGotify',
    {
        'enabled': bool,
        'priority': int,
        'server_url': str,
        'title': str,
        'token': str,
    },
)
_ConfigReportIfttt = TypedDict(
    '_ConfigReportIfttt',
    {
        'enabled': bool,
        'key': str,
        'event': str,
    },
)
_ConfigReportMailgun = TypedDict(
    '_ConfigReportMailgun',
    {
        'enabled': bool,
        'region': str,
        'api_key': str,
        'domain': str,
        'from_mail': str,
        'from_name': str,
        'to': str,
        'subject': str,
    },
)
_ConfigReportMatrix = TypedDict(
    '_ConfigReportMatrix',
    {
        'enabled': bool,
        'homeserver': str,
        'access_token': str,
        'room_id': str,
    },
)
_ConfigReportProwl = TypedDict(
    '_ConfigReportProwl',
    {
        'enabled': bool,
        'api_key': str,
        'priority': int,
        'application': str,
        'subject': str,
    },
)
_ConfigReportPushbullet = TypedDict(
    '_ConfigReportPushbullet',
    {
        'enabled': bool,
        'api_key': str,
    },
)
_ConfigReportPushover = TypedDict(
    '_ConfigReportPushover',
    {
        'enabled': bool,
        'app': str,
        'device': str | None,
        'sound': str,
        'user': str,
        'priority': str,
    },
)
_ConfigReportRunCommand = TypedDict(
    '_ConfigReportRunCommand',
    {
        'enabled': bool,
        'command': str,
    },
)
_ConfigReportTelegram = TypedDict(
    '_ConfigReportTelegram',
    {
        'enabled': bool,
        'bot_token': str,
        'chat_id': str | int | list[str | int],
        'silent': bool,
    },
)
_ConfigReportWebhook = TypedDict(
    '_ConfigReportWebhook',
    {
        'enabled': bool,
        'markdown': bool,
        'webhook_url': str,
        'rich_text': bool | None,
        'max_message_length': int | None,
    },
)
_ConfigReportXmpp = TypedDict(
    '_ConfigReportXmpp',
    {
        'enabled': bool,
        'sender': str,
        'recipient': str,
        'insecure_password': str | None,
    },
)

_ConfigReport = TypedDict(
    '_ConfigReport',
    {
        'tz': str | None,
        'text': _ConfigReportText,
        'html': _ConfigReportHtml,
        'markdown': _ConfigReportMarkdown,
        'stdout': _ConfigReportStdout,
        'browser': _ConfigReportBrowser,
        'discord': _ConfigReportDiscord,
        'email': _ConfigReportEmail,
        'github_issue': _ConfigReportGithubIssue,
        'gotify': _ConfigReportGotify,
        'ifttt': _ConfigReportIfttt,
        'mailgun': _ConfigReportMailgun,
        'matrix': _ConfigReportMatrix,
        'prowl': _ConfigReportProwl,
        'pushbullet': _ConfigReportPushbullet,
        'pushover': _ConfigReportPushover,
        'run_command': _ConfigReportRunCommand,
        'telegram': _ConfigReportTelegram,
        'webhook': _ConfigReportWebhook,
        'xmpp': _ConfigReportXmpp,
    },
)
_ConfigJobDefaults = TypedDict(
    '_ConfigJobDefaults',
    {
        '_note': str,
        'all': dict[str, Any],
        'url': dict[str, Any],
        'browser': dict[str, Any],
        'command': dict[str, Any],
    },
    total=False,
)
_ConfigDifferDefaults = TypedDict(
    '_ConfigDifferDefaults',
    {
        '_note': str,
        'unified': dict[str, Any],
        'ai_google': dict[str, Any],
        'command': dict[str, Any],
        'deepdiff': dict[str, Any],
        'image': dict[str, Any],
        'table': dict[str, Any],
        'wdiff': dict[str, Any],
    },
    total=False,
)
_ConfigDatabase = TypedDict(
    '_ConfigDatabase',
    {
        'engine': Literal['sqlite3', 'redis', 'minidb', 'textfiles'] | str,
        'max_snapshots': int,
    },
)
_Config = TypedDict(
    '_Config',
    {
        'display': _ConfigDisplay,
        'report': _ConfigReport,
        'job_defaults': _ConfigJobDefaults,
        'differ_defaults': _ConfigDifferDefaults,
        'database': _ConfigDatabase,
        'footnote': str | None,
    },
)

DEFAULT_CONFIG: _Config = {
    'display': {  # select whether the report includes the categories below
        'new': True,
        'error': True,
        'unchanged': False,
        'empty-diff': False,
    },
    'report': {
        'tz': None,  # the timezone as an IANA time zone name, e.g. 'America/Los_Angeles', or null for the machine's
        # the directives below are for the report content types (text, html or markdown)
        'text': {
            'details': True,  # whether the diff is sent
            'footer': True,
            'line_length': 75,
            'minimal': False,
            'separate': False,
        },
        'html': {
            'diff': 'unified',  # 'unified' or 'table'
            'footer': True,
            'separate': False,
            'title': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
        },
        'markdown': {
            'details': True,  # whether the diff is sent
            'footer': True,
            'minimal': False,
            'separate': False,
        },
        # the directives below control 'reporters', i.e. where a report is displayed and/or sent
        'stdout': {  # the console / command line display; uses text
            'enabled': True,
            'color': True,
        },
        'browser': {  # the system's default browser; uses html
            'enabled': False,
        },
        'discord': {
            'enabled': False,
            'webhook_url': '',
            'embed': True,
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
            'colored': True,
            'max_message_length': None,
        },
        'email': {  # email (except mailgun); uses text or both html and text if 'html' is set to true
            'enabled': False,
            'html': True,
            'from': '',
            'to': '',
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
            'method': 'smtp',  # either 'smtp' or 'sendmail'
            'smtp': {
                'host': 'localhost',
                'port': 587,
                'starttls': True,
                'auth': True,
                'user': '',
                'insecure_password': '',
                'utf-8': True,
            },
            'sendmail': {
                'path': 'sendmail',
            },
        },
        'github_issue': {
            'enabled': False,
            'token': '',
            'owner': '',
            'repo': '',
            'title': '',
            'labels': [],
            'format_dt': '',
            'format_content': '',
            'assignees': [],
            'type': '',
            'milestone': '',
        },
        'gotify': {  # uses markdown
            'enabled': False,
            'priority': 0,
            'server_url': '',
            'title': '',
            'token': '',
        },
        'ifttt': {  # uses text
            'enabled': False,
            'key': '',
            'event': '',
        },
        'mailgun': {  # uses text
            'enabled': False,
            'region': 'us',
            'api_key': '',
            'domain': '',
            'from_mail': '',
            'from_name': '',
            'to': '',
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
        },
        'matrix': {  # uses text
            'enabled': False,
            'homeserver': '',
            'access_token': '',
            'room_id': '',
        },
        'prowl': {  # uses text
            'enabled': False,
            'api_key': '',
            'priority': 0,
            'application': '',
            'subject': f'[{__project_name__}] {{count}} changes{{jobs_files}}: {{jobs}}',
        },
        'pushbullet': {  # uses text
            'enabled': False,
            'api_key': '',
        },
        'pushover': {  # uses text
            'enabled': False,
            'app': '',
            'user': '',
            'device': None,
            'sound': 'spacealarm',
            'priority': 'normal',
        },
        'run_command': {  # uses text
            'enabled': False,
            'command': '',
        },
        'telegram': {  # uses markdown (from version 3.7)
            'enabled': False,
            'bot_token': '',
            'chat_id': '',
            'silent': False,
        },
        'webhook': {
            'enabled': False,
            'webhook_url': '',
            'markdown': False,
            'rich_text': None,
            'max_message_length': None,
        },
        'xmpp': {  # uses text
            'enabled': False,
            'sender': '',
            'recipient': '',
            'insecure_password': '',
        },
    },
    'job_defaults': {
        '_note': 'Default directives that are applied to jobs.',
        'all': {'_note': 'These are used for all types of jobs, including those in hooks.py.'},
        'url': {'_note': "These are used for 'url' jobs without 'use_browser'."},
        'browser': {'_note': "These are used for 'url' jobs with 'use_browser: true'."},
        'command': {'_note': "These are used for 'command' jobs."},
    },
    'differ_defaults': {
        '_note': 'Default directives that are applied to individual differs.',
        'unified': {},
        'ai_google': {},
        'command': {},
        'deepdiff': {},
        'image': {},
        'table': {},
        'wdiff': {},
    },
    'database': {
        'engine': 'sqlite3',
        'max_snapshots': 4,
    },
    'footnote': None,
}

# Custom YAML constructor for !include
def yaml_include(loader: yaml.SafeLoader, node: yaml.Node) -> list[Any]:
    file_path = Path(loader.name).parent.joinpath(node.value)
    with file_path.open('r') as f:
        return list(yaml.safe_load_all(f))


# Add the custom constructor to the YAML loader
yaml.add_constructor('!include', yaml_include, Loader=yaml.SafeLoader)
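# Illustrative example (an assumption, not from the original source): with the constructor
# registered above, a value such as
#
#     jobs: !include more_jobs.yaml
#
# in a YAML file loaded via SafeLoader from a named file stream is resolved relative to the
# including file and replaced by the list of YAML documents contained in more_jobs.yaml.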
@dataclass
class BaseStorage(ABC):  # noqa:  B024 abstract base class, but it has no abstract methods or properties
    """Base class for storage."""


class BaseFileStorage(BaseStorage, ABC):
    """Base class for file storage."""

    def __init__(self, filename: str | Path) -> None:
        """

        :param filename: The filename or directory name of the storage.
        """
        if isinstance(filename, str):
            self.filename = Path(filename)
        else:
            self.filename = filename


class BaseTextualFileStorage(BaseFileStorage, ABC):
    """Base class for textual files."""

    def __init__(self, filename: str | Path) -> None:
        """

        :param filename: The filename or directory name of the storage.
        """
        super().__init__(filename)
        # if not isinstance(self, JobsBaseFileStorage):
        #     self.load()

    @abstractmethod
    def load(self, *args: Any) -> Any:
        """Load from storage.

        :param args: Specified by the subclass.
        :return: Specified by the subclass.
        """
        pass

    @abstractmethod
    def save(self, *args: Any, **kwargs: Any) -> Any:
        """Save to storage.

        :param args: Specified by the subclass.
        :param kwargs: Specified by the subclass.
        :return: Specified by the subclass.
        """
        pass

    @classmethod
    @abstractmethod
    def parse(cls, filename: Path) -> Any:
        """Parse storage contents.

        :param filename: The filename.
        :return: Specified by the subclass.
        """
        pass

    def edit(self) -> int:
        """Edit file.

        :returns: 0 if the edit is successful, 1 otherwise.
        """
        # Similar code to UrlwatchCommand.edit_hooks()
        logger.debug(f'Edit file {self.filename}')
        if isinstance(self.filename, list):
            if len(self.filename) > 1:
                raise ValueError(f'Only one jobs file can be specified for editing; found {len(self.filename)}.')
            filename = self.filename[0]
        else:
            filename = self.filename
        file_edit = filename.with_stem(filename.stem + '_edit')

        if filename.is_file():
            shutil.copy(filename, file_edit)
        # elif example_file is not None and Path(example_file).is_file():
        #     shutil.copy(example_file, file_edit, follow_symlinks=False)

        while True:
            try:
                edit_file(file_edit)
                # Check if we can still parse it
                if self.parse is not None:
                    self.parse(file_edit)
                break  # stop if no exception on parser
            except SystemExit:
                raise
            except Exception as e:
                print()
                print('Errors in updating file:')
                print('======')
                print(e)
                print('======')
                print('')
                print(f'The file {filename} was NOT updated.')
                user_input = input('Do you want to retry the same edit? [Y/n] ')
                if not user_input or user_input.lower().startswith('y'):
                    continue
                file_edit.unlink()
                print('No changes have been saved.')
                return 1

        if filename.is_symlink():
            filename.write_text(file_edit.read_text())
        else:
            file_edit.replace(filename)
        file_edit.unlink(missing_ok=True)
        print('Saved edits in', filename)
        return 0

class JobsBaseFileStorage(BaseTextualFileStorage, ABC):
8✔
632
    """Class for jobs textual files storage."""
633

634
    filename: list[Path]  # type: ignore[assignment]
8✔
635

636
    def __init__(self, filename: list[Path]) -> None:
8✔
637
        """
638

639
        :param filename: The filenames of the jobs file.
640
        """
641
        super().__init__(filename)  # type: ignore[arg-type]
8✔
642
        self.filename = filename
8✔
643

644
    def load_secure(self) -> list[JobBase]:
8✔
645
        """Load the jobs from a text file checking that the file is secure (i.e. belongs to the current UID and only
646
        the owner can write to it - Linux only).
647

648
        :return: List of JobBase objects.
649
        """
650
        jobs: list[JobBase] = self.load()
8✔
651

652
        def is_shell_job(job: JobBase) -> bool:
8✔
653
            """Check if the job uses filter 'shellpipe' or an external differ, as they call
654
            subprocess.run(shell=True) (insecure).
655

656
            :returns: True if subprocess.run(shell=True) is invoked by job, False otherwise.
657
            """
658
            if isinstance(job, ShellJob):
8✔
659
                return True
8✔
660

661
            for filter_kind, _ in FilterBase.normalize_filter_list(job.filters, job.index_number):
8!
662
                if filter_kind == 'shellpipe':
×
663
                    return True
×
664

665
                if job.differ and job.differ.get('name') == 'command':
×
666
                    return True
×
667

668
            return False
8✔
669

670
        shelljob_errors = []
8✔
671
        for file in self.filename:
8✔
672
            shelljob_errors.extend(file_ownership_checks(file))
8✔
673
        removed_jobs = (job for job in jobs if is_shell_job(job))
8✔
674
        if shelljob_errors and any(removed_jobs):
8✔
675
            print(
8✔
676
                f'ERROR: Removing the following jobs because '
677
                f' {" and ".join(shelljob_errors)}: {" ,".join(str(job.index_number) for job in removed_jobs)}\n'
678
                f'(see {__docs_url__}en/stable/jobs.html#important-note-for-command-jobs).'
679
            )
680
            jobs = [job for job in jobs if job not in removed_jobs]
8✔
681

682
        logger.info(f'Loaded {len(jobs)} jobs from {", ".join(str(file) for file in self.filename)}.')
8✔
683
        return jobs
8✔
684

685

class BaseYamlFileStorage(BaseTextualFileStorage, ABC):
    """Base class for YAML textual files storage."""

    @classmethod
    def parse(cls, filename: Path) -> Any:
        """Return the contents of the YAML file if it exists.

        :param filename: The filename Path.
        :return: Specified by the subclass.
        """
        if filename is not None and filename.is_file():
            with filename.open() as fp:
                return yaml.safe_load(fp)

class YamlConfigStorage(BaseYamlFileStorage):
    """Class for the configuration file (a YAML textual file)."""

    config: _Config = {}  # type: ignore[typeddict-item]

    @staticmethod
    def dict_deep_difference(d1: _Config, d2: _Config, ignore_underline_keys: bool = False) -> _Config:
        """Recursively find elements in the first dict that are not in the second.

        :param d1: The first dict.
        :param d2: The second dict.
        :param ignore_underline_keys: If True, keys starting with _ are ignored (treated as remarks).
        :return: A dict with all the elements in the first dict that are not in the second.
        """

        def _sub_dict_deep_difference(d1_: _Config, d2_: _Config) -> _Config:
            """Recursive sub-function to find elements in the first dict that are not in the second.

            :param d1_: The first dict.
            :param d2_: The second dict.
            :return: A dict with elements of the first dict that are not in the second.
            """
            for key, value in d1_.copy().items():
                if ignore_underline_keys and key.startswith('_'):
                    d1_.pop(key, None)  # type: ignore[misc]
                elif isinstance(value, dict) and isinstance(d2_.get(key), dict):
                    _sub_dict_deep_difference(value, d2_[key])  # type: ignore[arg-type,literal-required]
                    if not len(value):
                        d1_.pop(key)  # type: ignore[misc]
                else:
                    if key in d2_:
                        d1_.pop(key)  # type: ignore[misc]
            return d1_

        return _sub_dict_deep_difference(copy.deepcopy(d1), d2)
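    # Illustrative example (an assumption, not from the original source): since only key
    # presence is compared (values are not),
    #     dict_deep_difference({'a': 1, 'b': {'c': 2, 'd': 3}}, {'a': 9, 'b': {'c': 0}})
    # returns {'b': {'d': 3}}: the keys of the first dict that are absent from the second.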
    @staticmethod
    def dict_deep_merge(source: _Config, destination: _Config) -> _Config:
        """Recursively deep merges source dict into destination dict.

        :param source: The first dict.
        :param destination: The second dict.
        :return: The deep merged dict.
        """

        # https://stackoverflow.com/a/20666342

        def _sub_dict_deep_merge(source_: _Config, destination_: _Config) -> _Config:
            """Recursive sub-function to merge source_ dict into destination_ dict.

            :param source_: The first dict.
            :param destination_: The second dict.
            :return: The merged dict.
            """
            for key, value in source_.items():
                if isinstance(value, dict):
                    # get node or create one
                    node = destination_.setdefault(key, {})  # type: ignore[misc]
                    _sub_dict_deep_merge(value, node)  # type: ignore[arg-type]
                else:
                    destination_[key] = value  # type: ignore[literal-required]

            return destination_

        return _sub_dict_deep_merge(source, copy.deepcopy(destination))
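    # Illustrative example (an assumption, not from the original source):
    #     dict_deep_merge({'b': {'d': 4}}, {'a': 1, 'b': {'c': 2}})
    # returns {'a': 1, 'b': {'c': 2, 'd': 4}}; on conflicting keys the value from source
    # wins, and destination itself is deep-copied rather than mutated.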
    def check_for_unrecognized_keys(self, config: _Config) -> None:
        """Check whether config has keys not in DEFAULT_CONFIG (bad keys, e.g. typos); if so, issue a RuntimeWarning.

        :param config: The configuration.
        """
        config_for_extras = copy.deepcopy(config)
        if 'job_defaults' in config_for_extras:
            # Create missing 'job_defaults' keys from DEFAULT_CONFIG
            for key in DEFAULT_CONFIG['job_defaults']:
                if 'job_defaults' not in config_for_extras:
                    config_for_extras['job_defaults'] = {}
                config_for_extras['job_defaults'][key] = None  # type: ignore[literal-required]
            for key in DEFAULT_CONFIG['differ_defaults']:
                if 'differ_defaults' not in config_for_extras:
                    config_for_extras['differ_defaults'] = {}
                config_for_extras['differ_defaults'][key] = None  # type: ignore[literal-required]
        if 'hooks' in sys.modules:
            # Remove extra keys in config used in hooks (they are not in DEFAULT_CONFIG)
            for _, obj in inspect.getmembers(
                sys.modules['hooks'], lambda x: inspect.isclass(x) and x.__module__ == 'hooks'
            ):
                if issubclass(obj, JobBase):
                    if obj.__kind__ not in DEFAULT_CONFIG['job_defaults'].keys():
                        config_for_extras['job_defaults'].pop(obj.__kind__, None)  # type: ignore[misc]
                elif issubclass(obj, ReporterBase):
                    if obj.__kind__ not in DEFAULT_CONFIG['report'].keys():
                        config_for_extras['report'].pop(obj.__kind__, None)  # type: ignore[misc]
        if 'slack' in config_for_extras.get('report', {}):
            # Ignore legacy key
            config_for_extras['report'].pop('slack')  # type: ignore[typeddict-item]
        extras: _Config = self.dict_deep_difference(config_for_extras, DEFAULT_CONFIG, ignore_underline_keys=True)
        if not extras.get('report'):
            extras.pop('report', None)  # type: ignore[misc]
        if extras:
            warnings.warn(
                f'Found unrecognized directive(s) in the configuration file {self.filename}:\n'
                f'{yaml.safe_dump(extras)}Check for typos or the hooks.py file (if any); documentation is at '
                f'{__docs_url__}\n',
                RuntimeWarning,
                stacklevel=1,
            )

    @staticmethod
    def replace_none_keys(config: _Config) -> None:
        """Fixes None keys in loaded config that should be empty dicts instead."""
        if 'job_defaults' not in config:
            config['job_defaults'] = DEFAULT_CONFIG['job_defaults']
        else:
            if 'shell' in config['job_defaults']:
                if 'command' in config['job_defaults']:
                    raise KeyError(
                        "Found both 'shell' and 'command' job_defaults in config, a duplicate. Please remove the "
                        "'shell' one."
                    )
                else:
                    config['job_defaults']['command'] = config['job_defaults'].pop(
                        'shell'  # type: ignore[typeddict-item]
                    )
            for key in {'all', 'url', 'browser', 'command'}:
                if key not in config['job_defaults']:
                    config['job_defaults'][key] = {}  # type: ignore[literal-required]
                elif config['job_defaults'][key] is None:  # type: ignore[literal-required]
                    config['job_defaults'][key] = {}  # type: ignore[literal-required]

    def load(self, *args: Any) -> None:
        """Load configuration file from self.filename into self.config, adding missing keys from DEFAULT_CONFIG.

        :param args: Not used.
        """
        logger.debug(f'Loading configuration from {self.filename}')
        config: _Config = self.parse(self.filename)

        if config:
            self.replace_none_keys(config)
            self.check_for_unrecognized_keys(config)

            # If config is missing keys in DEFAULT_CONFIG, log the missing keys and deep merge DEFAULT_CONFIG
            missing = self.dict_deep_difference(DEFAULT_CONFIG, config, ignore_underline_keys=True)
            if missing:
                logger.info(
                    f'The configuration file {self.filename} is missing directive(s); using default values for those. '
                    'Run with -vv for more details.'
                )
                logger.debug(
                    f'The following default values are being used:\n'
                    f'{yaml.safe_dump(missing)}'
                    f'See documentation at {__docs_url__}en/stable/configuration.html'
                )
                config = self.dict_deep_merge(config or {}, DEFAULT_CONFIG)

            # format headers
            for job_defaults_type in {'all', 'url', 'browser'}:
                if 'headers' in config['job_defaults'][job_defaults_type]:  # type: ignore[literal-required]
                    config['job_defaults'][job_defaults_type]['headers'] = Headers(  # type: ignore[literal-required]
                        {
                            k: str(v)
                            for k, v in config['job_defaults'][job_defaults_type][  # type: ignore[literal-required]
                                'headers'
                            ].items()
                        },
                        encoding='utf-8',
                    )
                if 'cookies' in config['job_defaults'][job_defaults_type]:  # type: ignore[literal-required]
                    config['job_defaults'][job_defaults_type]['cookies'] = {  # type: ignore[literal-required]
                        k: str(v)
                        for k, v in config['job_defaults'][job_defaults_type][  # type: ignore[literal-required]
                            'cookies'
                        ].items()
                    }
            logger.info(f'Loaded configuration from {self.filename}')

        else:
            logger.warning(f'No directives found in the configuration file {self.filename}; using default directives.')
            config = DEFAULT_CONFIG

        self.config = config
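    # Illustrative example (an assumption, not from the original source): a config file
    # containing only
    #
    #     report:
    #       tz: America/New_York
    #
    # loads with every other directive deep-merged in from DEFAULT_CONFIG, while an
    # unrecognized key such as 'reprot:' would trigger the RuntimeWarning issued by
    # check_for_unrecognized_keys() above.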
    def save(self, *args: Any, **kwargs: Any) -> None:
        """Save self.config into self.filename using YAML.

        :param args: Not used.
        :param kwargs: Not used.
        """
        with self.filename.open('w') as fp:
            fp.write(
                f'# {__project_name__} configuration file. See {__docs_url__}en/stable/configuration.html\n'
                f'# Originally written on {datetime.now(timezone.utc).replace(microsecond=0).isoformat()}Z by version'
                f' {__version__}.\n'
                f'\n'
            )
            yaml.safe_dump(self.config, fp, allow_unicode=True, sort_keys=False)

    @classmethod
    def write_default_config(cls, filename: Path) -> None:
        """Write the default configuration to file.

        :param filename: The filename.
        """
        config_storage = cls(filename)
        config_storage.config = DEFAULT_CONFIG
        config_storage.save()

class YamlJobsStorage(BaseYamlFileStorage, JobsBaseFileStorage):
    """Class for the jobs file (a YAML textual file)."""

    @classmethod
    def _parse(cls, fp: TextIO, filenames: list[Path]) -> list[JobBase]:
        """Parse the contents of a jobs YAML file.

        :param fp: The text stream to parse.
        :return: A list of JobBase objects.
        :raise yaml.YAMLError: If a YAML error is found in the file.
        :raise ValueError: If a duplicate URL/command is found in the list.
        """

        def job_files_for_error() -> list[str]:
            """
            :return: A list of lines containing the names of the jobs files.
            """
            if len(filenames) > 1:
                jobs_files = ['in the concatenation of the jobs files:'] + [f'• {file},' for file in filenames]
            elif len(filenames) == 1:
                jobs_files = [f'in jobs file {filenames[0]}.']
            else:
                jobs_files = []
            return jobs_files

        jobs = []
        jobs_by_guid = defaultdict(list)
        try:
            for i, job_data in enumerate((job for job in yaml.safe_load_all(fp) if job)):
                if not isinstance(job_data, dict):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f'Found invalid job data (consisting of the {type(job_data).__name__} {job_data})',
                                *job_files_for_error(),
                            ]
                        )
                    )
                job_data['index_number'] = i + 1
                job = JobBase.unserialize(job_data, filenames)
                # TODO Implement 100% validation and remove it from jobs.py
                # TODO Try using pydantic to do this.
                if not isinstance(job.data, (NoneType, str, dict, list)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'data' key needs to contain a string, a dictionary or a list; found a"
                                f' {type(job.data).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.filters, (NoneType, list)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'filter' key needs to contain a list; found a {type(job.filters).__name__} ",
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.headers, (NoneType, dict, Headers)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'headers' key needs to contain a dictionary; found a "
                                f'{type(job.headers).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.cookies, (NoneType, dict)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'cookies' key needs to contain a dictionary; found a "
                                f'{type(job.cookies).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                if not isinstance(job.switches, (NoneType, str, list)):
                    raise ValueError(
                        '\n   '.join(
                            [
                                f"The 'switches' key needs to contain a string or a list; found a "
                                f'{type(job.switches).__name__} ',
                                f'in {job.get_indexed_location()}',
                                *job_files_for_error(),
                            ]
                        )
                    )
                # We add GUID here to speed things up and to allow hooks to programmatically change job.url and/or
                # job.user_visible_url
                job.guid = job.get_guid()
                jobs.append(job)
                jobs_by_guid[job.guid].append(job)
        except yaml.scanner.ScannerError as e:
            raise ValueError(
                '\n   '.join(
                    [
                        f'YAML parser {e.args[2].replace("here", "")} in line {e.args[3].line + 1}, column'
                        f' {e.args[3].column + 1}',
                        *job_files_for_error(),
                    ]
                )
            ) from None

        conflicting_jobs = []
        for _, guid_jobs in jobs_by_guid.items():
            if len(guid_jobs) != 1:
                conflicting_jobs.append(guid_jobs[0].get_location())

        if conflicting_jobs:
            raise ValueError(
                '\n   '.join(
                    ['Each job must have a unique URL/command (for URLs, append #1, #2, etc. to make them unique):']
                    + [f'• {job}' for job in conflicting_jobs]
                    + ['']
                    + job_files_for_error()
                )
            )

        return jobs
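    # Illustrative example (an assumption, not from the original source): two jobs that
    # monitor the same page must be disambiguated, e.g.
    #
    #     url: https://example.com/page#1
    #     ---
    #     url: https://example.com/page#2
    #
    # otherwise the 'unique URL/command' ValueError above is raised.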
    @classmethod
    def parse(cls, filename: Path) -> list[JobBase]:
        """Parse the contents of a jobs YAML file and return a list of jobs.

        :param filename: The filename Path.
        :return: A list of JobBase objects.
        """
        if filename is not None and filename.is_file():
            with filename.open() as fp:
                return cls._parse(fp, [filename])
        return []

    def load(self, *args: Any) -> list[JobBase]:
        """Parse the contents of the jobs YAML file(s) and return a list of jobs.

        :return: A list of JobBase objects.
        """
        if len(self.filename) == 1:
            with self.filename[0].open() as f:
                return self._parse(f, self.filename)
        else:
            fp = io.StringIO('\n---\n'.join(f.read_text(encoding='utf-8-sig') for f in self.filename if f.is_file()))
            return self._parse(fp, self.filename)

    def save(self, jobs: Iterable[JobBase]) -> None:
        """Save jobs to the jobs YAML file.

        :param jobs: An iterable of JobBase objects to be written.
        """
        print(f'Saving updated list to {self.filename[0]}.')

        with self.filename[0].open('w') as fp:
            yaml.safe_dump_all([job.serialize() for job in jobs], fp, allow_unicode=True, sort_keys=False)

class SsdbStorage(BaseFileStorage, ABC):
    """Base class for snapshots storage."""

    @abstractmethod
    def close(self) -> None:
        pass

    @abstractmethod
    def get_guids(self) -> list[str]:
        pass

    @abstractmethod
    def load(self, guid: str) -> Snapshot:
        pass

    @abstractmethod
    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        pass

    @abstractmethod
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        pass

    @abstractmethod
    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
        pass

    @abstractmethod
    def delete(self, guid: str) -> None:
        pass

    @abstractmethod
    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
        """For the given 'guid', delete only the latest 'delete_entries' entries and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete.

        :returns: Number of records deleted.
        """
        pass

    @abstractmethod
    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :returns: Number of records deleted.
        """
        pass

    @abstractmethod
    def clean(self, guid: str, keep_entries: int = 1) -> int:
        pass

    @abstractmethod
    def move(self, guid: str, new_guid: str) -> int:
        pass

    @abstractmethod
    def rollback(self, timestamp: float) -> int | None:
        pass

    def backup(self) -> Iterator[tuple[str, str | bytes, float, int, str, str, ErrorData]]:
        """Return the most recent entry for each 'guid'.

        :returns: A generator of tuples, each consisting of (guid, data, timestamp, tries, etag, mime_type,
            error_data).
        """
        for guid in self.get_guids():
            data, timestamp, tries, etag, mime_type, error_data = self.load(guid)
            yield guid, data, timestamp, tries, etag, mime_type, error_data

    def restore(self, entries: Iterable[tuple[str, str | bytes, float, int, str, str, ErrorData]]) -> None:
        """Save multiple entries into the database.

        :param entries: An iterable of tuples where each consists of (guid, data, timestamp, tries, etag, mime_type,
            error_data).
        """
        for guid, data, timestamp, tries, etag, mime_type, error_data in entries:
            new_snapshot = Snapshot(
                data=data, timestamp=timestamp, tries=tries, etag=etag, mime_type=mime_type, error_data=error_data
            )
            self.save(guid=guid, snapshot=new_snapshot, temporary=False)

    def gc(self, known_guids: Iterable[str], keep_entries: int = 1) -> None:
        """Garbage collect the database: delete all guids not included in known_guids and keep only the last
        'keep_entries' snapshots for the remaining ones.

        :param known_guids: The guids to keep.
        :param keep_entries: Number of entries to keep after deletion for the guids to keep.
        """
        for guid in set(self.get_guids()) - set(known_guids):
            print(f'Deleting job {guid} (no longer being tracked).')
            self.delete(guid)
        self.clean_ssdb(known_guids, keep_entries)
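    # Illustrative example (an assumption, not from the original source): if the database
    # holds guids {'a', 'b', 'c'} and the jobs files only define 'a' and 'b', then
    #     storage.gc(known_guids=['a', 'b'], keep_entries=2)
    # deletes all snapshots of 'c' and trims 'a' and 'b' to their two most recent ones.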
    def clean_ssdb(self, known_guids: Iterable[str], keep_entries: int = 1) -> None:
        """Convenience function to clean the cache.

        If self.clean_all is present, runs clean_all(). Otherwise, runs clean() on all known_guids, one at a time.
        Prints the number of snapshots deleted.

        :param known_guids: An iterable of guids.
        :param keep_entries: Number of entries to keep after deletion.
        """
        if hasattr(self, 'clean_all'):
            count = self.clean_all(keep_entries)
            if count:
                print(f'Deleted {count} old snapshots.')
        else:
            for guid in known_guids:
                count = self.clean(guid, keep_entries)
                if count:
                    print(f'Deleted {count} old snapshots of {guid}.')

    @abstractmethod
    def flushdb(self) -> None:
        """Delete all entries of the database. Use with care, there is no undo!"""
        pass

class SsdbDirStorage(SsdbStorage):
    """Class for snapshots stored as individual textual files in a directory 'dirname'."""

    def __init__(self, dirname: str | Path) -> None:
        super().__init__(dirname)
        self.filename.mkdir(parents=True, exist_ok=True)  # using the attr filename because it is a Path (confusing!)
        logger.info(f'Using directory {self.filename} to store snapshot data as individual text files')

    def close(self) -> None:
        # Nothing to close
        return

    def _get_filename(self, guid: str) -> Path:
        return self.filename.joinpath(guid)  # filename is a dir (confusing!)

    def get_guids(self) -> list[str]:
        return [filename.name for filename in self.filename.iterdir()]

    def load(self, guid: str) -> Snapshot:
        filename = self._get_filename(guid)
        if not filename.is_file():
            return Snapshot('', 0, 0, '', '', {})

        try:
            data = filename.read_text()
        except UnicodeDecodeError:
            data = filename.read_text(errors='ignore')
            logger.warning(f'Found and ignored Unicode-related errors when retrieving saved snapshot {guid}')

        timestamp = filename.stat().st_mtime

        return Snapshot(data, timestamp, 0, '', '', {})

    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        if count is not None and count < 1:
            return {}
        else:
            snapshot = self.load(guid)
            return {snapshot.data: snapshot.timestamp} if snapshot.data and snapshot.timestamp else {}

    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        if count is not None and count < 1:
            return []
        else:
            snapshot = self.load(guid)
            return [snapshot] if snapshot.data and snapshot.timestamp else []

    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
        # ETag and mime_type are ignored
        filename = self._get_filename(guid)
        with filename.open('w+') as fp:
            fp.write(str(snapshot.data))
        os.utime(filename, times=(datetime.now().timestamp(), snapshot.timestamp))  # noqa: DTZ005
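    # Illustrative round trip (an assumption, not from the original source; the Snapshot
    # argument order is inferred from load() above):
    #     store = SsdbDirStorage('snapshots_dir')
    #     store.save(guid='abc123', snapshot=Snapshot('content', 1726300000.0, 0, '', '', {}))
    #     store.load('abc123')  # -> Snapshot('content', 1726300000.0, 0, '', '', {})
    # Only the text and the timestamp (persisted as the file's mtime) survive; tries,
    # etag, mime_type and error_data are not stored by this engine.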
    def delete(self, guid: str) -> None:
        filename = self._get_filename(guid)
        filename.unlink(missing_ok=True)
        return

    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
        """For the given 'guid', delete the latest entry and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete.

        :raises NotImplementedError: This function is not implemented for 'textfiles' databases.
        """
        raise NotImplementedError(
            "Deleting of latest snapshot not supported by 'textfiles' database engine since only one snapshot is "
            "saved. Delete all snapshots if that's what you are trying to do."
        )

    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :raises NotImplementedError: This function is not implemented for 'textfiles' databases.
        """
        raise NotImplementedError("Deleting all snapshots is not supported by the 'textfiles' database engine.")

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        if keep_entries != 1:
            raise NotImplementedError('Only keeping latest 1 entry is supported.')
        # We only store the latest version, no need to clean
        return 0

    def move(self, guid: str, new_guid: str) -> int:
        if guid == new_guid:
            return 0
        os.rename(self._get_filename(guid), self._get_filename(new_guid))
        return 1

    def rollback(self, timestamp: float) -> None:
        raise NotImplementedError("'textfiles' databases cannot be rolled back as new snapshots overwrite old ones")

    def flushdb(self) -> None:
        for file in self.filename.iterdir():
            if file.is_file():
                file.unlink()

class SsdbSQLite3Storage(SsdbStorage):
    """
    Handles storage of the snapshot as a SQLite database in the 'filename' file using Python's built-in sqlite3 module
    and the msgpack package.

    A temporary database is created by __init__ and is written to by the 'save()' function (unless it is called with
    temporary=False). This data is written to the permanent database by the 'close()' function, which is called at
    the end of program execution.

    The database contains the 'webchanges' table with the following columns:

    * uuid: the unique hash (guid) of the "location", i.e. the URL/command; indexed
    * timestamp: the Unix timestamp of when the snapshot was taken; indexed
    * msgpack_data: a msgpack blob containing 'data', 'tries', 'etag' and 'mime_type' in a dict of keys 'd', 't',
      'e' and 'm'
    """
1315

1316
    def __init__(self, filename: Path, max_snapshots: int = 4) -> None:
        """
        :param filename: The full filename of the database file
        :param max_snapshots: The maximum number of snapshots to retain in the database for each 'guid'
        """
        # Opens the database file and, if new, creates a table and index.

        self.max_snapshots = max_snapshots

        logger.debug(f'Run-time SQLite library: {sqlite3.sqlite_version}')
        super().__init__(filename)

        self.filename.parent.mkdir(parents=True, exist_ok=True)

        # https://stackoverflow.com/questions/26629080
        self.lock = threading.RLock()

        self.db = sqlite3.connect(filename, check_same_thread=False)
        logger.info(f'Using sqlite3 {sqlite3.sqlite_version} database at {filename}')
        self.cur = self.db.cursor()
        self.cur.execute('PRAGMA temp_store = MEMORY;')
        tables = self._execute("SELECT name FROM sqlite_master WHERE type='table';").fetchone()

        def _initialize_table() -> None:
            logger.debug('Initializing sqlite3 database')
            self._execute('CREATE TABLE webchanges (uuid TEXT, timestamp REAL, msgpack_data BLOB)')
            self._execute('CREATE INDEX idx_uuid_time ON webchanges(uuid, timestamp)')
            self.db.commit()

        if tables == ('CacheEntry',):
            logger.info("Found legacy 'minidb' database to convert")

            # Found a minidb legacy database; close it, rename it for migration and create a new sqlite3 one
            import importlib.util

            if importlib.util.find_spec('minidb') is None:
                print('You have an old snapshot database format that needs to be converted to a current one.')
                print(
                    f"Please install the Python package 'minidb' for this one-time conversion and rerun "
                    f'{__project_name__}.'
                )
                print('Use e.g. `pip install -U minidb`.')
                print()
                print("After the conversion, you can uninstall 'minidb' with e.g. `pip uninstall minidb`.")
                sys.exit(1)

            print('Performing one-time conversion from old snapshot database format.')
            self.db.close()
            minidb_filename = filename.with_stem(filename.stem + '_minidb')
            self.filename.replace(minidb_filename)
            self.db = sqlite3.connect(filename, check_same_thread=False)
            self.cur = self.db.cursor()
            _initialize_table()
            # Migrate the minidb legacy database renamed above
            self.migrate_from_minidb(minidb_filename)
        elif tables != ('webchanges',):
            _initialize_table()

        # Create temporary database in memory for writing during execution (fault tolerance)
        logger.debug('Creating temp sqlite3 database file in memory')
        self.temp_lock = threading.RLock()
        self.temp_db = sqlite3.connect('', check_same_thread=False)
        self.temp_cur = self.temp_db.cursor()
        self._temp_execute('CREATE TABLE webchanges (uuid TEXT, timestamp REAL, msgpack_data BLOB)')
        self.temp_db.commit()

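    # A minimal instantiation sketch (illustrative; 'SsdbSQLite3Storage' is the
    # assumed name of this class, and the path is hypothetical):
    #
    #     from pathlib import Path
    #
    #     storage = SsdbSQLite3Storage(Path('snapshots.db'), max_snapshots=4)
    #     ...  # save() calls accumulate in the in-memory temp database
    #     storage.close()  # flushes the temp database to disk and prunes history
    #
    # Because writes land in memory first, a crash mid-run leaves the on-disk
    # database untouched.
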
    def _execute(self, sql: str, args: tuple | None = None) -> sqlite3.Cursor:
        """Execute SQL command on the main database."""
        if args is None:
            logger.debug(f"Executing (perm) '{sql}'")
            return self.cur.execute(sql)
        else:
            logger.debug(f"Executing (perm) '{sql}' with {args}")
            return self.cur.execute(sql, args)

    def _temp_execute(self, sql: str, args: tuple | None = None) -> sqlite3.Cursor:
        """Execute SQL command on the temp database."""
        if args is None:
            logger.debug(f"Executing (temp) '{sql}'")
            return self.temp_cur.execute(sql)
        else:
            logger.debug(f"Executing (temp) '{sql}' with {args[:2]}...")
            return self.temp_cur.execute(sql, args)

    def _copy_temp_to_permanent(self, delete: bool = False) -> None:
        """Copy contents of temporary database to permanent one.

        :param delete: also delete contents of temporary cache (used for testing)
        """
        logger.debug('Saving new snapshots to permanent sqlite3 database')
        # with self.temp_lock:
        #     self.temp_db.commit()
        # with self.lock:
        #     self._execute('ATTACH DATABASE ? AS temp_db', (str(self.temp_filename),))
        #     self._execute('INSERT INTO webchanges SELECT * FROM temp_db.webchanges')
        #     logger.debug(f'Wrote {self.cur.rowcount} new snapshots to permanent sqlite3 database')
        #     self.db.commit()
        #     self._execute('DETACH DATABASE temp_db')
        with self.temp_lock:
            with self.lock:
                for row in self._temp_execute('SELECT * FROM webchanges').fetchall():
                    self._execute('INSERT INTO webchanges VALUES (?, ?, ?)', row)
                self.db.commit()
            if delete:
                self._temp_execute('DELETE FROM webchanges')

    def close(self) -> None:
        """Writes the temporary database to the permanent one, purges old entries if required, and closes all database
        connections."""
        self._copy_temp_to_permanent()
        with self.temp_lock:
            self.temp_db.close()
            logger.debug('Cleaning up the permanent sqlite3 database and closing the connection')
        with self.lock:
            if self.max_snapshots:
                num_del = self.keep_latest(self.max_snapshots)
                logger.debug(
                    f'Keeping no more than {self.max_snapshots} snapshots per job: purged {num_del} older entries'
                )
            else:
                self.db.commit()
            self.db.close()
            logger.info(f'Closed main sqlite3 database file {self.filename}')
        del self.temp_cur
        del self.temp_db
        del self.temp_lock
        del self.cur
        del self.db
        del self.lock

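    # Retention on shutdown, sketched with hypothetical values: with
    # max_snapshots=2, a guid holding snapshots at timestamps 1.0, 2.0 and 3.0
    # is trimmed by close() to the two newest via keep_latest(); with
    # max_snapshots=0, nothing is purged.
    #
    #     storage = SsdbSQLite3Storage(Path('snapshots.db'), max_snapshots=2)
    #     for ts in (1.0, 2.0, 3.0):
    #         storage.save(guid='abc', snapshot=Snapshot('data', ts, 0, '', 'text/plain', {}))
    #     storage.close()  # only the entries with timestamps 3.0 and 2.0 survive
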
    def get_guids(self) -> list[str]:
        """Lists the unique 'guid's contained in the database.

        :returns: A list of guids.
        """
        with self.lock:
            self.cur.row_factory = lambda cursor, row: row[0]
            guids = self._execute('SELECT DISTINCT uuid FROM webchanges').fetchall()
            self.cur.row_factory = None
        return guids

    def load(self, guid: str) -> Snapshot:
        """Return the most recent entry matching a 'guid'.

        :param guid: The guid.

        :returns: A Snapshot tuple (data, timestamp, tries, etag, mime_type, error_data)
            WHERE

            - data is the data;
            - timestamp is the timestamp;
            - tries is the number of tries;
            - etag is the ETag;
            - mime_type is the MIME type of the data;
            - error_data is the error information (if any).
        """
        with self.lock:
            row = self._execute(
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC LIMIT 1',
                (guid,),
            ).fetchone()
        if row:
            msgpack_data, timestamp = row
            r = msgpack.unpackb(msgpack_data)
            return Snapshot(r['d'], timestamp, r['t'], r['e'], r.get('m', ''), r.get('err', {}))

        return Snapshot('', 0, 0, '', '', {})

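    # Example of consuming load() (hypothetical guid); a guid with no history
    # yields the empty Snapshot('', 0, 0, '', '', {}), so testing the timestamp
    # distinguishes the two cases:
    #
    #     snapshot = storage.load('1234567890abcdef')
    #     if snapshot.timestamp:
    #         print(f'{len(snapshot.data)} bytes as of {snapshot.timestamp}')
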
    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        """Return max 'count' (None = all) records of data and timestamp of **successful** runs for a 'guid'.

        :param guid: The guid.
        :param count: The maximum number of entries to return; if None, return all.

        :returns: A dict (key: value)
            WHERE

            - key is the snapshot data;
            - value is the most recent timestamp for such snapshot.
        """
        if count is not None and count < 1:
            return {}

        with self.lock:
            rows = self._execute(
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC', (guid,)
            ).fetchall()
        history = {}
        if rows:
            for msgpack_data, timestamp in rows:
                r = msgpack.unpackb(msgpack_data)
                if not r['t']:  # tries == 0, i.e. a successful run; use get_history_snapshots() to include error runs
                    if r['d'] not in history:
                        history[r['d']] = timestamp
                        if count is not None and len(history) >= count:
                            break
        return history

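    # Illustrative return value: snapshot data mapped to its most recent
    # timestamp, newest first, with failed runs (tries > 0) skipped:
    #
    #     {'<html>new</html>': 1726300000.0, '<html>old</html>': 1726200000.0}
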
    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        """Return max 'count' (None = all) entries of all data (including from error runs) saved for a 'guid'.

        :param guid: The guid.
        :param count: The maximum number of entries to return; if None, return all.

        :returns: A list of Snapshot tuples (data, timestamp, tries, etag, mime_type, error_data)
            WHERE the values are:

            - data: The data (str, could be empty);
            - timestamp: The timestamp (float);
            - tries: The number of tries (int);
            - etag: The ETag (str, could be empty);
            - mime_type: The MIME type of the data (str, could be empty);
            - error_data: The error information (dict, could be empty).
        """
        if count is not None and count < 1:
            return []

        with self.lock:
            rows = self._execute(
                'SELECT msgpack_data, timestamp FROM webchanges WHERE uuid = ? ORDER BY timestamp DESC', (guid,)
            ).fetchall()
        history: list[Snapshot] = []
        if rows:
            for msgpack_data, timestamp in rows:
                r = msgpack.unpackb(msgpack_data)
                history.append(Snapshot(r['d'], timestamp, r['t'], r['e'], r.get('m', ''), r.get('err', {})))
                if count is not None and len(history) >= count:
                    break
        return history

    def save(
        self,
        *args: Any,
        guid: str,
        snapshot: Snapshot,
        temporary: bool | None = True,
        **kwargs: Any,
    ) -> None:
        """Save the data from a job.

        By default, it is saved into the temporary database. Call close() to transfer the contents of the temporary
        database to the permanent one.

        Note: any attempt that ends in an exception is saved with tries >= 1, and its data is replaced with the data
        from the most recent successful attempt.

        :param guid: The guid.
        :param snapshot: The Snapshot (data, timestamp, tries, etag, mime_type and error_data) to save.
        :param temporary: If true, save to the temporary database (default).
        """

        c = {
            'd': snapshot.data,
            't': snapshot.tries,
            'e': snapshot.etag,
            'm': snapshot.mime_type,
            'err': snapshot.error_data,
        }
        msgpack_data = msgpack.packb(c)
        if temporary:
            with self.temp_lock:
                self._temp_execute('INSERT INTO webchanges VALUES (?, ?, ?)', (guid, snapshot.timestamp, msgpack_data))
                # We do not commit to the temporary database as it's used write-only (we commit at the end).
        else:
            with self.lock:
                self._execute('INSERT INTO webchanges VALUES (?, ?, ?)', (guid, snapshot.timestamp, msgpack_data))
                self.db.commit()

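    # The blob round-trips through msgpack with the short keys documented in
    # the class docstring; a sketch with illustrative values:
    #
    #     blob = msgpack.packb({'d': 'data', 't': 0, 'e': 'W/"etag"', 'm': 'text/html', 'err': {}})
    #     msgpack.unpackb(blob)['d']  # -> 'data'
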
    def delete(self, guid: str) -> None:
        """Delete all entries matching a 'guid'.

        :param guid: The guid.
        """
        with self.lock:
            self._execute('DELETE FROM webchanges WHERE uuid = ?', (guid,))
            self.db.commit()

    def delete_latest(
        self,
        guid: str,
        delete_entries: int = 1,
        temporary: bool | None = False,
        **kwargs: Any,
    ) -> int:
        """For the given 'guid', delete the latest 'delete_entries' number of entries and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete.
        :param temporary: If False, delete from the permanent database (default).

        :returns: Number of records deleted.
        """
        if temporary:
            with self.temp_lock:
                self._temp_execute(
                    'DELETE FROM webchanges '
                    'WHERE ROWID IN ( '
                    '    SELECT ROWID FROM webchanges '
                    '    WHERE uuid = ? '
                    '    ORDER BY timestamp DESC '
                    '    LIMIT ? '
                    ')',
                    (guid, delete_entries),
                )
                # Query changes() on the temporary connection, since that is where the DELETE ran.
                num_del: int = self._temp_execute('SELECT changes()').fetchone()[0]
        else:
            with self.lock:
                self._execute(
                    'DELETE FROM webchanges '
                    'WHERE ROWID IN ( '
                    '    SELECT ROWID FROM webchanges '
                    '    WHERE uuid = ? '
                    '    ORDER BY timestamp DESC '
                    '    LIMIT ? '
                    ')',
                    (guid, delete_entries),
                )
                num_del = self._execute('SELECT changes()').fetchone()[0]
                self.db.commit()
        return num_del

    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :returns: Number of records deleted.
        """
        with self.lock:
            self._execute('DELETE FROM webchanges')
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()

        return num_del

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        """For the given 'guid', keep only the latest 'keep_entries' number of entries and delete all other (older)
        ones. To delete older entries from all guids, use clean_all() instead.

        :param guid: The guid.
        :param keep_entries: Number of entries to keep after deletion.

        :returns: Number of records deleted.
        """
        with self.lock:
            self._execute(
                'DELETE FROM webchanges '
                'WHERE ROWID IN ( '
                '    SELECT ROWID FROM webchanges '
                '    WHERE uuid = ? '
                '    ORDER BY timestamp DESC '
                '    LIMIT -1 '
                '    OFFSET ? '
                ') ',
                (guid, keep_entries),
            )
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()
            self._execute('VACUUM')
        return num_del

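    # In SQLite, a negative LIMIT means "no limit", so 'LIMIT -1 OFFSET ?'
    # selects every row for the guid except the newest keep_entries ones.
    # Equivalent standalone SQL (illustrative):
    #
    #     DELETE FROM webchanges WHERE ROWID IN (
    #         SELECT ROWID FROM webchanges WHERE uuid = :guid
    #         ORDER BY timestamp DESC LIMIT -1 OFFSET :keep_entries);
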
    def move(self, guid: str, new_guid: str) -> int:
        """Replace uuid in records matching the 'guid' with the 'new_guid' value.

        If there are existing records with 'new_guid', they will not be overwritten and the job histories will be
        merged.

        :returns: Number of records searched for replacement.
        """
        total_searched = 0
        if guid != new_guid:
            with self.lock:
                self._execute(
                    'UPDATE webchanges SET uuid = REPLACE(uuid, ?, ?)',
                    (guid, new_guid),
                )
                total_searched = self._execute('SELECT changes()').fetchone()[0]
                self.db.commit()
                self._execute('VACUUM')

        return total_searched

    def clean_all(self, keep_entries: int = 1) -> int:
        """Delete all older entries for each 'guid' (keep only keep_entries).

        :returns: Number of records deleted.
        """
        with self.lock:
            if keep_entries == 1:
                self._execute(
                    'DELETE FROM webchanges '
                    'WHERE EXISTS ( '
                    '    SELECT 1 FROM webchanges '
                    '    w WHERE w.uuid = webchanges.uuid AND w.timestamp > webchanges.timestamp '
                    ')'
                )
            else:
                self._execute(
                    'DELETE FROM webchanges '
                    'WHERE ROWID IN ( '
                    '    WITH rank_added AS ('
                    '        SELECT '
                    '             ROWID,'
                    '             uuid,'
                    '             timestamp, '
                    '             ROW_NUMBER() OVER (PARTITION BY uuid ORDER BY timestamp DESC) AS rn'
                    '        FROM webchanges '
                    '    ) '
                    '    SELECT ROWID FROM rank_added '
                    '    WHERE rn > ?'
                    ')',
                    (keep_entries,),
                )
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()
            self._execute('VACUUM')
        return num_del

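    # ROW_NUMBER() OVER (PARTITION BY uuid ORDER BY timestamp DESC) ranks each
    # guid's snapshots newest-first (rn=1 is the latest), so 'rn > ?' matches
    # exactly the rows beyond the keep_entries newest. Illustrative ranking for
    # one guid with keep_entries=2:
    #
    #     timestamp   rn
    #     300.0       1   kept
    #     200.0       2   kept
    #     100.0       3   deleted
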
    def keep_latest(self, keep_entries: int = 1) -> int:
        """Delete all older entries, keeping only the latest 'keep_entries' per guid.

        :param keep_entries: Number of entries to keep after deletion.

        :returns: Number of records deleted.
        """
        with self.lock:
            self._execute(
                'WITH '
                'cte AS ( SELECT uuid, timestamp, ROW_NUMBER() OVER ( PARTITION BY uuid '
                '                                                     ORDER BY timestamp DESC ) rn '
                '         FROM webchanges ) '
                'DELETE '
                'FROM webchanges '
                'WHERE EXISTS ( SELECT 1 '
                '               FROM cte '
                '               WHERE webchanges.uuid = cte.uuid '
                '                 AND webchanges.timestamp = cte.timestamp '
                '                 AND cte.rn > ? );',
                (keep_entries,),
            )
            num_del: int = self._execute('SELECT changes()').fetchone()[0]
            self.db.commit()
        return num_del

    def rollback(self, timestamp: float, count: bool = False) -> int:
        """Rollback database to the entries present at timestamp.

        :param timestamp: The timestamp.
        :param count: If set to true, only count the number that would be deleted without doing so.

        :returns: Number of records deleted (or to be deleted).
        """
        command = 'SELECT COUNT(*)' if count else 'DELETE'
        with self.lock:
            cur = self._execute(
                f'{command} '  # noqa: S608 Possible SQL injection
                'FROM webchanges '
                'WHERE EXISTS ( '
                '     SELECT 1 '
                '     FROM webchanges AS w '
                '     WHERE w.uuid = webchanges.uuid '
                '     AND webchanges.timestamp > ? '
                '     AND w.timestamp > ? '
                ')',
                (timestamp, timestamp),
            )
            if count:
                # changes() only reflects DML statements, so read the COUNT(*) result directly.
                num_del: int = cur.fetchone()[0]
            else:
                num_del = self._execute('SELECT changes()').fetchone()[0]
                self.db.commit()
        return num_del

    def migrate_from_minidb(self, minidb_filename: str | Path) -> None:
        """Migrate the data of a legacy minidb database to the current database.

        :param minidb_filename: The filename of the legacy minidb database.
        """

        print("Found 'minidb' database and upgrading it to the new engine (note: only the last snapshot is retained).")
        logger.info(
            "Found legacy 'minidb' database and converting it to 'sqlite3' and new schema. Package 'minidb' needs to be"
            ' installed for the conversion.'
        )

        from webchanges.storage_minidb import SsdbMiniDBStorage

        legacy_db = SsdbMiniDBStorage(minidb_filename)
        self.restore(legacy_db.backup())
        legacy_db.close()
        print(f'Database upgrade finished; the following backup file can be safely deleted: {minidb_filename}.\n')
        print("The 'minidb' package can be removed (unless used by another program): $ pip uninstall minidb.")
        print('-' * 80)

    def flushdb(self) -> None:
        """Delete all entries of the database.  Use with care, there is no undo!"""
        with self.lock:
            self._execute('DELETE FROM webchanges')
            self.db.commit()


class SsdbRedisStorage(SsdbStorage):
    """Class for storing snapshots using redis."""

    def __init__(self, filename: str | Path) -> None:
        super().__init__(filename)

        if isinstance(redis, str):
            raise ImportError(f"Python package 'redis' cannot be imported.\n{redis}")

        self.db = redis.from_url(str(filename))
        logger.info(f'Using {self.filename} for database')

    @staticmethod
    def _make_key(guid: str) -> str:
        return 'guid:' + guid

    def close(self) -> None:
        self.db.connection_pool.disconnect()
        del self.db

    def get_guids(self) -> list[str]:
        guids = []
        for guid in self.db.keys('guid:*'):
            guids.append(guid[5:].decode())
        return guids

    def load(self, guid: str) -> Snapshot:
        key = self._make_key(guid)
        data = self.db.lindex(key, 0)

        if data:
            r = msgpack.unpackb(data)
            # The key is 'error_data', matching what save() stores.
            return Snapshot(
                r['data'], r['timestamp'], r['tries'], r['etag'], r.get('mime_type', ''), r.get('error_data', {})
            )

        return Snapshot('', 0, 0, '', '', {})

    def get_history_data(self, guid: str, count: int | None = None) -> dict[str | bytes, float]:
        if count is not None and count < 1:
            return {}

        history = {}
        key = self._make_key(guid)
        for i in range(0, self.db.llen(key)):
            r = self.db.lindex(key, i)
            c = msgpack.unpackb(r)
            if c['tries'] == 0 or c['tries'] is None:
                if c['data'] not in history:
                    history[c['data']] = c['timestamp']
                    if count is not None and len(history) >= count:
                        break
        return history

    def get_history_snapshots(self, guid: str, count: int | None = None) -> list[Snapshot]:
        if count is not None and count < 1:
            return []

        history: list[Snapshot] = []
        key = self._make_key(guid)
        for i in range(0, self.db.llen(key)):
            r = self.db.lindex(key, i)
            c = msgpack.unpackb(r)
            if c['tries'] == 0 or c['tries'] is None:
                history.append(
                    Snapshot(
                        c['data'],
                        c['timestamp'],
                        c['tries'],
                        c['etag'],
                        c.get('mime_type', ''),
                        c.get('error_data', {}),
                    )
                )
                if count is not None and len(history) >= count:
                    break
        return history

    def save(self, *args: Any, guid: str, snapshot: Snapshot, **kwargs: Any) -> None:
        r = {
            'data': snapshot.data,
            'timestamp': snapshot.timestamp,
            'tries': snapshot.tries,
            'etag': snapshot.etag,
            'mime_type': snapshot.mime_type,
            'error_data': snapshot.error_data,
        }
        packed_data = msgpack.packb(r)
        if packed_data:
            self.db.lpush(self._make_key(guid), packed_data)

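    # Each guid's snapshots live in a Redis list; LPUSH prepends, so index 0
    # (read by load() via LINDEX) is always the newest entry. Sketch with an
    # illustrative URL and guid:
    #
    #     storage = SsdbRedisStorage('redis://localhost:6379/0')
    #     storage.save(guid='abc', snapshot=Snapshot('data', 1726300000.0, 0, '', 'text/plain', {}))
    #     storage.load('abc').data  # -> 'data'
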
    def delete(self, guid: str) -> None:
        self.db.delete(self._make_key(guid))

    def delete_latest(self, guid: str, delete_entries: int = 1, **kwargs: Any) -> int:
        """For the given 'guid', delete the latest 'delete_entries' entries and keep all other (older) ones.

        :param guid: The guid.
        :param delete_entries: The number of most recent entries to delete (only 1 is supported by this Redis code).

        :returns: Number of records deleted.
        """
        if delete_entries != 1:
            raise NotImplementedError('Only deleting of the latest 1 entry is supported by this Redis code.')

        if self.db.lpop(self._make_key(guid)) is None:
            return 0

        return 1

    def delete_all(self) -> int:
        """Delete all entries; used for testing only.

        :returns: Number of records deleted.
        """
        raise NotImplementedError('This method is not implemented for Redis.')

    def clean(self, guid: str, keep_entries: int = 1) -> int:
        if keep_entries != 1:
            raise NotImplementedError('Only keeping latest 1 entry is supported.')

        key = self._make_key(guid)
        i = self.db.llen(key)
        if self.db.ltrim(key, 0, 0):
            return i - self.db.llen(key)

        return 0

    def move(self, guid: str, new_guid: str) -> int:
        if guid == new_guid:
            return 0
        key = self._make_key(guid)
        new_key = self._make_key(new_guid)
        # Note: if a list with 'new_key' already exists, the data stored there will be overwritten.
        self.db.rename(key, new_key)  # type: ignore[no-untyped-call]
        return self.db.llen(new_key)

    def rollback(self, timestamp: float) -> None:
        raise NotImplementedError("Rolling back the database is not supported by 'redis' database engine")

    def flushdb(self) -> None:
        """Delete all entries of the database.  Use with care, there is no undo!"""
        self.db.flushdb()