mborsetti / webchanges, build 13100552072 (push, via github)
02 Feb 2025 04:30PM UTC, coverage: 75.393% (-0.2% from 75.597%)
Committer: mborsetti. Commit message: "Version 3.27.0b3"

1712 of 2597 branches covered (65.92%). Branch coverage is included in the aggregate %.
4517 of 5665 relevant lines covered (79.74%), at 6.18 hits per line.
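
A quick arithmetic check of the figures above (a minimal Python sketch, not part of the Coveralls output; it only reuses the counts already reported):

# Reported counts for this build.
branches_covered, branches_total = 1712, 2597
lines_covered, lines_total = 4517, 5665

print(f'{branches_covered / branches_total:.2%}')  # 65.92% branch coverage
print(f'{lines_covered / lines_total:.2%}')        # 79.74% line coverage

# "Branch coverage included in aggregate %": pooling branches with lines
# reproduces the headline figure.
aggregate = (branches_covered + lines_covered) / (branches_total + lines_total)
print(f'{aggregate:.3%}')                          # 75.393%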

Source File

/webchanges/worker.py: 82.11% covered
"""The worker that runs jobs in parallel.  Called from main module."""

# The code below is subject to the license contained in the LICENSE file, which is part of the source code.

from __future__ import annotations

import gc
import logging
import os
import random
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
from typing import Iterable, TYPE_CHECKING

from webchanges.command import UrlwatchCommand
from webchanges.handler import JobState
from webchanges.jobs import NotModifiedError

try:
    import psutil
except ImportError as e:  # pragma: no cover
    psutil = str(e)  # type: ignore[assignment]

# https://stackoverflow.com/questions/39740632
if TYPE_CHECKING:
    from webchanges.jobs import JobBase
    from webchanges.main import Urlwatch

logger = logging.getLogger(__name__)


def run_jobs(urlwatcher: Urlwatch) -> None:
    """Process (run) jobs in parallel.

    :raises IndexError: If any index(es) is/are out of range.
    """

    def insert_delay(jobs: set[JobBase]) -> set[JobBase]:  # pragma: no cover
        """
        TODO: Evaluate whether this is necessary; currently not being called. Remove pragma no cover and move import.

        Sets a _delay value for URL jobs having the same network location as previous ones. Used to prevent
        multiple jobs hitting the same network location at the exact same time and being blocked as a result.

        CHANGELOG:
        * When multiple URL jobs have the same network location (www.example.com), a random delay between 0.1 and 1.0
          seconds is added to all jobs to that network location after the first one. This prevents being blocked by
          the site as a result of being flooded by **webchanges**'s parallelism sending multiple requests from the same
          source at the same exact time.

        :param jobs: The list of jobs.
        :return: The list of jobs with the _delay value set.
        """
        from webchanges.jobs import UrlJobBase

        previous_netlocs = set()
        for job in jobs:
            if isinstance(job, UrlJobBase):
                netloc = urllib.parse.urlparse(job.url).netloc
                if netloc in previous_netlocs:
                    job._delay = random.uniform(0.1, 1)  # noqa: S311 Standard pseudo-random generator not suitable.
                else:
                    previous_netlocs.add(netloc)
        return jobs

    def job_runner(
        stack: ExitStack,
        jobs: Iterable[JobBase],
        max_workers: int | None = None,
    ) -> None:
        """
        Runs the jobs in parallel.

        :param stack: The context manager.
        :param jobs: The jobs to run.
        :param max_workers: The number of maximum workers for ThreadPoolExecutor.
        :return: None
        """
        executor = ThreadPoolExecutor(max_workers=max_workers)

        # launch future to retrieve if new version is available
        if urlwatcher.report.new_release_future is None:
            urlwatcher.report.new_release_future = executor.submit(urlwatcher.get_new_release_version)

        job_state: JobState
        for job_state in executor.map(
            lambda jobstate: jobstate.process(headless=not urlwatcher.urlwatch_config.no_headless),
            (stack.enter_context(JobState(urlwatcher.ssdb_storage, job)) for job in jobs),
        ):
            max_tries = 0 if not job_state.job.max_tries else job_state.job.max_tries
            # tries is incremented by JobState.process when an exception (including 304) is encountered.

            if job_state.exception is not None:
                # Oops, we have captured an error (which could also be 304 or a Playwright timeout)
                if job_state.error_ignored:
                    # We captured an error to ignore
                    logger.info(
                        f'Job {job_state.job.index_number}: Error while executing job was ignored (e.g. due to job '
                        f'config or browser timing out)'
                    )
                elif isinstance(job_state.exception, NotModifiedError):
                    # We captured a 304 Not Modified
                    logger.info(
                        f'Job {job_state.job.index_number}: Job has not changed (HTTP 304 response or same strong '
                        f'ETag)'
                    )
                    if job_state.tries > 0:
                        job_state.tries = 0
                        job_state.save()
                    if job_state.old_error_data:
                        urlwatcher.report.unchanged_from_error(job_state)
                    else:
                        urlwatcher.report.unchanged(job_state)
                elif job_state.tries < max_tries:
                    # We're not reporting the error yet because we haven't yet hit 'max_tries'
                    logger.debug(
                        f'Job {job_state.job.index_number}: Error suppressed as cumulative number of '
                        f'failures ({job_state.tries}) does not exceed max_tries={max_tries}'
                    )
                    job_state.save()
                else:
                    # Reporting the error
                    logger.debug(
                        f'Job {job_state.job.index_number}: Flagged as error as max_tries={max_tries} has been '
                        f'met or exceeded ({job_state.tries}).'
                    )
                    job_state.save()
                    if job_state.new_error_data == job_state.old_error_data:
                        urlwatcher.report.error_same_error(job_state)
                    else:
                        urlwatcher.report.error(job_state)
            elif job_state.old_data or job_state.old_timestamp != 0:
                # This is not the first time running this job (we have snapshots)
                if (
                    job_state.new_data == job_state.old_data
                    or job_state.new_data in job_state.history_dic_snapshots.keys()
                ):
                    # Exactly matches one of the previous snapshots
                    if job_state.tries > 0:
                        job_state.tries = 0
                        job_state.save()
                    if job_state.old_error_data:
                        urlwatcher.report.unchanged_from_error(job_state)
                    else:
                        urlwatcher.report.unchanged(job_state)
                else:
                    # # No exact match to previous snapshot  [fuzzy matching, untested and no longer makes sense]
                    # if len(job_state.history_dic_snapshots) > 1:
                    #     # Find the closest fuzzy matching saved snapshot ("good enough") and use it to diff against it
                    #     close_matches: list[str] = difflib.get_close_matches(
                    #         str(job_state.new_data), (str(k) for k in job_state.history_dic_snapshots.keys()), n=1
                    #     )
                    #     if close_matches:
                    #         logger.warning(
                    #             f'Job {job_state.job.index_number}: Did not find an existing run in the database, '
                    #             f'but fuzzy matched it based on the contents of the data'
                    #         )
                    #         job_state.old_data = close_matches[0]
                    #         job_state.old_timestamp = job_state.history_dic_snapshots[close_matches[0]].timestamp
                    #         job_state.old_etag = job_state.history_dic_snapshots[close_matches[0]].etag
                    #         job_state.old_mime_type = job_state.history_dic_snapshots[close_matches[0]].mime_type

                    # It has different data, so we save it
                    job_state.tries = 0
                    job_state.save()
                    urlwatcher.report.changed(job_state)
            else:
                # We have never run this job before (there are no snapshots)
                job_state.tries = 0
                job_state.save()
                urlwatcher.report.new(job_state)

    jobs = set(UrlwatchCommand(urlwatcher).jobs_from_joblist())

    jobs = insert_delay(jobs)

    with ExitStack() as stack:  # This code is also present in command.list_error_jobs (change there too!)
        # run non-BrowserJob jobs first
        jobs_to_run = [job for job in jobs if not job.__is_browser__]
        if jobs_to_run:
            logger.debug(
                "Running jobs that do not require Chrome (without 'use_browser: true') in parallel with Python's "
                'default max_workers.'
            )
            job_runner(stack, jobs_to_run, urlwatcher.urlwatch_config.max_workers)
        else:
            logger.debug("Found no jobs that do not require Chrome (i.e. without 'use_browser: true').")

        # run BrowserJob jobs after
        jobs_to_run = [job for job in jobs if job.__is_browser__]
        if jobs_to_run:
            gc.collect()
            virt_mem = get_virt_mem()
            if urlwatcher.urlwatch_config.max_workers:
                max_workers = urlwatcher.urlwatch_config.max_workers
            else:
                max_workers = max(int(virt_mem / 200e6), 1)
                max_workers = min(max_workers, os.cpu_count() or 1)
            logger.debug(
                f"Running jobs that require Chrome (i.e. with 'use_browser: true') in parallel with {max_workers} "
                f'max_workers.'
            )
            job_runner(stack, jobs_to_run, max_workers)
        else:
            logger.debug("Found no jobs that require Chrome (i.e. with 'use_browser: true').")

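When browser jobs ('use_browser: true') are present, run_jobs above sizes the Chrome worker pool from available memory, roughly one worker per 200 MB, capped at the CPU count. A minimal standalone sketch of that heuristic; the function name and the 1.6 GB figure are illustrative assumptions, not part of webchanges:

import os

def browser_max_workers(virt_mem: int) -> int:
    # Reproduce the sizing heuristic: ~1 worker per 200 MB of available memory,
    # at least 1, capped at the CPU count.
    workers = max(int(virt_mem / 200e6), 1)
    return min(workers, os.cpu_count() or 1)

# Assumed figure for illustration only: 1.6 GB available would allow up to
# 8 workers before the CPU-count cap is applied.
print(browser_max_workers(int(1.6e9)))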

def get_virt_mem() -> int:
    """Return the amount of virtual memory available, i.e. the memory that can be given instantly to processes
    without the system going into swap. Expressed in bytes."""
    if isinstance(psutil, str):
        raise ImportError(
            "Error when loading package 'psutil'; cannot use 'use_browser: true'. Please install "
            f"dependencies with 'pip install webchanges[use_browser]'.\n{psutil}"
        ) from None
    try:
        virt_mem: int = psutil.virtual_memory().available
        logger.debug(
            f'Found {virt_mem / 1e6:,.0f} MB of available physical memory (plus '
            f'{psutil.swap_memory().free / 1e6:,.0f} MB of swap).'
        )
    except psutil.Error as e:  # pragma: no cover
        virt_mem = 0
        logger.debug(f'Could not read memory information: {e}')

    return virt_mem
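
The insert_delay helper defined inside run_jobs above keys its throttling off each URL's network location. A minimal stdlib-only sketch of that grouping, using made-up example URLs (illustrative assumptions, not webchanges data):

import random
import urllib.parse

# Hypothetical job URLs, for illustration only.
urls = [
    'https://www.example.com/page1',
    'https://www.example.com/page2',
    'https://other.example.org/feed',
]

previous_netlocs: set[str] = set()
for url in urls:
    netloc = urllib.parse.urlparse(url).netloc
    if netloc in previous_netlocs:
        # Same host as an earlier job: stagger it by 0.1 to 1.0 seconds.
        delay = random.uniform(0.1, 1)
        print(f'{url}: delayed by {delay:.2f}s')
    else:
        previous_netlocs.add(netloc)
        print(f'{url}: no delay (first job for {netloc})')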