mborsetti / webchanges / 17710149774 (push · github · mborsetti)
14 Sep 2025 10:49AM UTC · coverage: 71.376% (-3.1% from 74.434%)
Version 3.31.1.post2

1383 of 2314 branches covered (59.77%)
Branch coverage included in aggregate %.
4614 of 6088 relevant lines covered (75.79%)
5.87 hits per line
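
The aggregate figure above appears to fold branch coverage into line coverage: the numbers shown are consistent with (covered lines + covered branches) / (relevant lines + total branches). A quick sanity check of that arithmetic (a sketch, not Coveralls' documented formula):

covered_lines, relevant_lines = 4614, 6088        # line coverage figures above
covered_branches, total_branches = 1383, 2314     # branch coverage figures above

line_pct = 100 * covered_lines / relevant_lines                                    # 75.79
branch_pct = 100 * covered_branches / total_branches                               # 59.77
aggregate_pct = 100 * (covered_lines + covered_branches) / (relevant_lines + total_branches)
print(f'{line_pct:.2f} / {branch_pct:.2f} / {aggregate_pct:.3f}')                  # 75.79 / 59.77 / 71.376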

Source File

/webchanges/worker.py (82.61% covered)

"""The worker that runs jobs in parallel.  Called from main module."""

# The code below is subject to the license contained in the LICENSE file, which is part of the source code.

from __future__ import annotations

import gc
import logging
import os
import random
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
from typing import TYPE_CHECKING, Iterable

from webchanges.command import UrlwatchCommand
from webchanges.handler import JobState
from webchanges.jobs import NotModifiedError

try:
    import psutil
except ImportError as e:  # pragma: no cover
    psutil = str(e)  # type: ignore[assignment]

# https://stackoverflow.com/questions/39740632
if TYPE_CHECKING:
    from webchanges.jobs import JobBase
    from webchanges.main import Urlwatch

logger = logging.getLogger(__name__)


def run_jobs(urlwatcher: Urlwatch) -> None:
    """Process (run) jobs in parallel.

    :raises IndexError: If any index(es) is/are out of range.
    """

    def insert_delay(jobs: set[JobBase]) -> set[JobBase]:  # pragma: no cover
        """
        TODO Evaluate whether this is necessary; currently not being called. Remove pragma no cover and move import.

        Sets a _delay value for URL jobs having the same network location as previous ones. Used to prevent
        multiple jobs hitting the same network location at the exact same time and being blocked as a result.

        CHANGELOG:
        * When multiple URL jobs have the same network location (www.example.com), a random delay between 0.1 and 1.0
          seconds is added to all jobs to that network location after the first one. This prevents being blocked by
          the site as a result of being flooded by **webchanges**'s parallelism sending multiple requests from the same
          source at the same exact time.

        :param jobs: The list of jobs.
        :return: The list of jobs with the _delay value set.
        """
        from webchanges.jobs import UrlJobBase

        previous_netlocs = set()
        for job in jobs:
            if isinstance(job, UrlJobBase):
                netloc = urllib.parse.urlparse(job.url).netloc
                if netloc in previous_netlocs:
                    job._delay = random.uniform(0.1, 1)  # noqa: S311 Standard pseudo-random generator not suitable.
                else:
                    previous_netlocs.add(netloc)
        return jobs

    def job_runner(
        stack: ExitStack,
        jobs: Iterable[JobBase],
        max_workers: int | None = None,
    ) -> None:
        """
        Runs the jobs in parallel.

        :param stack: The context manager.
        :param jobs: The jobs to run.
        :param max_workers: The number of maximum workers for ThreadPoolExecutor.
        :return: None
        """
        executor = ThreadPoolExecutor(max_workers=max_workers)

        # launch future to retrieve if new version is available
        if urlwatcher.report.new_release_future is None:
            urlwatcher.report.new_release_future = executor.submit(urlwatcher.get_new_release_version)

        job_state: JobState
        for job_state in executor.map(
            lambda jobstate: jobstate.process(headless=not urlwatcher.urlwatch_config.no_headless),
            (stack.enter_context(JobState(urlwatcher.ssdb_storage, job)) for job in jobs),
        ):
            max_tries = 0 if not job_state.job.max_tries else job_state.job.max_tries
            # tries is incremented by JobState.process when an exception (including 304) is encountered.

            if job_state.exception is not None:
                # Oops, we have captured an error (which could also be 304 or a Playwright timeout)
                if job_state.error_ignored:
                    # We captured an error to ignore
                    logger.info(
                        f'Job {job_state.job.index_number}: Error while executing job was ignored (e.g. due to job '
                        f'config or browser timing out)'
                    )
                elif isinstance(job_state.exception, NotModifiedError):
                    # We captured a 304 Not Modified
                    logger.info(
                        f'Job {job_state.job.index_number}: Job has not changed (HTTP 304 response or same strong ETag)'
                    )
                    if job_state.tries > 0:
                        job_state.tries = 0
                        job_state.save()
                    if job_state.old_error_data and job_state.job.suppress_repeated_errors:
                        urlwatcher.report.unchanged_from_error(job_state)
                    else:
                        urlwatcher.report.unchanged(job_state)
                elif job_state.tries < max_tries:
                    # We're not reporting the error yet because we haven't yet hit 'max_tries'
                    logger.debug(
                        f'Job {job_state.job.index_number}: Error suppressed as cumulative number of '
                        f'failures ({job_state.tries}) does not exceed max_tries={max_tries}'
                    )
                    job_state.save()
                else:
                    # Reporting the error
                    logger.debug(
                        f'Job {job_state.job.index_number}: Flagged as error as max_tries={max_tries} has been '
                        f'met or exceeded ({job_state.tries}).'
                    )
                    job_state.save()
                    if job_state.new_error_data == job_state.old_error_data:
                        urlwatcher.report.error_same_error(job_state)
                    else:
                        urlwatcher.report.error(job_state)
            elif job_state.old_data or job_state.old_timestamp != 0:
                # This is not the first time running this job (we have snapshots)
                if (
                    job_state.new_data == job_state.old_data
                    or job_state.new_data in job_state.history_dic_snapshots.keys()
                ):
                    # Exactly matches one of the previous snapshots
                    if job_state.tries > 0:
                        job_state.tries = 0
                        job_state.save()
                    if job_state.old_error_data and job_state.job.suppress_repeated_errors:
                        urlwatcher.report.unchanged_from_error(job_state)
                    else:
                        urlwatcher.report.unchanged(job_state)
                else:
                    # # No exact match to previous snapshot  [fuzzy matching, untested and no longer makes sense]
                    # if len(job_state.history_dic_snapshots) > 1:
                    #     # Find the closest fuzzy matching saved snapshot ("good enough") and use it to diff against it
                    #     close_matches: list[str] = difflib.get_close_matches(
                    #         str(job_state.new_data), (str(k) for k in job_state.history_dic_snapshots.keys()), n=1
                    #     )
                    #     if close_matches:
                    #         logger.warning(
                    #             f'Job {job_state.job.index_number}: Did not find an existing run in the database, '
                    #             f'but fuzzy matched it based on the contents of the data'
                    #         )
                    #         job_state.old_data = close_matches[0]
                    #         job_state.old_timestamp = job_state.history_dic_snapshots[close_matches[0]].timestamp
                    #         job_state.old_etag = job_state.history_dic_snapshots[close_matches[0]].etag
                    #         job_state.old_mime_type = job_state.history_dic_snapshots[close_matches[0]].mime_type

                    # It has different data, so we save it
                    job_state.tries = 0
                    job_state.save()
                    urlwatcher.report.changed(job_state)
            else:
                # We have never run this job before (there are no snapshots)
                job_state.tries = 0
                job_state.save()
                urlwatcher.report.new(job_state)

    jobs = set(UrlwatchCommand(urlwatcher).jobs_from_joblist())

    jobs = insert_delay(jobs)

    with ExitStack() as stack:  # This code is also present in command.list_error_jobs (change there too!)
        # run non-BrowserJob jobs first
        jobs_to_run = [job for job in jobs if not job.__is_browser__]
        if jobs_to_run:
            logger.debug(
                "Running jobs that do not require Chrome (without 'use_browser: true') in parallel with Python's "
                'default max_workers.'
            )
            job_runner(stack, jobs_to_run, urlwatcher.urlwatch_config.max_workers)
        else:
            logger.debug("Found no jobs that do not require Chrome (i.e. without 'use_browser: true').")

        # run BrowserJob jobs after
        jobs_to_run = [job for job in jobs if job.__is_browser__]
        if jobs_to_run:
            gc.collect()
            virt_mem = get_virt_mem()
            if urlwatcher.urlwatch_config.max_workers:
                max_workers = urlwatcher.urlwatch_config.max_workers
            else:
                max_workers = max(int(virt_mem / 400e6), 1)
                max_workers = min(max_workers, os.cpu_count() or 1)
            logger.debug(
                f"Running jobs that require Chrome (i.e. with 'use_browser: true') in parallel with {max_workers} "
                f'max_workers.'
            )
            job_runner(stack, jobs_to_run, max_workers)
        else:
            logger.debug("Found no jobs that require Chrome (i.e. with 'use_browser: true').")


def get_virt_mem() -> int:
    """Return the amount of virtual memory available, i.e. the memory that can be given instantly to processes
    without the system going into swap. Expressed in bytes."""
    if isinstance(psutil, str):
        raise ImportError(
            "Error when loading package 'psutil'; cannot use 'use_browser: true'. Please install "
            f"dependencies with 'pip install webchanges[use_browser]'.\n{psutil}"
        ) from None
    try:
        virt_mem: int = psutil.virtual_memory().available
        logger.debug(
            f'Found {virt_mem / 1e6:,.0f} MB of available physical memory (plus '
            f'{psutil.swap_memory().free / 1e6:,.0f} MB of swap).'
        )
    except psutil.Error as e:  # pragma: no cover
        virt_mem = 0
        logger.debug(f'Could not read memory information: {e}')

    return virt_mem
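
A note on the pattern at the heart of job_runner above: each JobState is entered on the caller's ExitStack, so every job's resources stay open across both runner passes and are released only when the with ExitStack() block in run_jobs exits, while ThreadPoolExecutor.map drives each state's process() call across the worker pool. Below is a minimal, self-contained sketch of that pattern; DummyState and runner are stand-ins invented for illustration, not part of webchanges.

from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack


class DummyState:
    """Stand-in for JobState: a context manager whose work runs in a worker thread."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __enter__(self) -> 'DummyState':
        print(f'{self.name}: acquired')
        return self

    def __exit__(self, *exc_info) -> None:
        print(f'{self.name}: released')

    def process(self) -> str:
        return f'{self.name}: processed'


def runner(stack: ExitStack, names: list[str], max_workers: int | None = None) -> None:
    executor = ThreadPoolExecutor(max_workers=max_workers)
    # Each state is entered on the *caller's* stack, so it outlives this function
    # and is only closed when the caller's `with ExitStack()` block exits.
    for result in executor.map(
        lambda state: state.process(),
        (stack.enter_context(DummyState(name)) for name in names),
    ):
        print(result)


with ExitStack() as stack:
    runner(stack, ['job 1', 'job 2'], max_workers=2)  # first pass (like the non-browser jobs)
    runner(stack, ['job 3'])                          # second pass (like the browser jobs)
# every DummyState is released here, after both passes have completed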
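
Similarly, the browser-job branch of run_jobs sizes its pool from available memory: when max_workers is not configured, it allows roughly one Chrome worker per 400 MB of available virtual memory, capped at the CPU count and floored at 1 (see the 'use_browser: true' branch and get_virt_mem above). A standalone sketch of that heuristic, assuming psutil is installed; browser_max_workers is an illustrative name, not a webchanges function.

import os

import psutil


def browser_max_workers(configured: int | None = None) -> int:
    """Illustrative restatement of the worker-count heuristic for 'use_browser: true' jobs."""
    if configured:
        return configured
    # memory that can be handed to processes right now without swapping (bytes)
    virt_mem = psutil.virtual_memory().available
    # allow roughly one Chrome worker per 400 MB of that memory,
    # never fewer than 1 and never more than one per CPU
    workers = max(int(virt_mem / 400e6), 1)
    return min(workers, os.cpu_count() or 1)


print(browser_max_workers())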